Skip to content

Commit

Permalink
1st pass at refactoring mail downloads to index immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
BjarniRunar committed Oct 23, 2014
1 parent d072c8c commit 2e1e3e2
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 81 deletions.
6 changes: 6 additions & 0 deletions mailpile/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,7 @@ def __init__(self, workdir=None, rules={}):
self.http_worker = None
self.dumb_worker = DumbWorker('Dumb worker', None)
self.slow_worker = self.dumb_worker
self.scan_worker = self.dumb_worker
self.save_worker = self.dumb_worker
self.async_worker = self.dumb_worker
self.other_workers = []
Expand Down Expand Up @@ -1837,6 +1838,9 @@ def start_httpd(sspec=None):
if config.slow_worker == config.dumb_worker:
config.slow_worker = Worker('Slow worker', session)
config.slow_worker.start()
if config.scan_worker == config.dumb_worker:
config.scan_worker = Worker('Scan worker', session)
config.scan_worker.start()
if config.async_worker == config.dumb_worker:
config.async_worker = Worker('Async worker', session)
config.async_worker.start()
Expand Down Expand Up @@ -1900,10 +1904,12 @@ def stop_workers(config):
[config.http_worker,
config.async_worker,
config.slow_worker,
config.scan_worker,
config.cron_worker])
config.other_workers = []
config.http_worker = config.cron_worker = None
config.slow_worker = config.dumb_worker
config.scan_worker = config.dumb_worker
config.async_worker = config.dumb_worker

for wait in (False, True):
Expand Down
70 changes: 45 additions & 25 deletions mailpile/mail_source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class BaseMailSource(threading.Thread):
DEFAULT_JITTER = 15 # Fudge factor to tame thundering herds
SAVE_STATE_INTERVAL = 3600 # How frequently we pickle our state
INTERNAL_ERROR_SLEEP = 900 # Pause time on error, in seconds
RESCAN_BATCH_SIZE = 100 # Index at most this many new e-mails at once
RESCAN_BATCH_SIZE = 250 # Index at most this many new e-mails at once
MAX_MAILBOXES = 100 # Max number of mailboxes we add
MAX_PATHS = 5000 # Abort if asked to scan too many directories

Expand Down Expand Up @@ -480,7 +480,8 @@ def interrupt_rescan(self, reason):
def _process_new(self, msg, msg_ts, keywords, snippet):
return ProcessNew(self.session, msg, msg_ts, keywords, snippet)

def _copy_new_messages(self, mbx_key, mbx_cfg, stop_after=-1):
def _copy_new_messages(self, mbx_key, mbx_cfg,
stop_after=-1, scan_args=None):
session, config = self.session, self.session.config
self.event.data['copying'] = progress = {
'running': True,
Expand All @@ -489,11 +490,13 @@ def _copy_new_messages(self, mbx_key, mbx_cfg, stop_after=-1):
'copied_bytes': 0,
'complete': False
}
scan_args = scan_args or {}
count = 0
try:
src = config.open_mailbox(session, mbx_key, prefer_local=False)
loc = config.open_mailbox(session, mbx_key, prefer_local=True)
if src == loc:
return
return count

keys = list(src.iterkeys())
progress.update({
Expand All @@ -502,25 +505,32 @@ def _copy_new_messages(self, mbx_key, mbx_cfg, stop_after=-1):
})
for key in keys:
if self._check_interrupt(clear=False):
return
return count
play_nice_with_threads()
if key not in loc.source_map:
session.ui.mark(_('Copying message: %s') % key)
data = src.get_bytes(key)
loc.add_from_source(key, data)
loc_key = loc.add_from_source(key, data)
self.event.data['counters']['copied_messages'] += 1
progress['copied_messages'] += 1
progress['copied_bytes'] += len(data)

# This forks off a scan job to index the message
config.index.scan_one_message(
session, mbx_key, loc, loc_key,
wait=False, **scan_args)

stop_after -= 1
if stop_after == 0:
return
return count
progress['complete'] = True
except IOError:
# These just abort the download/read, which we're going to just
# take in stride for now.
pass
finally:
progress['running'] = False
return count

def rescan_mailbox(self, mbx_key, mbx_cfg, path, stop_after=None):
session, config = self.session, self.session.config
Expand All @@ -534,16 +544,6 @@ def rescan_mailbox(self, mbx_key, mbx_cfg, path, stop_after=None):
try:
ostate, self._state = self._state, 'Rescan(%s, %s)' % (mbx_key,
stop_after)
if mbx_cfg.local or self.my_config.discovery.local_copy:
# Note: We copy fewer messages than the batch allows for,
# because we might have been aborted on an earlier run and
# the rescan may need to catch up. We also start with smaller
# batch sizes, because folks are impatient.
self._log_status(_('Copying mail: %s') % path)
self._create_local_mailbox(mbx_cfg)
max_copy = min(self._loop_count,
int(1 + stop_after / (mailboxes + 1)))
self._copy_new_messages(mbx_key, mbx_cfg, stop_after=max_copy)

