/
tasks.py
195 lines (165 loc) · 5.77 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# Copyright (C) 2012-2013 Luis Pedro Coelho <luis@luispedro.org>
# This file is part of rbit mail.
from __future__ import print_function
from PySide import QtCore
import Queue
from rbit import signals
_imap_manager = None
def get_imap():
# By importing only inside the function, we avoid having to import at
# programme start up
from resources import imap_manager
global _imap_manager
if _imap_manager is None:
_imap_manager = imap_manager()
return _imap_manager.get()
def run_from_queue(group, q):
from gevent import monkey
# thread needs to be left alone as Qt handles those
monkey.patch_all(thread=False)
for message in q:
if message == 'quit':
group.kill()
return
group.spawn(message)
def transfer_queue(q, gq):
import gevent
while True:
try:
message = q.get_nowait()
gq.put(message)
if message == 'quit':
return
except Queue.Empty:
gevent.sleep(1.)
class GEventLoop(QtCore.QThread):
def __init__(self, parent=None):
super(GEventLoop, self).__init__(parent)
self.q = Queue.Queue()
def spawn(self, f, *args, **kwargs):
self.q.put(lambda: f(*args, **kwargs))
def run(self):
import gevent.queue
from gevent.pool import Group
gq = gevent.queue.Queue()
group = Group()
group.spawn(transfer_queue, self.q, gq)
run_from_queue(group, gq)
@QtCore.Slot()
def kill(self):
self.q.put('quit')
self.wait()
class RBitTask(QtCore.QObject):
error = QtCore.Signal(str)
status = QtCore.Signal(str)
progress = QtCore.Signal(int, int)
done = QtCore.Signal()
def __init__(self, *args, **kwargs):
super(RBitTask, self).__init__(*args, **kwargs)
self.dead = False
def schedule_death(self):
'''
task.schedule_death()
This method should cause the task to complete at its earliest
convenience. There are no garantees on how long this should take,
however.
'''
self.dead = True
def perform(self):
try:
self._perform()
self.done.emit()
except Exception as err:
from sys import stderr
import traceback
self.error.emit(str(err))
traceback.print_exc(file=stderr)
class RetrainFolderModel(RBitTask):
def _perform(self):
from rbit.ml import train, predict
self.status.emit('Retraining folder model...')
train.retrain_folder_model()
predict.init()
class UpdateMessages(RBitTask):
def __init__(self, parent):
super(UpdateMessages, self).__init__(parent)
signals.register(signals.STATUS, self.get_status)
def get_status(self, code, message):
if code == 'imap-update':
self.status.emit(message)
def _perform(self):
from rbit.sync import update_all_folders
from rbit import config
from rbit import backend
from rbit import imap
from rbit.ml import predict
from rbglobals import index
cfg = config.Config('config', backend.create_session)
self.status.emit('Updating messages from %s' % cfg.get('account', 'host'))
signals.register(signals.NEW_MESSAGE, predict.predict_inbox, replace_all=True)
index.register()
client = imap.IMAPClient.from_config(cfg)
update_all_folders(client)
client.close()
class MoveMessage(RBitTask):
def __init__(self, parent, message, target):
super(MoveMessage, self).__init__(parent)
self.folder = message.folder
self.uid = message.uid
self.target = target
def _perform(self):
with get_imap() as client:
client.select_folder(self.folder)
client.move_messages([self.uid], self.target)
client.expunge()
class TrashMessage(MoveMessage):
def __init__(self, parent, message):
MoveMessage.__init__(self, parent, message, 'INBOX.Trash')
class PredictMessages(RBitTask):
def __init__(self, parent, account, uids):
super(PredictMessages, self).__init__(parent)
self.account = account
self.uids = uids
def _perform(self):
from rbit.ml import predict
from rbit.backend import create_session
from rbit.models import Message
session = create_session()
for uid in self.uids:
message = session.\
query(Message).\
filter_by(account=self.account).\
filter_by(uid=uid).\
filter_by(folder=u'INBOX').\
one()
if message is not None:
predict.predict_inbox(message, u'INBOX', uid, session=session)
session.commit()
def paginate(xs, n):
s = 0
while s < len(xs):
yield xs[s:s+n]
s += n
class ReindexMessages(RBitTask):
def _perform(self):
import rbglobals
from rbit import index as rbit_index
from rbit.backend import create_session
from rbit.models import Message
from rbit.imap import breakup
from gevent import sleep
rbglobals.index.close()
rbit_index.remove_index()
rbglobals.index = rbit_index.get_index()
session = create_session()
mids = session.query(Message.mid).all()
STEP = 256
for done,ms in enumerate(paginate(mids, STEP)):
self.progress.emit(done*STEP, len(mids))
messages = [Message.load_by_mid(m, create_session=(lambda:session)) for m in ms]
rbglobals.index.add(messages)
session.expunge_all()
if self.dead:
break
sleep()
self.status.emit('Message reindexing complete')