Permalink
Browse files

New DNS checks.

see #1618
  • Loading branch information...
tonioo committed Nov 19, 2018
1 parent 38408db commit 277602e3f0a8077dc156bf8dbcd6d38bcba1b0b4
@@ -51,6 +51,39 @@ class AdminParametersForm(param_forms.AdminParametersForm):
)
)
enable_spf_checks = YesNoField(
label=ugettext_lazy("Enable SPF checks"),
initial=True,
help_text=ugettext_lazy(
"Check if every domain has a valid SPF record"
)
)
enable_dkim_checks = YesNoField(
label=ugettext_lazy("Enable DKIM checks"),
initial=True,
help_text=ugettext_lazy(
"Check if every domain with DKIM signin enabled has a valid DNS "
"record"
)
)
enable_dmarc_checks = YesNoField(
label=ugettext_lazy("Enable DMARC checks"),
initial=True,
help_text=ugettext_lazy(
"Check if every domain has a valid DMARC record"
)
)
enable_autoconfig_checks = YesNoField(
label=ugettext_lazy("Enable autoconfig checks"),
initial=True,
help_text=ugettext_lazy(
"Check if every domain has a valid records for autoconfiguration"
)
)
enable_dnsbl_checks = YesNoField(
label=ugettext_lazy("Enable DNSBL checks"),
initial=True,
@@ -23,6 +23,7 @@
from modoboa.core.models import User
from modoboa.lib.exceptions import PermDeniedException
from modoboa.parameters import tools as param_tools
from . import signals
from .models import Alias, Domain, DomainAlias
@@ -170,56 +171,67 @@ def import_dlist(user, row, formopts):
_import_alias(user, row)
def get_domain_mx_list(domain):
"""Return a list of MX IP address for domain."""
result = []
logger = logging.getLogger("modoboa.admin")
def get_dns_resolver():
"""Return a DNS resolver object."""
dns_server = param_tools.get_global_parameter("custom_dns_server")
if dns_server:
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
else:
resolver = dns.resolver
return resolver
def get_dns_records(name, typ, resolver=None):
"""Retrieve DNS records for given name and type."""
logger = logging.getLogger("modoboa.admin")
if not resolver:
resolver = get_dns_resolver()
try:
dns_answers = resolver.query(domain, "MX")
dns_answers = resolver.query(name, typ)
except dns.resolver.NXDOMAIN as e:
logger.error(_("No DNS records found for %s") % domain, exc_info=e)
logger.error(_("No DNS record found for %s") % name, exc_info=e)
except dns.resolver.NoAnswer as e:
logger.error(_("No MX record for %s") % domain, exc_info=e)
logger.error(_("No %s record for %s") % (typ, name), exc_info=e)
except dns.resolver.NoNameservers as e:
logger.error(_("No working name servers found"), exc_info=e)
except dns.resolver.Timeout as e:
logger.warning(
_("DNS resolution timeout, unable to query %s at the moment") %
domain, exc_info=e)
name, exc_info=e)
else:
for dns_answer in dns_answers:
for rtype in ["A", "AAAA"]:
return dns_answers
return None
def get_domain_mx_list(domain):
"""Return a list of MX IP address for domain."""
result = []
logger = logging.getLogger("modoboa.admin")
resolver = get_dns_resolver()
dns_answers = get_dns_records(domain, "MX", resolver)
if dns_answers is None:
return result
for dns_answer in dns_answers:
mx_domain = dns_answer.exchange.to_unicode(
omit_final_dot=True, idna_codec=IDNA_2008_UTS_46)
for rtype in ["A", "AAAA"]:
ip_answers = get_dns_records(mx_domain, rtype, resolver)
if not ip_answers:
continue
for ip_answer in ip_answers:
try:
mx_domain = dns_answer.exchange.to_unicode(
omit_final_dot=True, idna_codec=IDNA_2008_UTS_46)
ip_answers = resolver.query(mx_domain, rtype)
except dns.resolver.NXDOMAIN as e:
logger.error(
_("No {type} record found for MX {mx}").format(
type=rtype, mx=domain), exc_info=e)
except dns.resolver.NoAnswer:
pass
address_smart = smart_text(ip_answer.address)
mx_ip = ipaddress.ip_address(address_smart)
except ValueError as e:
logger.warning(
_("Invalid IP address format for "
"{domain}; {addr}").format(
domain=mx_domain,
addr=smart_text(ip_answer.address)
), exc_info=e)
else:
for ip_answer in ip_answers:
try:
address_smart = smart_text(ip_answer.address)
mx_ip = ipaddress.ip_address(address_smart)
except ValueError as e:
logger.warning(
_("Invalid IP address format for "
"{domain}; {addr}").format(
domain=mx_domain,
addr=smart_text(ip_answer.address)
), exc_info=e)
else:
result.append((mx_domain, mx_ip))
result.append((mx_domain, mx_ip))
return result
@@ -19,6 +19,7 @@
from django.utils.translation import ugettext as _
from modoboa.admin import constants, models
from modoboa.dnstools import models as dns_models
from modoboa.parameters import tools as param_tools
@@ -193,6 +194,25 @@ def check_domain(self, domain, timeout=3, ttl=7200, **options):
if param_tools.get_global_parameter("enable_mx_checks"):
self.check_valid_mx(domain, mx_list, **options)
if param_tools.get_global_parameter("enable_spf_checks"):
dns_models.DNSRecord.objects.get_or_create_for_domain(
domain, "spf", ttl)
condition = (
param_tools.get_global_parameter("enable_dkim_checks") and
domain.dkim_public_key
)
if condition:
dns_models.DNSRecord.objects.get_or_create_for_domain(
domain, "dkim", ttl)
if param_tools.get_global_parameter("enable_dmarc_checks"):
dns_models.DNSRecord.objects.get_or_create_for_domain(
domain, "dmarc", ttl)
if param_tools.get_global_parameter("enable_autoconfig_checks"):
dns_models.DNSRecord.objects.get_or_create_for_domain(
domain, "autoconfig", ttl)
dns_models.DNSRecord.objects.get_or_create_for_domain(
domain, "autodiscover", ttl)
condition = (
not param_tools.get_global_parameter("enable_dnsbl_checks") or
options["no_dnsbl"] is True)
@@ -161,6 +161,31 @@ def dnsbl_status_color(self):
else:
return "success"
@cached_property
def spf_record(self):
"""Return SPF record."""
return self.dnsrecord_set.filter(type="spf").first()
@cached_property
def dkim_record(self):
"""Return DKIM record."""
return self.dnsrecord_set.filter(type="dkim").first()
@cached_property
def dmarc_record(self):
"""Return DMARC record."""
return self.dnsrecord_set.filter(type="dmarc").first()
@cached_property
def autoconfig_record(self):
"""Return autoconfig record."""
return self.dnsrecord_set.filter(type="autoconfig").first()
@cached_property
def autodiscover_record(self):
"""Return autodiscover record."""
return self.dnsrecord_set.filter(type="autodiscover").first()
@cached_property
def allocated_quota(self):
"""Return current quota allocation."""
@@ -257,7 +282,7 @@ def save(self, *args, **kwargs):
}
if self.old_dkim_key_length != self.dkim_key_length:
self.dkim_public_key = ""
self.dkim_private_key_path = ""
self.dkim_private_key_path = ""
super(Domain, self).save(*args, **kwargs)
def delete(self, fromuser, keepdir=False):
@@ -11,4 +11,37 @@
{% if enable_dnsbl_checks %}
<span class="label label-mini label-{{ domain.dnsbl_status_color }}"><a href="{% url 'admin:dnsbl_domain_detail' domain.pk %}" data-toggle="ajaxmodal">DNSBL</a></span>
{% endif %}
{% if enable_spf_checks %}
{% if not domain.spf_record %}
<span class="label label-mini label-danger" title="{% trans 'No record found' %}">SPF</span>
{% else %}
<span class="label label-mini label-{% if not domain.spf_record.is_valid %}warning{% else %}success{% endif %}">
<a href="{% url 'dnstools:dns_record_detail' domain.spf_record.pk %}" data-toggle="ajaxmodal">SPF</a></span>
{% endif %}
{% endif %}
{% if enable_dkim_checks and domain.enable_dkim %}
{% if not domain.dkim_public_key %}
<span class="label label-mini label-default" title="{% trans 'Waiting for key to be generated' %}">DKIM</span>
{% else %}
<span class="label label-mini label-{% if not domain.dkim_record %}danger{% elif not domain.dkim_record.is_valid %}warning{% else %}success{% endif %}">
<a href="{% url 'dnstools:dns_record_detail' domain.dkim_record.pk %}" data-toggle="ajaxmodal">DKIM</a></span>
{% endif %}
{% endif %}
{% if enable_dmarc_checks %}
{% if not domain.dmarc_record %}
<span class="label label-mini label-danger" title="{% trans 'No record found' %}">DMARC</span>
{% else %}
<span class="label label-mini label-{% if not domain.dmarc_record.is_valid %}warning{% else %}success{% endif %}">
<a href="{% url 'dnstools:dns_record_detail' domain.dmarc_record.pk %}" data-toggle="ajaxmodal">DMARC</a></span>
{% endif %}
{% endif %}
{% if enable_autoconfig_checks %}
{% if not domain.autoconfig_record and not domain.autodiscover_record %}
<span class="label label-mini label-danger" title="{% trans 'No record found' %}">autoconfig</span>
{% elif not domain.autoconfig_record or not domain.autodiscover_record %}
<span class="label label-mini label-warning"><a href="{% url 'dnstools:autoconfig_records_status' domain.pk %}" data-toggle="ajaxmodal">autoconfig</a></span>
{% else %}
<span class="label label-mini label-success"><a href="{% url 'dnstools:autoconfig_records_status' domain.pk %}" data-toggle="ajaxmodal">autoconfig</a></span>
{% endif %}
{% endif %}
{% endif %}
@@ -453,4 +453,3 @@ def test_dkim_key_length_modification(self):
os.unlink(key_path)
call_command("modo", "manage_dkim_keys")
self.assertTrue(os.path.exists(key_path))
@@ -110,7 +110,7 @@ def test_single_domain_update(
@mock.patch("socket.getaddrinfo")
@mock.patch.object(dns.resolver.Resolver, "query")
def test_get_mx_list_dsn_server(self, mock_query, mock_getaddrinfo):
def test_get_mx_list_dns_server(self, mock_query, mock_getaddrinfo):
"""Test to get mx list from specific DNS server."""
mock_query.side_effect = utils.mock_dns_query_result
mock_getaddrinfo.side_effect = utils.mock_ip_query_result
@@ -133,7 +133,7 @@ def test_get_domain_mx_list_logging(self, mock_query, mock_ip_address):
get_domain_mx_list("bad-response.example.com")
log.check(
("modoboa.admin", "ERROR",
_("No DNS records found for %s")
_("No DNS record found for %s")
% "does-not-exist.example.com"),
("modoboa.admin", "ERROR",
_("No MX record for %s") % "no-mx.example.com"),
@@ -142,11 +142,11 @@ def test_get_domain_mx_list_logging(self, mock_query, mock_ip_address):
_("DNS resolution timeout, unable to query %s at the moment")
% "timeout.example.com"),
("modoboa.admin", "ERROR",
_("No A record found for MX %(domain)s")
% {"domain": "no-lookup.example.com"}),
_("No DNS record found for %s")
% "does-not-exist.example.com"),
("modoboa.admin", "ERROR",
_("No AAAA record found for MX %(domain)s")
% {"domain": "no-lookup.example.com"}),
_("No DNS record found for %s")
% "does-not-exist.example.com"),
("modoboa.admin", "WARNING",
_("Invalid IP address format for %(domain)s; %(addr)s")
% {"domain": "bad-response.example.com", "addr": "000.0.0.0"}),
@@ -69,23 +69,25 @@ def _domains(request):
}
page = get_listing_page(domainlist, request.GET.get("page", 1))
parameters = request.localconfig.parameters
dns_checks = {
"enable_mx_checks": parameters.get_value("enable_mx_checks"),
"enable_spf_checks": parameters.get_value("enable_spf_checks"),
"enable_dkim_checks": parameters.get_value("enable_dkim_checks"),
"enable_dmarc_checks": parameters.get_value("enable_dmarc_checks"),
"enable_autoconfig_checks": (
parameters.get_value("enable_autoconfig_checks")),
"enable_dnsbl_checks": parameters.get_value("enable_dnsbl_checks")
}
context["headers"] = render_to_string(
"admin/domain_headers.html", {
"enable_mx_checks": parameters.get_value("enable_mx_checks"),
"enable_dnsbl_checks": (
parameters.get_value("enable_dnsbl_checks"))
}, request
"admin/domain_headers.html", dns_checks, request
)
if page is None:
context["length"] = 0
else:
tpl_context = {"domains": page.object_list}
tpl_context.update(dns_checks)
context["rows"] = render_to_string(
"admin/domains_table.html", {
"domains": page.object_list,
"enable_mx_checks": parameters.get_value("enable_mx_checks"),
"enable_dnsbl_checks": (
parameters.get_value("enable_dnsbl_checks"))
}, request
"admin/domains_table.html", tpl_context, request
)
context["pages"] = [page.number]
return render_to_json_response(context)
@@ -104,6 +106,11 @@ def domains(request, tplname="admin/domains.html"):
return render(request, tplname, {
"selection": "domains",
"enable_mx_checks": parameters.get_value("enable_mx_checks"),
"enable_spf_checks": parameters.get_value("enable_spf_checks"),
"enable_dkim_checks": parameters.get_value("enable_dkim_checks"),
"enable_dmarc_checks": parameters.get_value("enable_dmarc_checks"),
"enable_autoconfig_checks": (
parameters.get_value("enable_autoconfig_checks")),
"enable_dnsbl_checks": parameters.get_value("enable_dnsbl_checks")
})
@@ -226,6 +233,11 @@ def get_context_data(self, **kwargs):
context.update({
"templates": {"left": [], "right": []},
"enable_mx_checks": parameters.get_value("enable_mx_checks"),
"enable_spf_checks": parameters.get_value("enable_spf_checks"),
"enable_dkim_checks": parameters.get_value("enable_dkim_checks"),
"enable_dmarc_checks": parameters.get_value("enable_dmarc_checks"),
"enable_autoconfig_checks": (
parameters.get_value("enable_autoconfig_checks")),
"enable_dnsbl_checks": parameters.get_value("enable_dnsbl_checks"),
})
for _receiver, widgets in result:
@@ -2,8 +2,6 @@
"""
Django settings for {{ name }} project.
Generated by 'django-admin startproject' using Django 1.11.8.
For more information on this file, see
https://docs.djangoproject.com/en/1.11/topics/settings/
@@ -82,6 +80,7 @@ MODOBOA_APPS = (
'modoboa.relaydomains',
'modoboa.limits',
'modoboa.parameters',
'modoboa.dnstools',
# Modoboa extensions here.
{% for extension in extensions %} '{{ extension }}',
{% endfor %}
@@ -69,7 +69,11 @@
"admin-create_alias_on_mbox_rename": False,
"admin-dkim_keys_storage_dir": "",
"admin-dkim_default_key_length": 1024,
"admin-custom_dns_server": "193.43.55.67"
"admin-custom_dns_server": "193.43.55.67",
"admin-enable_spf_checks": True,
"admin-enable_dkim_checks": True,
"admin-enable_dmarc_checks": True,
"admin-enable_autoconfig_checks": True,
}
No changes.
@@ -0,0 +1,5 @@
from django.apps import AppConfig
class DnstoolsConfig(AppConfig):
name = 'dnstools'
Oops, something went wrong.

0 comments on commit 277602e

Please sign in to comment.