with self._lock:
apply_tags = mbx_cfg.apply_tags[:]
Expand All @@ -553,20 +553,40 @@ def rescan_mailbox(self, mbx_key, mbx_cfg, path, stop_after=None):
if tid:
apply_tags.append(tid)

scan_mailbox_args = {
'process_new': (mbx_cfg.process_new and
self._process_new or False),
'apply_tags': (apply_tags or []),
'stop_after': stop_after,
'event': self.event
}
count = 0

if mbx_cfg.local or self.my_config.discovery.local_copy:
# Note: We copy fewer messages than the batch allows for,
# because we might have been aborted on an earlier run and
# the rescan may need to catch up. We also start with smaller
# batch sizes, because folks are impatient.
self._log_status(_('Copying mail: %s') % path)
self._create_local_mailbox(mbx_cfg)
max_copy = min(self._loop_count * 10,
int(1 + stop_after / (mailboxes + 1)))
count += self._copy_new_messages(mbx_key, mbx_cfg,
stop_after=max_copy,
scan_args=scan_mailbox_args)
# Wait for background message scans to complete...
config.scan_worker.do(session, 'Wait', lambda: 1)

play_nice_with_threads()
self._log_status(_('Rescanning: %s') % path)
if 'rescans' in self.event.data:
self.event.data['rescans'][:-mailboxes] = []

return config.index.scan_mailbox(
session, mbx_key, mbx_cfg.local or path,
config.open_mailbox,
process_new=(mbx_cfg.process_new and
self._process_new or False),
apply_tags=(apply_tags or []),
stop_after=stop_after,
event=self.event)

return count + config.index.scan_mailbox(session,
mbx_key,
mbx_cfg.local or path,
config.open_mailbox,
**scan_mailbox_args)
except ValueError:
session.ui.debug(traceback.format_exc())
return -1
Expand Down
143 changes: 91 additions & 52 deletions mailpile/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,26 +544,34 @@ def get_msg_id(self, msg, msg_ptr):
print _('WARNING: No proper Message-ID for %s') % msg_ptr
return self.encode_msg_id(raw_msg_id or msg_ptr)

def scan_mailbox(self, session, mailbox_idx, mailbox_fn, mailbox_opener,
process_new=None, apply_tags=None, stop_after=None,
editable=False, event=None):
def _get_scan_progress(self, mailbox_idx, event=None, reset=False):
if event and 'rescans' not in event.data:
event.data['rescans'] = []
event.data['rescan'] = progress = {}
if reset:
event.data['rescan'] = {}
progress = event.data['rescan']
else:
progress = {}
reset = True
if reset:
progress.update({
'running': True,
'complete': False,
'mailbox_id': mailbox_idx,
'errors': [],
'added': 0,
'updated': 0,
'total': 0,
'batch_size': 0
})
return progress

def scan_mailbox(self, session, mailbox_idx, mailbox_fn, mailbox_opener,
process_new=None, apply_tags=None, stop_after=None,
editable=False, event=None):
mailbox_idx = FormatMbxId(mailbox_idx)
progress.update({
'running': True,
'complete': False,
'mailbox_id': mailbox_idx,
'errors': [],
'added': 0,
'updated': 0,
'total': 0,
'batch_size': 0
})
progress = self._get_scan_progress(mailbox_idx,
event=event, reset=True)

def finito(code, message, **kwargs):
if event:
Expand Down Expand Up @@ -644,44 +652,19 @@ def parse_status(ui):
session.ui.mark(parse_status(ui))

# Message new or modified, let's parse it.
if 'rescan' in session.config.sys.debug:
session.ui.debug('Reading message %s/%s' % (mailbox_idx, i))
try:
msg_fd = mbox.get_file(i)
msg = ParseMessage(msg_fd,
pgpmime=session.config.prefs.index_encrypted,
config=session.config)
except (IOError, OSError, ValueError, IndexError, KeyError):
if session.config.sys.debug:
traceback.print_exc()
progress['errors'].append(i)
session.ui.warning(('Reading message %s/%s FAILED, skipping'
) % (mailbox_idx, i))
continue

optimize = False
msg_id = self.get_msg_id(msg, msg_ptr)
if msg_id in self.MSGIDS:
with self._lock:
self._update_location(session, self.MSGIDS[msg_id], msg_ptr)
updated += 1
else:
msg_info = self._index_incoming_message(
session, msg_id, msg_ptr, msg_fd.tell(), msg,
last_date + 1, mailbox_idx, process_new, apply_tags)
last_date = long(msg_info[self.MSG_DATE], 36)
optimize = True
added += 1

play_nice_with_threads()
if optimize:
GlobalPostingList.Optimize(session, self,
lazy=True, quick=True)

progress.update({
'added': added,
'updated': updated,
})
last_date, a, u = self.scan_one_message(session,
mailbox_idx, mbox, i,
msg_ptr=msg_ptr,
last_date=last_date,
wait=True,
process_new=process_new,
apply_tags=apply_tags,
stop_after=stop_after,
editable=editable,
event=event,
progress=progress)
added += a
updated += u

