-
Notifications
You must be signed in to change notification settings - Fork 17
/
lib.py
439 lines (375 loc) · 14.6 KB
/
lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# -*- coding: utf-8 -*-
import os
import re
import socket
import struct
from email.utils import parseaddr
from functools import wraps
import idna
from django.conf import settings
from django.contrib.auth.views import redirect_to_login
from django.urls import reverse
from django.utils.translation import gettext as _
from modoboa.admin import models as admin_models
from modoboa.lib.email_utils import (
split_address, split_local_part, split_mailbox
)
from modoboa.lib.exceptions import InternalError
from modoboa.lib.sysutils import exec_cmd
from modoboa.lib.web_utils import NavigationParameters
from modoboa.parameters import tools as param_tools
from .models import Policy, Users
from .utils import smart_bytes, smart_str
def selfservice(ssfunc=None):
"""Decorator used to expose views to the 'self-service' feature
The 'self-service' feature allows users to act on quarantined
messages without beeing authenticated.
This decorator only acts as a 'router'.
:param ssfunc: the function to call if the 'self-service'
pre-requisites are satisfied
"""
def decorator(f):
@wraps(f)
def wrapped_f(request, *args, **kwargs):
secret_id = request.GET.get("secret_id")
if not secret_id and request.user.is_authenticated:
return f(request, *args, **kwargs)
if not param_tools.get_global_parameter("self_service"):
return redirect_to_login(
reverse("modoboa_amavis:index")
)
return ssfunc(request, *args, **kwargs)
return wrapped_f
return decorator
class AMrelease(object):
def __init__(self):
conf = dict(param_tools.get_global_parameters("modoboa_amavis"))
try:
if conf["am_pdp_mode"] == "inet":
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((conf["am_pdp_host"], conf["am_pdp_port"]))
else:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(conf["am_pdp_socket"])
except socket.error as err:
raise InternalError(
_("Connection to amavis failed: %s" % str(err))
)
def decode(self, answer):
def repl(match):
return struct.pack("B", int(match.group(0)[1:], 16))
return re.sub(br"%([0-9a-fA-F]{2})", repl, answer)
def __del__(self):
self.sock.close()
def sendreq(self, mailid, secretid, recipient, *others):
self.sock.send(
smart_bytes("""request=release
mail_id=%s
secret_id=%s
quar_type=Q
recipient=%s
""" % (smart_str(mailid), smart_str(secretid), smart_str(recipient))))
answer = self.sock.recv(1024)
answer = self.decode(answer)
if re.search(br"250 [\d\.]+ Ok", answer):
return True
return False
class SpamassassinClient(object):
"""A stupid spamassassin client."""
def __init__(self, user, recipient_db):
"""Constructor."""
conf = dict(param_tools.get_global_parameters("modoboa_amavis"))
self._sa_is_local = conf["sa_is_local"]
self._default_username = conf["default_user"]
self._recipient_db = recipient_db
self._setup_cache = {}
self._username_cache = []
if user.role == "SimpleUsers":
if conf["user_level_learning"]:
self._username = user.email
else:
self._username = None
self.error = None
if self._sa_is_local:
self._learn_cmd = self._find_binary("sa-learn")
self._learn_cmd += " --{0} --no-sync -u {1}"
self._learn_cmd_kwargs = {}
self._expected_exit_codes = [0]
self._sync_cmd = self._find_binary("sa-learn")
self._sync_cmd += " -u {0} --sync"
else:
self._learn_cmd = self._find_binary("spamc")
self._learn_cmd += " -d {0} -p {1}".format(
conf["spamd_address"], conf["spamd_port"]
)
self._learn_cmd += " -L {0} -u {1}"
self._learn_cmd_kwargs = {}
self._expected_exit_codes = [5, 6]
def _find_binary(self, name):
"""Find path to binary."""
code, output = exec_cmd("which {}".format(name))
if not code:
return smart_str(output).strip()
known_paths = getattr(settings, "SA_LOOKUP_PATH", ("/usr/bin", ))
for path in known_paths:
bpath = os.path.join(path, name)
if os.path.isfile(bpath) and os.access(bpath, os.X_OK):
return bpath
raise InternalError(_("Failed to find {} binary").format(name))
def _get_mailbox_from_rcpt(self, rcpt):
"""Retrieve a mailbox from a recipient address."""
local_part, domname, extension = (
split_mailbox(rcpt, return_extension=True))
try:
mailbox = admin_models.Mailbox.objects.select_related(
"domain").get(address=local_part, domain__name=domname)
except admin_models.Mailbox.DoesNotExist:
alias = admin_models.Alias.objects.filter(
address="{}@{}".format(local_part, domname),
aliasrecipient__r_mailbox__isnull=False).first()
if not alias:
raise InternalError(_("No recipient found"))
if alias.type != "alias":
return None
mailbox = alias.aliasrecipient_set.filter(
r_mailbox__isnull=False).first()
return mailbox
def _get_domain_from_rcpt(self, rcpt):
"""Retrieve a domain from a recipient address."""
local_part, domname = split_mailbox(rcpt)
domain = admin_models.Domain.objects.filter(name=domname).first()
if not domain:
raise InternalError(_("Local domain not found"))
return domain
def _learn(self, rcpt, msg, mtype):
"""Internal method to call the learning command."""
if self._username is None:
if self._recipient_db == "global":
username = self._default_username
elif self._recipient_db == "domain":
domain = self._get_domain_from_rcpt(rcpt)
username = domain.name
condition = (
username not in self._setup_cache and
setup_manual_learning_for_domain(domain))
if condition:
self._setup_cache[username] = True
else:
mbox = self._get_mailbox_from_rcpt(rcpt)
if mbox is None:
username = self._default_username
else:
if isinstance(mbox, admin_models.Mailbox):
username = mbox.full_address
elif isinstance(mbox, admin_models.AliasRecipient):
username = mbox.address
else:
username = None
condition = (
username is not None and
username not in self._setup_cache and
setup_manual_learning_for_mbox(mbox))
if condition:
self._setup_cache[username] = True
else:
username = self._username
if username not in self._setup_cache:
mbox = self._get_mailbox_from_rcpt(username)
if mbox and setup_manual_learning_for_mbox(mbox):
self._setup_cache[username] = True
if username not in self._username_cache:
self._username_cache.append(username)
cmd = self._learn_cmd.format(mtype, username)
code, output = exec_cmd(
cmd, pinput=smart_bytes(msg), **self._learn_cmd_kwargs)
if code in self._expected_exit_codes:
return True
self.error = smart_str(output)
return False
def learn_spam(self, rcpt, msg):
"""Learn new spam."""
return self._learn(rcpt, msg, "spam")
def learn_ham(self, rcpt, msg):
"""Learn new ham."""
return self._learn(rcpt, msg, "ham")
def done(self):
"""Call this method at the end of the processing."""
if self._sa_is_local:
for username in self._username_cache:
cmd = self._sync_cmd.format(username)
exec_cmd(cmd, **self._learn_cmd_kwargs)
class QuarantineNavigationParameters(NavigationParameters):
"""
Specific NavigationParameters subclass for the quarantine.
"""
def __init__(self, request):
super(QuarantineNavigationParameters, self).__init__(
request, "quarantine_navparams"
)
self.parameters += [
("pattern", "", False),
("criteria", "from_addr", False),
("msgtype", None, False),
("viewrequests", None, False)
]
def _store_page(self):
"""Specific method to store the current page."""
if self.request.GET.get("reset_page", None) or "page" not in self:
self["page"] = 1
else:
page = self.request.GET.get("page", None)
if page is not None:
self["page"] = int(page)
def back_to_listing(self):
"""Return the current listing URL.
Looks into the user's session and the current request to build
the URL.
:return: a string
"""
url = "listing"
params = []
navparams = self.request.session[self.sessionkey]
if "page" in navparams:
params += ["page=%s" % navparams["page"]]
if "order" in navparams:
params += ["sort_order=%s" % navparams["order"]]
params += ["%s=%s" % (p[0], navparams[p[0]])
for p in self.parameters if p[0] in navparams]
if params:
url += "?%s" % ("&".join(params))
return url
def create_user_and_policy(name, priority=7):
"""Create records.
Create two records (a user and a policy) using :keyword:`name` as
an identifier.
:param str name: name
:return: the new ``Policy`` object
"""
if Users.objects.filter(email=name).exists():
return Policy.objects.get(policy_name=name[:32])
policy = Policy.objects.create(policy_name=name[:32])
Users.objects.create(
email=name, fullname=name, priority=priority, policy=policy
)
return policy
def create_user_and_use_policy(name, policy, priority=7):
"""Create a *users* record and use an existing policy.
:param str name: user record name
:param str policy: string or Policy instance
"""
if isinstance(policy, str):
policy = Policy.objects.get(policy_name=policy[:32])
Users.objects.get_or_create(
email=name, fullname=name, priority=priority, policy=policy
)
def update_user_and_policy(oldname, newname):
"""Update records.
:param str oldname: old name
:param str newname: new name
"""
if oldname == newname:
return
u = Users.objects.get(email=oldname)
u.email = newname
u.fullname = newname
u.policy.policy_name = newname[:32]
u.policy.save(update_fields=["policy_name"])
u.save()
def delete_user_and_policy(name):
"""Delete records.
:param str name: identifier
"""
try:
u = Users.objects.get(email=name)
except Users.DoesNotExist:
return
u.policy.delete()
u.delete()
def delete_user(name):
"""Delete a *users* record.
:param str name: user record name
"""
try:
Users.objects.get(email=name).delete()
except Users.DoesNotExist:
pass
def manual_learning_enabled(user):
"""Check if manual learning is enabled or not.
Also check for :kw:`user` if necessary.
:return: True if learning is enabled, False otherwise.
"""
conf = dict(param_tools.get_global_parameters("modoboa_amavis"))
if not conf["manual_learning"]:
return False
if user.role != "SuperAdmins":
if user.has_perm("admin.view_domain"):
manual_learning = (
conf["domain_level_learning"] or conf["user_level_learning"])
else:
manual_learning = conf["user_level_learning"]
return manual_learning
return True
def setup_manual_learning_for_domain(domain):
"""Setup manual learning if necessary.
:return: True if learning has been setup, False otherwise
"""
if Policy.objects.filter(sa_username=domain.name).exists():
return False
policy = Policy.objects.get(policy_name="@{}".format(domain.name[:32]))
policy.sa_username = domain.name
policy.save()
return True
def setup_manual_learning_for_mbox(mbox):
"""Setup manual learning if necessary.
:return: True if learning has been setup, False otherwise
"""
result = False
if (isinstance(mbox, admin_models.AliasRecipient) and
mbox.r_mailbox is not None):
mbox = mbox.r_mailbox
if isinstance(mbox, admin_models.Mailbox):
pname = mbox.full_address[:32]
if not Policy.objects.filter(policy_name=pname).exists():
policy = create_user_and_policy(pname)
policy.sa_username = mbox.full_address
policy.save()
for alias in mbox.alias_addresses:
create_user_and_use_policy(alias, policy)
result = True
return result
def make_query_args(address, exact_extension=True, wildcard=None,
domain_search=False):
assert isinstance(address, str), "address should be of type str"
conf = dict(param_tools.get_global_parameters("modoboa_amavis"))
local_part, domain = split_address(address)
if not conf["localpart_is_case_sensitive"]:
local_part = local_part.lower()
if domain:
domain = domain.lstrip("@").rstrip(".")
domain = domain.lower()
orig_domain = domain
domain = idna.encode(domain, uts46=True).decode("ascii")
delimiter = conf["recipient_delimiter"]
local_part, extension = split_local_part(local_part, delimiter=delimiter)
query_args = []
if (
conf["localpart_is_case_sensitive"] or
(domain and domain != orig_domain)
):
query_args.append(address)
if extension:
query_args.append("%s%s%s@%s" % (
local_part, delimiter, extension, domain))
if delimiter and not exact_extension and wildcard:
query_args.append("%s%s%s@%s" % (
local_part, delimiter, wildcard, domain))
query_args.append("%s@%s" % (local_part, domain))
if domain_search:
query_args.append("@%s" % domain)
query_args.append("@.")
return query_args
def cleanup_email_address(address):
address = parseaddr(address)
if address[0]:
return "%s <%s>" % address
return address[1]