Skip to content

Commit

Permalink
filelock for unseen mail fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
inkhey committed Dec 12, 2017
1 parent ef63bbe commit 80ec9e0
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -70,6 +70,7 @@ wsgidav.conf
# Temporary files
*~
*.sqlite
*.lock

# npm packages
/node_modules/
Expand Down
1 change: 1 addition & 0 deletions install/requirements.txt
Expand Up @@ -67,3 +67,4 @@ rq==0.7.1
click==6.7
markdown==2.6.9
email_reply_parser==0.5.9
filelock==2.0.13
3 changes: 3 additions & 0 deletions tracim/development.ini.base
Expand Up @@ -228,6 +228,9 @@ email.reply.token = mysecuretoken
email.reply.check.heartbeat = 60
email.reply.use_html_parsing = true
email.reply.use_txt_parsing = true
# Lockfile path is required for email_reply feature,
# it's just an empty file use to prevent concurrent access to imap unseen mail
email.reply.filelock_path = %(here)s/email_fetcher.lock

## Radical (CalDav server) configuration
# radicale.server.host = 0.0.0.0
Expand Down
3 changes: 3 additions & 0 deletions tracim/tracim/config/app_cfg.py
Expand Up @@ -392,6 +392,9 @@ def __init__(self):
'email.reply.use_txt_parsing',
True,
))
self.EMAIL_REPLY_FILELOCK_PATH = tg.config.get(
'email.reply.filelock_path',
)

self.TRACKER_JS_PATH = tg.config.get(
'js_tracker_path',
Expand Down
1 change: 1 addition & 0 deletions tracim/tracim/lib/daemons.py
Expand Up @@ -179,6 +179,7 @@ def run(self) -> None:
token=cfg.EMAIL_REPLY_TOKEN,
use_html_parsing=cfg.EMAIL_REPLY_USE_HTML_PARSING,
use_txt_parsing=cfg.EMAIL_REPLY_USE_TXT_PARSING,
filelock_path=cfg.EMAIL_REPLY_FILELOCK_PATH,
)
self._fetcher.run()

Expand Down
65 changes: 55 additions & 10 deletions tracim/tracim/lib/email_fetcher.py
Expand Up @@ -10,6 +10,7 @@
from email.message import Message
from email.utils import parseaddr

import filelock
import markdown
import requests
from email_reply_parser import EmailReplyParser
Expand All @@ -21,6 +22,8 @@
CONTENT_TYPE_TEXT_PLAIN = 'text/plain'
CONTENT_TYPE_TEXT_HTML = 'text/html'

IMAP_SEEN_FLAG = '\\Seen'
IMAP_CHECKED_FLAG = '\\Flagged'

class MessageContainer(object):
def __init__(self, message: Message, uid: int) -> None:
Expand Down Expand Up @@ -153,6 +156,7 @@ def __init__(
token: str,
use_html_parsing: bool,
use_txt_parsing: bool,
filelock_path: str,
) -> None:
"""
Fetch mail from a mailbox folder through IMAP and add their content to
Expand Down Expand Up @@ -182,7 +186,7 @@ def __init__(
self.token = token
self.use_html_parsing = use_html_parsing
self.use_txt_parsing = use_txt_parsing

self.lock = filelock.FileLock(filelock_path)
self._is_active = True

def run(self) -> None:
Expand All @@ -192,7 +196,8 @@ def run(self) -> None:
time.sleep(self.delay)
try:
self._connect()
messages = self._fetch()
with self.lock.acquire(timeout=10):
messages = self._fetch()
cleaned_mails = [DecodedMail(m.message, m.uid)
for m in messages]
self._notify_tracim(cleaned_mails)
Expand All @@ -204,6 +209,7 @@ def run(self) -> None:

def stop(self) -> None:
self._is_active = False
del self.lock

def _connect(self) -> None:
# TODO - G.M - 2017-11-15 Verify connection/disconnection
Expand Down Expand Up @@ -262,6 +268,7 @@ def _fetch(self) -> typing.List[MessageContainer]:
# Unseen file or All file from a directory (old one should be
# moved/ deleted from mailbox during this process) ?
logger.debug(self, 'Fetch unseen messages')

rv, data = self._connection.search(None, "(UNSEEN)")
logger.debug(self, 'Response status {}'.format(
rv,
Expand All @@ -287,6 +294,7 @@ def _fetch(self) -> typing.List[MessageContainer]:
msg = message_from_bytes(data[0][1])
msg_container = MessageContainer(msg, uid)
messages.append(msg_container)
self._set_flag(uid, IMAP_SEEN_FLAG)
else:
log = 'IMAP : Unable to get mail : {}'
logger.error(self, log.format(str(rv)))
Expand All @@ -301,7 +309,7 @@ def _fetch(self) -> typing.List[MessageContainer]:
def _notify_tracim(
self,
mails: typing.List[DecodedMail],
) -> typing.List[DecodedMail]:
) -> None:
"""
Send http request to tracim endpoint
:param mails: list of mails to send
Expand Down Expand Up @@ -341,29 +349,66 @@ def _notify_tracim(
str(r.status_code),
details,
))
# Flag all correctly checked mail, unseen the others
if r.status_code in [200, 204, 400]:
self._set_flag(mail.uid, IMAP_CHECKED_FLAG)
else:
self._set_flag(mail.uid)
self._unset_flag(mail.uid, IMAP_SEEN_FLAG)
# TODO - G.M - Verify exception correctly works
except requests.exceptions.Timeout as e:
log = 'Timeout error to transmit fetched mail to tracim : {}'
logger.error(self, log.format(str(e)))
unsended_mails.append(mail)
self._unset_flag(mail.uid, IMAP_SEEN_FLAG)
except requests.exceptions.RequestException as e:
log = 'Fail to transmit fetched mail to tracim : {}'
logger.error(self, log.format(str(e)))
self._unset_flag(mail.uid, IMAP_SEEN_FLAG)

return unsended_mails

def _set_flag(self, uid):
def _set_flag(
self,
uid: int,
flag: str,
) -> None:
assert uid is not None

rv, data = self._connection.store(
uid,
'+FLAGS',
'\\Seen'
flag,
)
if rv == 'OK':
log = 'Message {uid} set as {flag}.'.format(
uid=uid,
flag=flag)
logger.debug(self, log)
else:
log = 'Can not set Message {uid} as {flag} : {rv}'.format(
uid=uid,
flag=flag,
rv=rv)
logger.error(self, log)

def _unset_flag(
self,
uid: int,
flag: str,
) -> None:
assert uid is not None

rv, data = self._connection.store(
uid,
'-FLAGS',
flag,
)
if rv == 'OK':
log = 'Message {} set as seen.'.format(uid)
log = 'Message {uid} unset as {flag}.'.format(
uid=uid,
flag=flag)
logger.debug(self, log)
else:
log = 'Can not set Message {} as seen : {}'.format(uid, rv)
log = 'Can not unset Message {uid} as {flag} : {rv}'.format(
uid=uid,
flag=flag,
rv=rv)
logger.error(self, log)

0 comments on commit 80ec9e0

Please sign in to comment.