with self._lock:
for msg_ptr in self.PTRS.keys():
Expand All @@ -704,6 +687,62 @@ def parse_status(ui):
updated=updated,
complete=(messages_md5 != not_done_yet))

def scan_one_message(self, session, mailbox_idx, mbox, msg_mbox_key,
                     wait=False, **kwargs):
    """Scan/index a single message via the dedicated scan worker.

    When `wait` is True, block until the worker has run the scan and
    return its result (a (last_date, added, updated) tuple from
    _real_scan_one).  Otherwise the scan is queued as a background
    task and a neutral (0, 0, 0) tuple is returned immediately.
    """
    def _do_scan():
        return self._real_scan_one(session, mailbox_idx, mbox,
                                   msg_mbox_key, **kwargs)

    task_name = 'scan:%s/%s' % (mailbox_idx, msg_mbox_key)
    worker = session.config.scan_worker
    if wait:
        return worker.do(session, task_name, _do_scan)
    worker.add_task(session, task_name, _do_scan)
    return 0, 0, 0

def _real_scan_one(self, session,
                   mailbox_idx, mbox, msg_mbox_idx,
                   msg_ptr=None, last_date=None,
                   process_new=None, apply_tags=None, stop_after=None,
                   editable=False, event=None, progress=None):
    """Parse and index one message (key `msg_mbox_idx`) from `mbox`.

    Returns a (last_date, added, updated) tuple, where added/updated
    are 0 or 1 depending on whether the message was new to the index
    or merely had its location pointer refreshed.  Read failures are
    recorded in progress['errors'] and skipped.

    NOTE(review): `stop_after` and `editable` are accepted for
    signature compatibility with scan_mailbox's kwargs but are not
    used here — confirm against callers before removing.
    """
    added = updated = 0
    msg_ptr = msg_ptr or mbox.get_msg_ptr(mailbox_idx, msg_mbox_idx)
    last_date = last_date or long(time.time())
    progress = progress or self._get_scan_progress(mailbox_idx,
                                                   event=event)

    if 'rescan' in session.config.sys.debug:
        session.ui.debug('Reading message %s/%s'
                         % (mailbox_idx, msg_mbox_idx))
    try:
        msg_fd = mbox.get_file(msg_mbox_idx)
        msg = ParseMessage(msg_fd,
                           pgpmime=session.config.prefs.index_encrypted,
                           config=session.config)
    except (IOError, OSError, ValueError, IndexError, KeyError):
        if session.config.sys.debug:
            traceback.print_exc()
        # BUG FIX: this appended the undefined name `i` (a leftover
        # loop variable from the old scan_mailbox loop), which raised
        # NameError on the error path instead of recording the key.
        progress['errors'].append(msg_mbox_idx)
        session.ui.warning(('Reading message %s/%s FAILED, skipping'
                            ) % (mailbox_idx, msg_mbox_idx))
        return last_date, added, updated

    msg_id = self.get_msg_id(msg, msg_ptr)
    if msg_id in self.MSGIDS:
        # Known message: just record the (possibly new) location.
        with self._lock:
            self._update_location(session, self.MSGIDS[msg_id], msg_ptr)
        updated += 1
    else:
        # New message: index it; +1 keeps dates monotonically increasing.
        msg_info = self._index_incoming_message(
            session, msg_id, msg_ptr, msg_fd.tell(), msg,
            last_date + 1, mailbox_idx, process_new, apply_tags)
        last_date = long(msg_info[self.MSG_DATE], 36)
        added += 1

    play_nice_with_threads()
    progress['added'] += added
    progress['updated'] += updated
    return last_date, added, updated

def edit_msg_info(self, msg_info,
msg_mid=None, raw_msg_id=None, msg_id=None, msg_ts=None,
msg_from=None, msg_subject=None, msg_body=None,
Expand Down
11 changes: 7 additions & 4 deletions mailpile/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,20 @@ def __str__(self):
time.time() - self.last_run,
len(self.JOBS)))

def add_task(self, session, name, task, unique=False, first=False):
    """Queue `task` (a callable) for this worker under `name`.

    If `unique` is True the task is dropped when another task with the
    same name is already queued.  If `first` is True the task is
    placed at the FRONT of the queue; by default it is appended at the
    back (preserving the pre-`first` behavior for existing callers).
    """
    with self.LOCK:
        if unique:
            for s, n, t in self.JOBS:
                if n == name:
                    return
        if first:
            # BUG FIX: these branches were inverted — `first=True`
            # appended to the back while the default prepended.
            self.JOBS[:0] = [(session, name, task)]
        else:
            self.JOBS.append((session, name, task))
        self.LOCK.notify()

def add_unique_task(self, session, name, task, **kwargs):
    """Queue a task, skipping it if one with the same name is pending.

    Extra keyword arguments (e.g. first=True) are passed straight
    through to add_task.
    """
    return self.add_task(session, name, task, unique=True, **kwargs)

def do(self, session, name, task, unique=False):
if session and session.main:
Expand Down

0 comments on commit 2e1e3e2

Please sign in to comment.