Skip to content
This repository has been archived by the owner on Oct 31, 2020. It is now read-only.

Commit

Permalink
Merge pull request #3 from willsowerbutts/master
Browse files Browse the repository at this point in the history
Patches to improve python-slimta-diskstorage
  • Loading branch information
icgood committed Feb 6, 2017
2 parents e24896a + bf3bc60 commit d3d4c14
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions slimta/diskstorage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from pyaio import aio_read, aio_write
import gevent
from gevent.event import AsyncResult
from gevent.lock import Semaphore

from slimta.queue import QueueStorage
from slimta import logging
Expand All @@ -54,6 +55,7 @@ class AioFile(object):

_keep_awake_thread = None
_keep_awake_refs = 0
_keep_awake_lock = Semaphore(1)

chunk_size = (16 << 10)

Expand All @@ -63,16 +65,24 @@ def __init__(self, path, tmp_dir=None):

@classmethod
def _start_keep_awake_thread(cls):
if not cls._keep_awake_thread:
cls._keep_awake_thread = gevent.spawn(cls._keep_awake)
cls._keep_awake_refs += 1
cls._keep_awake_lock.acquire()
try:
if not cls._keep_awake_thread:
cls._keep_awake_thread = gevent.spawn(cls._keep_awake)
cls._keep_awake_refs += 1
finally:
cls._keep_awake_lock.release()

@classmethod
def _stop_keep_awake_thread(cls):
cls._keep_awake_refs -= 1
if cls._keep_awake_refs <= 0:
cls._keep_awake_thread.kill()
cls._keep_awake_thread = None
cls._keep_awake_lock.acquire()
try:
cls._keep_awake_refs -= 1
if cls._keep_awake_refs <= 0:
cls._keep_awake_thread.kill()
cls._keep_awake_thread = None
finally:
cls._keep_awake_lock.release()

@classmethod
def _keep_awake(cls):
Expand Down Expand Up @@ -254,8 +264,11 @@ def set_recipients_delivered(self, id, rcpt_indexes):

def load(self):
for id in self.ops.get_ids():
meta = self.ops.read_meta(id)
yield (meta['timestamp'], id)
try:
meta = self.ops.read_meta(id)
yield (meta['timestamp'], id)
except OSError:
logging.log_exception(__name__, queue_id=id)

def get(self, id):
meta = self.ops.read_meta(id)
Expand Down

0 comments on commit d3d4c14

Please sign in to comment.