Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

make python files compatible with pep8

  • Loading branch information...
commit 35269b458ffa262c9f9afb1215de587b08362e4a 1 parent 8a703b9
@sergiocharpineljr sergiocharpineljr authored
View
78 ispdb/audit.py
@@ -13,9 +13,10 @@
settings_audit = None
value_error_re = re.compile("^.+'(.+)'$")
+
class AuditTrail(object):
- def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True,
- track_fields=None):
+ def __init__(self, show_in_admin=False, save_change_type=True,
+ audit_deletes=True, track_fields=None):
self.opts = {}
self.opts['show_in_admin'] = show_in_admin
self.opts['save_change_type'] = save_change_type
@@ -38,7 +39,8 @@ def _contribute(sender, **kwargs):
# admin.site.register(cls, clsAdmin)
# Otherwise, register class with default ModelAdmin
admin.site.register(model)
- descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname)
+ descriptor = AuditTrailDescriptor(model._default_manager,
+ sender._meta.pk.attname)
setattr(sender, name, descriptor)
def _audit_track(instance, field_arr, **kwargs):
@@ -68,9 +70,6 @@ def _audit(sender, instance, created, **kwargs):
for field_arr in model._audit_track:
kwargs[field_arr[0]] = _audit_track(instance, field_arr)
model._default_manager.create(**kwargs)
- ## Uncomment this line for pre r8223 Django builds
- #dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False)
- ## Comment this line for pre r8223 Django builds
models.signals.post_save.connect(_audit, sender=cls, weak=False)
if self.opts['audit_deletes']:
@@ -82,17 +81,14 @@ def _audit_delete(sender, instance, **kwargs):
if self.opts['save_change_type']:
kwargs['_audit_change_type'] = 'D'
for field_arr in model._audit_track:
- kwargs[field_arr[0]] = _audit_track(instance, field_arr)
+ kwargs[field_arr[0]] = _audit_track(instance,
+ field_arr)
model._default_manager.create(**kwargs)
- ## Uncomment this line for pre r8223 Django builds
- #dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False)
- ## Comment this line for pre r8223 Django builds
- models.signals.pre_delete.connect(_audit_delete, sender=cls, weak=False)
-
- ## Uncomment this line for pre r8223 Django builds
- #dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False)
- ## Comment this line for pre r8223 Django builds
- models.signals.class_prepared.connect(_contribute, sender=cls, weak=False)
+ models.signals.pre_delete.connect(_audit_delete, sender=cls,
+ weak=False)
+ models.signals.class_prepared.connect(_contribute, sender=cls,
+ weak=False)
+
class AuditTrailDescriptor(object):
def __init__(self, manager, pk_attribute):
@@ -101,13 +97,14 @@ def __init__(self, manager, pk_attribute):
def __get__(self, instance=None, owner=None):
if instance == None:
- #raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__
return create_audit_manager_class(self.manager)
else:
- return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val())
+ return create_audit_manager_with_pk(self.manager,
+ self.pk_attribute, instance._get_pk_val())
def __set__(self, instance, value):
- raise AttributeError, "Audit trail may not be edited in this manner."
+ raise AttributeError("Audit trail may not be edited in this manner.")
+
def create_audit_manager_with_pk(manager, pk_attribute, pk):
"""Create an audit trail manager based on the current object"""
@@ -116,9 +113,11 @@ def __init__(self):
self.model = manager.model
def get_query_set(self):
- return super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk})
+ return super(AuditTrailWithPkManager, self).get_query_set().filter(
+ **{pk_attribute: pk})
return AuditTrailWithPkManager()
+
def create_audit_manager_class(manager):
"""Create an audit trail manager based on the current object"""
class AuditTrailManager(manager.__class__):
@@ -126,6 +125,7 @@ def __init__(self):
self.model = manager.model
return AuditTrailManager()
+
def create_audit_model(cls, **kwargs):
"""Create an audit model for the specific class"""
name = cls.__name__ + 'Audit'
@@ -140,10 +140,13 @@ class Meta:
'__module__': cls.__module__,
'Meta': Meta,
'_audit_id': models.AutoField(primary_key=True),
- '_audit_timestamp': models.DateTimeField(auto_now_add=True, db_index=True),
+ '_audit_timestamp': models.DateTimeField(auto_now_add=True,
+ db_index=True),
'_audit__str__': cls.__str__.im_func,
- '__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp),
- '_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True)
+ '__str__': lambda self: '%s as of %s' % (self._audit__str__(),
+ self._audit_timestamp),
+ '_audit_track': _track_fields(track_fields=kwargs['track_fields'],
+ unprocessed=True)
}
if 'save_change_type' in kwargs and kwargs['save_change_type']:
@@ -153,14 +156,17 @@ class Meta:
for field in cls._meta.fields:
#if field.attname in attrs:
if field.name in attrs:
- raise ImproperlyConfigured, "%s cannot use %s as it is needed by AuditTrail." % (cls.__name__, field.attname)
+ raise ImproperlyConfigured("%s cannot use %s as it is needed by "
+ "AuditTrail." % (cls.__name__,
+ field.attname))
if isinstance(field, models.AutoField):
# Audit models have a separate AutoField
- attrs[field.name] = models.IntegerField(db_index=True, editable=False)
+ attrs[field.name] = models.IntegerField(db_index=True,
+ editable=False)
else:
attrs[field.name] = copy.copy(field)
- # If 'unique' is in there, we need to remove it, otherwise the index
- # is created and multiple audit entries for one item fail.
+ # If 'unique' is in there, we need to remove it, otherwise the
+ # index is created and multiple audit entries for one item fail.
attrs[field.name]._unique = False
# If a model has primary_key = True, a second primary key would be
# created in the audit model. Set primary_key to false.
@@ -168,11 +174,13 @@ class Meta:
for track_field in _track_fields(kwargs['track_fields']):
if track_field['name'] in attrs:
- raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__))
+ raise NameError('Field named "%s" already exists in audit version '
+ 'of %s' % (track_field['name'], cls.__name__))
attrs[track_field['name']] = copy.copy(track_field['field'])
-
+
return type(name, (models.Model,), attrs)
+
def _build_track_field(track_item):
track = {}
track['name'] = track_item[0]
@@ -181,21 +189,24 @@ def _build_track_field(track_item):
elif issubclass(track_item[1], models.Model):
track['field'] = models.ForeignKey(track_item[1])
else:
- raise TypeError('Track fields only support items that are Fields or Models.')
+ raise TypeError('Track fields only support items that are Fields or '
+ 'Models.')
return track
+
def _track_fields(track_fields=None, unprocessed=False):
# Add in the fields from the Audit class "track" attribute.
tracks_found = []
-
+
if settings_audit:
- global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', [])
+ global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS',
+ [])
for track_item in global_track_fields:
if unprocessed:
tracks_found.append(track_item)
else:
tracks_found.append(_build_track_field(track_item))
-
+
if track_fields:
for track_item in track_fields:
if unprocessed:
@@ -203,4 +214,3 @@ def _track_fields(track_fields=None, unprocessed=False):
else:
tracks_found.append(_build_track_field(track_item))
return tracks_found
-
View
5 ispdb/config/admin.py
@@ -1,14 +1,17 @@
from ispdb.config.models import Config, Domain, DomainRequest
from django.contrib import admin
+
class DomainInline(admin.TabularInline):
model = Domain
extra = 0
+
class DomainRequestInline(admin.TabularInline):
model = DomainRequest
extra = 0
+
class ConfigAdmin(admin.ModelAdmin):
inlines = [
DomainRequestInline,
@@ -21,7 +24,7 @@ class ConfigAdmin(admin.ModelAdmin):
def change_view(self, request, obj_id):
c = self.model.objects.get(pk=obj_id)
if not c.domainrequests.all():
- self.inlines=[DomainInline,]
+ self.inlines = [DomainInline, ]
return super(ConfigAdmin, self).change_view(request, obj_id)
def list_domains(self, obj):
View
591 ispdb/config/forms.py
@@ -15,12 +15,13 @@
DocURLDesc, EnableURL, EnableURLInst, Issue)
-def clean_port(self,field):
+def clean_port(self, field):
data = self.cleaned_data[field]
if data > 65535:
raise ValidationError("Port number cannot be larger than 65535")
return data
+
class DynamicModelForm(ModelForm):
"""
Class that represents a ModelForm which can work with our dynamic forms
@@ -43,7 +44,7 @@ def disable_fields(self):
def __init__(self, *args, **kwargs):
super(DynamicModelForm, self).__init__(*args, **kwargs)
self.empty_permitted = False
- if self.data.get(self.prefix+'-DELETE', '') == "True":
+ if self.data.get(self.prefix + '-DELETE', '') == "True":
self.disable_fields()
def clean(self, **kwargs):
@@ -199,6 +200,7 @@ def save(self, commit=True):
self.desc_formset.save()
return m
+
class BaseDocURLFormSet(DynamicBaseModelFormSet):
model = DocURL
@@ -225,7 +227,7 @@ def __init__(self, *args, **kwargs):
queryset = DocURLDesc.objects.none()
# if form is deleted, delete the descriptions
if (self.data and
- self.data.get(form.prefix+'-DELETE', '') == "True"):
+ self.data.get(form.prefix + '-DELETE', '') == "True"):
delete = True
form.desc_formset = self.DocURLDescFormSet(data=data,
prefix=prefix,
@@ -330,6 +332,7 @@ def save(self, commit=True):
self.inst_formset.save()
return m
+
class BaseEnableURLFormSet(DynamicBaseModelFormSet):
model = EnableURL
@@ -356,7 +359,7 @@ def __init__(self, *args, **kwargs):
qs = EnableURLInst.objects.none()
# if form is deleted, delete the instructions
if (self.data and
- self.data.get(form.prefix+'-DELETE', '') == "True"):
+ self.data.get(form.prefix + '-DELETE', '') == "True"):
delete = True
form.inst_formset = self.EnableURLInstFormSet(data=data,
prefix=prefix,
@@ -411,7 +414,7 @@ def clean(self, *args, **kwargs):
# if it is going to be deleted, dont need to to check it
if cleaned_data["DELETE"]:
return cleaned_data
- if not cleaned_data.has_key('name'):
+ if not 'name' in cleaned_data:
return cleaned_data
data = cleaned_data["name"]
# check if domain name is valid
@@ -422,9 +425,9 @@ def clean(self, *args, **kwargs):
# Trivial case failed. Try for possible IDN domain
msg = ("Domain name is not valid")
try:
- if not regex.match(data.encode('idna')): # IDN -> ACE
+ if not regex.match(data.encode('idna')): # IDN -> ACE
raise ValidationError(msg)
- except UnicodeError: # invalid domain
+ except UnicodeError: # invalid domain
raise ValidationError(msg)
return cleaned_data
@@ -434,7 +437,7 @@ def validate_unique(self):
self.instance.config.status == 'suggested'):
dom = Domain.objects.filter(name=self.instance.name,
config__status='approved')
- if dom and (not self.initial.has_key('name') or (dom[0].name !=
+ if dom and ((not 'name' in self.initial) or (dom[0].name !=
self.initial['name'])):
msg = (u"Domain configuration already exists "
"<a href=\"%s\">here</a>." %
@@ -468,6 +471,7 @@ def save(self, commit=True):
super(DomainForm, self).save()
return self.instance
+
class BaseDomainFormSet(DynamicBaseModelFormSet):
model = DomainRequest
@@ -478,7 +482,6 @@ def __init__(self, *args, **kwargs):
def clean(self):
"""Checks that at least one domain is not deleted."""
if any(self.errors):
- # Don't bother validating the formset unless each form is valid on its own
return
# Get number of deleted forms
deleted_forms = 0
@@ -500,6 +503,7 @@ def clean(self):
if (self.total_form_count() - deleted_forms) > self.max_num:
raise ValidationError("Number of domains exceeded.")
+
class ConfigForm(ModelForm):
class Meta:
model = Config
@@ -562,3 +566,572 @@ def save_all(self, *args, **kwargs):
for form in self.enableurl_formset:
form.instance.config = self.instance
self.enableurl_formset.save()
+
+
+class DynamicModelForm(ModelForm):
+ """
+ Class that represents a ModelForm which can work with our dynamic forms
+ system
+ """
+ DELETE = BooleanField(required=False, initial=False, widget=HiddenInput)
+
+ def disable_fields(self):
+ for k, field in self.fields.iteritems():
+ if k == "DELETE":
+ continue
+ field.required = False
+ w = field.widget
+ if (hasattr(w, 'input_type')) and (w.input_type in
+ ["text", "password"]):
+ w.attrs['readonly'] = "readonly"
+ else:
+ w.attrs['disabled'] = "disabled"
+
+ def __init__(self, *args, **kwargs):
+ super(DynamicModelForm, self).__init__(*args, **kwargs)
+ self.empty_permitted = False
+ if self.data.get(self.prefix + '-DELETE', '') == "True":
+ self.disable_fields()
+
+ def clean(self, **kwargs):
+ cleaned_data = super(DynamicModelForm, self).clean(**kwargs)
+ # if form is deleted, clear errors
+ if cleaned_data['DELETE']:
+ self.errors.clear()
+ return cleaned_data
+
+
+class DynamicBaseModelFormSet(BaseModelFormSet):
+ """
+ Class that represents a BaseModelFormSet which can work with our dynamic
+ forms system
+ """
+ def __init__(self, *args, **kwargs):
+ super(DynamicBaseModelFormSet, self).__init__(*args, **kwargs)
+ self.can_delete = True
+
+ def save(self, *args, **kwargs):
+ for form in self.forms:
+ # discard unchanged empty forms
+ if (not hasattr(form, 'cleaned_data')) or (not form.cleaned_data):
+ continue
+ # discard deleted new forms
+ if form.cleaned_data['DELETE'] and not form in self.initial_forms:
+ continue
+ form.save(*args, **kwargs)
+
+ def _get_empty_form(self, **kwargs):
+ defaults = {
+ 'auto_id': 'id_%s',
+ 'prefix': '{1}-{0}',
+ 'empty_permitted': True,
+ }
+ defaults.update(kwargs)
+ form = self.form(**defaults)
+ return form
+ empty_form = property(_get_empty_form)
+
+
+class IssueForm(ModelForm):
+ show_form = BooleanField(required=False, initial=False, widget=HiddenInput)
+
+ class Meta:
+ model = Issue
+ fields = ['title', 'description']
+
+
+class DocURLDescForm(DynamicModelForm):
+ class Meta:
+ model = DocURLDesc
+ exclude = ['docurl']
+
+ def __init__(self, *args, **kwargs):
+ http_accept_language = kwargs.pop('http_accept_language', '')
+ super(DocURLDescForm, self).__init__(*args, **kwargs)
+ choices = []
+ # add HTTP_ACCEPTED_LANG first
+ if http_accept_language:
+ codes = re.split(';|,', http_accept_language)
+ for code in codes:
+ try:
+ li = get_language_info(code)
+ except:
+ continue
+ choices.append((code, li['name'] + ' - ' + li['name_local']))
+ # Redefine our choices, so we can add the translated language names and
+ # sort the list by language name
+ choices.append(self.fields['language'].choices[0])
+ langs = self.fields['language'].choices[1:]
+ langs.sort(key=lambda l: l[1].lower())
+ for code, lang in langs:
+ li = get_language_info(code)
+ choices.append((code, lang + ' - ' + li['name_local']))
+ self.fields['language'].choices = choices
+
+ def save(self, commit=True):
+ super(DocURLDescForm, self).save(commit=False)
+ if commit:
+ # delete if exists
+ if self.cleaned_data and self.cleaned_data['DELETE']:
+ if self.instance.pk:
+ return self.instance.delete()
+ else:
+ return None
+ super(DocURLDescForm, self).save()
+ return self.instance
+
+
+class BaseDocURLDescFormSet(DynamicBaseModelFormSet):
+ model = DocURLDesc
+
+ def __init__(self, *args, **kwargs):
+ self.docurl_delete = kwargs.pop('delete', False)
+ super(BaseDocURLDescFormSet, self).__init__(*args, **kwargs)
+ # if docurl is deleted, set required to false so they don't
+ # need to be filled
+ if self.docurl_delete:
+ for form in self.forms:
+ for k, field in form.fields.iteritems():
+ field.required = False
+
+ def clean(self, **kwargs):
+ super(BaseDocURLDescFormSet, self).clean(**kwargs)
+ if any(self.errors):
+ return
+ deleted_forms = 0
+ languages = []
+ for i in range(0, self.total_form_count()):
+ form = self.forms[i]
+ if (not form.cleaned_data) or form.cleaned_data['DELETE']:
+ deleted_forms += 1
+ continue
+ # Check for repeated languages
+ if form.cleaned_data['language'] in languages:
+ raise ValidationError("Duplicated language found.")
+ if form.cleaned_data['language'] != "Other":
+ languages.append(form.cleaned_data['language'])
+ # Check if all forms are deleted
+ if (not self.docurl_delete) and (self.total_form_count() == 0 or
+ self.total_form_count() == deleted_forms):
+ raise ValidationError("At least one description should be "
+ "specified.")
+
+
+class DocURLForm(DynamicModelForm):
+ class Meta:
+ model = DocURL
+ fields = ['url']
+
+ def save(self, commit=True):
+ m = super(DocURLForm, self).save(commit=False)
+ self.desc_formset.save(commit=False)
+ if commit:
+ if self.cleaned_data and self.cleaned_data['DELETE']:
+ # delete docurl and docurldesc forms
+ if self.instance.pk:
+ for form in self.desc_formset:
+ if form.instance.pk:
+ form.instance.delete()
+ self.instance.delete()
+ return None
+ m = super(DocURLForm, self).save()
+ for form in self.desc_formset:
+ form.instance.docurl = m
+ self.desc_formset.save()
+ return m
+
+
+class BaseDocURLFormSet(DynamicBaseModelFormSet):
+ model = DocURL
+
+ def __init__(self, *args, **kwargs):
+ kwargs['prefix'] = 'docurl'
+ # get request.META to add HTTP_ACCEPT_LANGUAGE on top of languages
+ # choices
+ meta = kwargs.pop('meta', {})
+ accept_lang = meta.get('HTTP_ACCEPT_LANGUAGE', '')
+ super(BaseDocURLFormSet, self).__init__(*args, **kwargs)
+ # construct description formsets
+ self.DocURLDescFormSet = modelformset_factory(DocURLDesc,
+ extra=self.extra, form=DocURLDescForm,
+ formset=BaseDocURLDescFormSet)
+ self.DocURLDescFormSet.form = staticmethod(curry(DocURLDescForm,
+ http_accept_language=accept_lang))
+ for index, form in enumerate(self.forms):
+ prefix = 'desc_%s' % index
+ data = self.data or None
+ delete = False
+ if self.queryset:
+ queryset = DocURLDesc.objects.filter(docurl=form.instance)
+ else:
+ queryset = DocURLDesc.objects.none()
+ # if form is deleted, delete the descriptions
+ if (self.data and
+ self.data.get(form.prefix + '-DELETE', '') == "True"):
+ delete = True
+ form.desc_formset = self.DocURLDescFormSet(data=data,
+ prefix=prefix,
+ delete=delete,
+ queryset=queryset)
+ # create a empty_desc_form
+ if self.forms:
+ self.empty_desc_form = self.forms[0].desc_formset.empty_form
+ else:
+ formset = self.DocURLDescFormSet()
+ self.empty_desc_form = formset.empty_form
+
+ def clean(self, **kwargs):
+ super(BaseDocURLFormSet, self).clean(**kwargs)
+ if any(self.errors):
+ return
+ urls = []
+ for i in range(0, self.total_form_count()):
+ form = self.forms[i]
+ if (not form.cleaned_data) or form.cleaned_data['DELETE']:
+ continue
+ # Check for repeated urls
+ if form.cleaned_data['url'] in urls:
+ raise ValidationError("Duplicated URL found.")
+ urls.append(form.cleaned_data['url'])
+
+ def is_valid(self, *args, **kwargs):
+ result = super(BaseDocURLFormSet, self).is_valid(*args, **kwargs)
+ for form in self.forms:
+ result = result and form.desc_formset.is_valid()
+ return result
+
+ def delete(self):
+ for form in self.forms:
+ for desc in form.desc_formset:
+ desc.instance.delete()
+ form.instance.delete()
+
+
+class EnableURLInstForm(DynamicModelForm):
+ class Meta:
+ model = EnableURLInst
+ exclude = ['enableurl']
+
+ def __init__(self, *args, **kwargs):
+ http_accept_language = kwargs.pop('http_accept_language', '')
+ super(EnableURLInstForm, self).__init__(*args, **kwargs)
+ choices = []
+ # add HTTP_ACCEPTED_LANG first
+ if http_accept_language:
+ codes = re.split(';|,', http_accept_language)
+ for code in codes:
+ try:
+ li = get_language_info(code)
+ except:
+ continue
+ choices.append((code, li['name'] + ' - ' + li['name_local']))
+ # Redefine our choices, so we can add the translated language names and
+ # sort the list by language name
+ choices.append(self.fields['language'].choices[0])
+ langs = self.fields['language'].choices[1:]
+ langs.sort(key=lambda l: l[1].lower())
+ for code, lang in langs:
+ li = get_language_info(code)
+ choices.append((code, lang + ' - ' + li['name_local']))
+ self.fields['language'].choices = choices
+
+ def save(self, commit=True):
+ super(EnableURLInstForm, self).save(commit=False)
+ if commit:
+ # delete if exists
+ if self.cleaned_data and self.cleaned_data['DELETE']:
+ if self.instance.pk:
+ return self.instance.delete()
+ else:
+ return None
+ super(EnableURLInstForm, self).save()
+ return self.instance
+
+
+class BaseEnableURLInstFormSet(DynamicBaseModelFormSet):
+ model = EnableURLInst
+
+ def __init__(self, *args, **kwargs):
+ self.enableurl_delete = kwargs.pop('delete', False)
+ super(BaseEnableURLInstFormSet, self).__init__(*args, **kwargs)
+ # if enableurl is deleted, set required to false so they don't
+ # need to be filled
+ if self.enableurl_delete:
+ for form in self.forms:
+ for k, field in form.fields.iteritems():
+ field.required = False
+
+ def clean(self, **kwargs):
+ super(BaseEnableURLInstFormSet, self).clean(**kwargs)
+ if any(self.errors):
+ return
+ deleted_forms = 0
+ languages = []
+ for i in range(0, self.total_form_count()):
+ form = self.forms[i]
+ if (not form.cleaned_data) or form.cleaned_data['DELETE']:
+ deleted_forms += 1
+ continue
+ # Check for repeated languages
+ if form.cleaned_data['language'] in languages:
+ raise ValidationError("Duplicated language found.")
+ if form.cleaned_data['language'] != "Other":
+ languages.append(form.cleaned_data['language'])
+ # Check if all forms are deleted
+ if (not self.enableurl_delete) and (self.total_form_count() == 0 or
+ self.total_form_count() == deleted_forms):
+ raise ValidationError("At least one instruction should be "
+ "specified.")
+
+
+class EnableURLForm(DynamicModelForm):
+ class Meta:
+ model = EnableURL
+ fields = ['url']
+
+ def save(self, commit=True):
+ m = super(EnableURLForm, self).save(commit=False)
+ self.inst_formset.save(commit=False)
+ if commit:
+ if self.cleaned_data and self.cleaned_data['DELETE']:
+ # delete enableurl and instruction forms
+ if self.instance.pk:
+ for form in self.inst_formset:
+ if form.instance.pk:
+ form.instance.delete()
+ self.instance.delete()
+ return None
+ m = super(EnableURLForm, self).save()
+ for form in self.inst_formset:
+ form.instance.enableurl = m
+ self.inst_formset.save()
+ return m
+
+
+class BaseEnableURLFormSet(DynamicBaseModelFormSet):
+ model = EnableURL
+
+ def __init__(self, *args, **kwargs):
+ kwargs['prefix'] = 'enableurl'
+ # get request.META to add HTTP_ACCEPT_LANGUAGE on top of languages
+ # choices
+ meta = kwargs.pop('meta', {})
+ accept_lang = meta.get('HTTP_ACCEPT_LANGUAGE', '')
+ super(BaseEnableURLFormSet, self).__init__(*args, **kwargs)
+ # construct instructions formsets
+ self.EnableURLInstFormSet = modelformset_factory(EnableURLInst,
+ extra=self.extra, form=EnableURLInstForm,
+ formset=BaseEnableURLInstFormSet)
+ self.EnableURLInstFormSet.form = staticmethod(curry(EnableURLInstForm,
+ http_accept_language=accept_lang))
+ for index, form in enumerate(self.forms):
+ prefix = 'inst_%s' % index
+ data = self.data or None
+ delete = False
+ if self.queryset:
+ qs = EnableURLInst.objects.filter(enableurl=form.instance)
+ else:
+ qs = EnableURLInst.objects.none()
+ # if form is deleted, delete the instructions
+ if (self.data and
+ self.data.get(form.prefix + '-DELETE', '') == "True"):
+ delete = True
+ form.inst_formset = self.EnableURLInstFormSet(data=data,
+ prefix=prefix,
+ delete=delete,
+ queryset=qs)
+ # create a empty_inst_form
+ if self.forms:
+ self.empty_inst_form = self.forms[0].inst_formset.empty_form
+ else:
+ formset = self.EnableURLInstFormSet()
+ self.empty_inst_form = formset.empty_form
+
+ def clean(self, **kwargs):
+ super(BaseEnableURLFormSet, self).clean(**kwargs)
+ if any(self.errors):
+ return
+ urls = []
+ for i in range(0, self.total_form_count()):
+ form = self.forms[i]
+ if (not form.cleaned_data) or form.cleaned_data['DELETE']:
+ continue
+ # Check for repeated urls
+ if form.cleaned_data['url'] in urls:
+ raise ValidationError("Duplicated URL found.")
+ urls.append(form.cleaned_data['url'])
+
+ def is_valid(self, *args, **kwargs):
+ result = super(BaseEnableURLFormSet, self).is_valid(*args, **kwargs)
+ for form in self.forms:
+ result = result and form.inst_formset.is_valid()
+ return result
+
+ def delete(self):
+ for form in self.forms:
+ for inst in form.inst_formset:
+ inst.instance.delete()
+ form.instance.delete()
+
+
+class DomainForm(DynamicModelForm):
+ class Meta:
+ fields = ('name',)
+
+ def __init__(self, *args, **kwargs):
+ if not self._meta.model:
+ self._meta.model = DomainRequest
+ super(DomainForm, self).__init__(*args, **kwargs)
+ self.fields['name'].required = False
+
+ def clean(self, *args, **kwargs):
+ cleaned_data = super(DomainForm, self).clean(*args, **kwargs)
+ # if it is going to be deleted, dont need to to check it
+ if cleaned_data["DELETE"]:
+ return cleaned_data
+ if not 'name' in cleaned_data:
+ return cleaned_data
+ data = cleaned_data["name"]
+ # check if domain name is valid
+ regex = re.compile(r"(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+"
+ "(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)",
+ re.IGNORECASE)
+ if not regex.match(data):
+ # Trivial case failed. Try for possible IDN domain
+ msg = ("Domain name is not valid")
+ try:
+ if not regex.match(data.encode('idna')): # IDN -> ACE
+ raise ValidationError(msg)
+ except UnicodeError: # invalid domain
+ raise ValidationError(msg)
+ return cleaned_data
+
+ def validate_unique(self):
+ if not (hasattr(self.instance, 'config') and
+ self.instance.config and
+ self.instance.config.status == 'suggested'):
+ dom = Domain.objects.filter(name=self.instance.name,
+ config__status='approved')
+ if dom and ((not 'name' in self.initial) or (dom[0].name !=
+ self.initial['name'])):
+ msg = (u"Domain configuration already exists "
+ "<a href=\"%s\">here</a>." %
+ reverse("ispdb_details", args=[dom[0].config.id]))
+ self._update_errors({'name': [mark_safe(msg)]})
+
+ def save(self, commit=True):
+ super(DomainForm, self).save(commit=False)
+ # search for existing instances
+ if not self.instance.pk:
+ # if it is a DomainRequest, we want those with no config attached
+ if self._meta.model == DomainRequest:
+ exists = DomainRequest.objects.filter(name=self.instance.name,
+ config=None)
+ if exists:
+ exists[0].config = self.instance.config
+ self.instance = exists[0]
+ # if it is a Domain, we want those which are not approved
+ else:
+ exists = Domain.objects.filter(name=self.instance.name)
+ if exists and exists[0].config != "approved":
+ exists[0].config = self.instance.config
+ self.instance = exists[0]
+ if commit:
+ # delete if exists
+ if self.cleaned_data and self.cleaned_data['DELETE']:
+ if self.instance.pk:
+ return self.instance.delete()
+ else:
+ return None
+ super(DomainForm, self).save()
+ return self.instance
+
+
+class BaseDomainFormSet(DynamicBaseModelFormSet):
+ model = DomainRequest
+
+ def __init__(self, *args, **kwargs):
+ kwargs['prefix'] = 'domain'
+ super(BaseDomainFormSet, self).__init__(*args, **kwargs)
+
+ def clean(self):
+ """Checks that at least one domain is not deleted."""
+ if any(self.errors):
+ return
+ # Get number of deleted forms
+ deleted_forms = 0
+ names = []
+ for i in range(0, self.total_form_count()):
+ form = self.forms[i]
+ if (not form.cleaned_data) or form.cleaned_data['DELETE']:
+ deleted_forms += 1
+ continue
+ # Check for repeated domain names
+ if form.cleaned_data['name'] in names:
+ raise ValidationError("Duplicated domain name found.")
+ names.append(form.cleaned_data['name'])
+ # Check if all forms are deleted
+ if (self.total_form_count() == 0 or
+ self.total_form_count() == deleted_forms):
+ raise ValidationError("At least one domain should be specified.")
+ # Check if number of non deleted forms is greater then max_num
+ if (self.total_form_count() - deleted_forms) > self.max_num:
+ raise ValidationError("Number of domains exceeded.")
+
+
+class ConfigForm(ModelForm):
+ class Meta:
+ model = Config
+ exclude = ['status',
+ 'last_status',
+ 'deleted_datetime',
+ 'email_provider_id',
+ 'outgoing_add_this_server',
+ 'outgoing_use_global_preferred_server',
+ 'owner',
+ 'locked',
+ ]
+ incoming_type = ChoiceField(widget=RadioSelect,
+ choices=Config.INCOMING_TYPE_CHOICES)
+
+ def __init__(self, *args, **kwargs):
+ self.domain_formset = kwargs.pop('domain_formset', None)
+ self.docurl_formset = kwargs.pop('docurl_formset', None)
+ self.enableurl_formset = kwargs.pop('enableurl_formset', None)
+ if not self.domain_formset:
+ raise(Exception('Domain formset required.'))
+ if not self.docurl_formset:
+ raise(Exception('DocURL formset required.'))
+ if not self.enableurl_formset:
+ raise(Exception('EnableURL formset required.'))
+ super(ConfigForm, self).__init__(*args, **kwargs)
+
+ def clean_incoming_port(self):
+ return clean_port(self, "incoming_port")
+
+ def clean_outgoing_port(self):
+ return clean_port(self, "outgoing_port")
+
+ def is_valid_all(self, *args, **kwargs):
+ return (self.is_valid(*args, **kwargs) and
+ self.domain_formset.is_valid(*args, **kwargs) and
+ self.docurl_formset.is_valid(*args, **kwargs) and
+ self.enableurl_formset.is_valid(*args, **kwargs))
+
+ def save_all(self, *args, **kwargs):
+ super(ConfigForm, self).save(*args, **kwargs)
+ # Save domain formset
+ self.domain_formset.save(commit=False)
+ for form in self.domain_formset:
+ form.instance.config = self.instance
+ self.domain_formset.save()
+ # Save DocURL formset
+ self.docurl_formset.save(commit=False)
+ for form in self.docurl_formset:
+ form.instance.config = self.instance
+ self.docurl_formset.save()
+ # Save EnableURL formset
+ self.enableurl_formset.save(commit=False)
+ for form in self.enableurl_formset:
+ form.instance.config = self.instance
+ self.enableurl_formset.save()
View
78 ispdb/config/models.py
@@ -5,12 +5,13 @@
from django.conf import settings
import ispdb.audit as audit
+
class Domain(models.Model):
name = models.CharField(max_length=100, unique=True,
verbose_name="Email domain",
help_text="(e.g. \"gmail.com\")")
config = models.ForeignKey('Config', related_name="domains",
- blank=True) # blank is for requests and rejects
+ blank=True) # blank is for requests and rejects
@staticmethod
def create_from_domainrequest(domainrequest):
@@ -21,8 +22,12 @@ def create_from_domainrequest(domainrequest):
d.config = domainrequest.config
return d
- def __str__(self): return str(self.name)
- def __unicode__(self): return self.name
+ def __str__(self):
+ return str(self.name)
+
+ def __unicode__(self):
+ return self.name
+
class DomainRequest(models.Model):
name = models.CharField(max_length=100, verbose_name="Email domain",
@@ -31,8 +36,12 @@ class DomainRequest(models.Model):
blank=True, null=True)
votes = models.IntegerField(default=1)
- def __str__(self): return str(self.name)
- def __unicode__(self): return self.name
+ def __str__(self):
+ return str(self.name)
+
+ def __unicode__(self):
+ return self.name
+
class Config(models.Model):
"""
@@ -65,8 +74,8 @@ def __unicode__(self):
("invalid", "invalid"),
("deleted", "deleted"),
]
- status = models.CharField(max_length=20, choices = CONFIG_CHOICES)
- last_status = models.CharField(max_length=20, choices = CONFIG_CHOICES,
+ status = models.CharField(max_length=20, choices=CONFIG_CHOICES)
+ last_status = models.CharField(max_length=20, choices=CONFIG_CHOICES,
blank=True)
email_provider_id = models.CharField(max_length=50, blank=True)
display_name = models.CharField(
@@ -102,16 +111,16 @@ def __unicode__(self):
("GSSAPI", "GSSAPI"),
)
incoming_authentication = models.CharField(
- max_length = 20,
- choices = INCOMING_AUTHENTICATION_CHOICES,
- default = "password-encrypted",
- help_text = """<strong>Unencrypted Password</strong>: Send password
- unencrypted in the clear. Dangerous, if SSL isn't used
- either. PLAIN or LOGIN etc…<br/>
-
- <strong>Encrypted Password</strong>: Hashed password.
- Offers minimal protection for passwords. CRAM-MD5 or
- DIGEST-MD5. Not NTLM.<br/>"""
+ max_length=20,
+ choices=INCOMING_AUTHENTICATION_CHOICES,
+ default="password-encrypted",
+ help_text="""<strong>Unencrypted Password</strong>: Send password
+ unencrypted in the clear. Dangerous, if SSL isn't used
+ either. PLAIN or LOGIN etc…<br/>
+
+ <strong>Encrypted Password</strong>: Hashed password.
+ Offers minimal protection for passwords. CRAM-MD5 or
+ DIGEST-MD5. Not NTLM.<br/>"""
)
outgoing_hostname = models.CharField(max_length=100)
outgoing_port = models.PositiveIntegerField()
@@ -134,16 +143,16 @@ def __unicode__(self):
("GSSAPI", "GSSAPI"),
)
outgoing_authentication = models.CharField(
- max_length = 20,
- choices = OUTGOING_AUTHENTICATION_CHOICES,
- default = "password-encrypted",
- help_text = """<strong>Unencrypted Password</strong>: Send password
- unencrypted in the clear. Dangerous, if SSL isn't used
- either. PLAIN or LOGIN etc…<br/>
-
- <strong>Encrypted Password</strong>: Hashed password.
- Offers minimal protection for passwords. CRAM-MD5 or
- DIGEST-MD5. Not NTLM.<br/>"""
+ max_length=20,
+ choices=OUTGOING_AUTHENTICATION_CHOICES,
+ default="password-encrypted",
+ help_text="""<strong>Unencrypted Password</strong>: Send password
+ unencrypted in the clear. Dangerous, if SSL isn't used
+ either. PLAIN or LOGIN etc…<br/>
+
+ <strong>Encrypted Password</strong>: Hashed password.
+ Offers minimal protection for passwords. CRAM-MD5 or
+ DIGEST-MD5. Not NTLM.<br/>"""
)
outgoing_add_this_server = models.BooleanField(
verbose_name="Add this server to list???")
@@ -154,6 +163,7 @@ def __unicode__(self):
history = audit.AuditTrail()
+
class Issue(models.Model):
title = models.CharField(max_length=50)
description = models.TextField(max_length=3000)
@@ -178,8 +188,11 @@ class CommonConfigURL(models.Model):
class Meta:
abstract = True
- def __str__(self): return str(self.url)
- def __unicode__(self): return self.url
+ def __str__(self):
+ return str(self.url)
+
+ def __unicode__(self):
+ return self.url
class CommonURLDesc(models.Model):
@@ -194,8 +207,11 @@ class CommonURLDesc(models.Model):
class Meta:
abstract = True
- def __str__(self): return str(self.description)
- def __unicode__(self): return self.description
+ def __str__(self):
+ return str(self.description)
+
+ def __unicode__(self):
+ return self.description
class DocURL(CommonConfigURL):
View
161 ispdb/config/serializers.py
@@ -7,9 +7,11 @@
# The serializers in reverse order, newest at the top.
+
def xmlOneDotOne(data):
"""
- Return the configuration using the XML document that Thunderbird is expecting.
+ Return the configuration using the XML document that Thunderbird is
+ expecting.
"""
desiredOutput = {
"display_name": "displayName",
@@ -33,42 +35,42 @@ def xmlOneDotOne(data):
qs = Domain.objects.filter(config=data) or (
DomainRequest.objects.filter(config=data))
for domain in qs:
- if not data.email_provider_id:
- data.email_provider_id = domain.name
- ET.SubElement(emailProvider, "domain").text = domain.name
+ if not data.email_provider_id:
+ data.email_provider_id = domain.name
+ ET.SubElement(emailProvider, "domain").text = domain.name
emailProvider.attrib["id"] = data.email_provider_id
for field in data._meta.fields:
- if field.name not in desiredOutput:
- continue
- if field.name.startswith("incoming"):
- if incoming is None:
- incoming = ET.SubElement(emailProvider, "incomingServer")
- incoming.attrib["type"] = data.incoming_type
- name = field.name
- currParent = incoming
- elif field.name.startswith("outgoing"):
- if outgoing is None:
- outgoing = ET.SubElement(emailProvider, "outgoingServer")
- outgoing.attrib["type"] = "smtp"
- name = field.name
- currParent = outgoing
- else:
- name = field.name
- currParent = emailProvider
- if name == "incoming_username_form" and data.incoming_username_form == "":
- data.incoming_username_form = data.outgoing_username_form
- name = desiredOutput[name]
- e = ET.SubElement(currParent, name)
- text = getattr(data, field.name)
-
- if type(text) is bool:
- # Force boolean values to use lowercase.
- text = unicode(text).lower()
- else:
- # Force other values to be converted into unicode strings.
- text = unicode(text)
-
- e.text = text
+ if field.name not in desiredOutput:
+ continue
+ if field.name.startswith("incoming"):
+ if incoming is None:
+ incoming = ET.SubElement(emailProvider, "incomingServer")
+ incoming.attrib["type"] = data.incoming_type
+ name = field.name
+ currParent = incoming
+ elif field.name.startswith("outgoing"):
+ if outgoing is None:
+ outgoing = ET.SubElement(emailProvider, "outgoingServer")
+ outgoing.attrib["type"] = "smtp"
+ name = field.name
+ currParent = outgoing
+ else:
+ name = field.name
+ currParent = emailProvider
+ if (name == "incoming_username_form") and (
+ data.incoming_username_form == ""):
+ data.incoming_username_form = data.outgoing_username_form
+ name = desiredOutput[name]
+ e = ET.SubElement(currParent, name)
+ text = getattr(data, field.name)
+
+ if type(text) is bool:
+ # Force boolean values to use lowercase.
+ text = unicode(text).lower()
+ else:
+ # Force other values to be converted into unicode strings.
+ text = unicode(text)
+ e.text = text
# EnableURL
for enableurl in data.enableurl_set.all():
enable = ET.SubElement(emailProvider, "enable")
@@ -91,9 +93,11 @@ def xmlOneDotOne(data):
xml.write(retval, encoding="UTF-8", xml_declaration=True)
return retval.getvalue()
+
def xmlOneDotZero(data):
"""
- Return the configuration using the XML document that Thunderbird is expecting.
+ Return the configuration using the XML document that Thunderbird is
+ expecting.
"""
desiredOutput = {
"display_name": "displayName",
@@ -123,46 +127,47 @@ def xmlOneDotZero(data):
config.attrib["version"] = "1.0"
emailProvider = ET.SubElement(config, "emailProvider")
for domain in Domain.objects.filter(config=data):
- ET.SubElement(emailProvider, "domain").text = domain.name
- if not data.email_provider_id:
- data.email_provider_id = domain.name
+ ET.SubElement(emailProvider, "domain").text = domain.name
+ if not data.email_provider_id:
+ data.email_provider_id = domain.name
emailProvider.attrib["id"] = data.email_provider_id
for field in data._meta.fields:
- if field.name not in desiredOutput:
- continue
- if field.name.startswith("incoming"):
- if incoming is None:
- incoming = ET.SubElement(emailProvider, "incomingServer")
- incoming.attrib["type"] = data.incoming_type
- name = field.name
- currParent = incoming
- elif field.name.startswith("outgoing"):
- if outgoing is None:
- outgoing = ET.SubElement(emailProvider, "outgoingServer")
- outgoing.attrib["type"] = "smtp"
- name = field.name
- currParent = outgoing
- else:
- name = field.name
- currParent = emailProvider
- if name == "incoming_username_form" and data.incoming_username_form == "":
- data.incoming_username_form = data.outgoing_username_form
- name = desiredOutput[name]
- e = ET.SubElement(currParent, name)
- text = getattr(data, field.name)
-
- if type(text) is bool:
- # Force boolean values to use lowercase.
- text = unicode(text).lower()
- else:
- # Force other values to be converted into unicode strings.
- text = unicode(text)
-
- # Fix up the data to make it 1.0 compliant.
- if name == "authentication":
- text = authentication_values.get(text, text)
-
- e.text = text
+ if field.name not in desiredOutput:
+ continue
+ if field.name.startswith("incoming"):
+ if incoming is None:
+ incoming = ET.SubElement(emailProvider, "incomingServer")
+ incoming.attrib["type"] = data.incoming_type
+ name = field.name
+ currParent = incoming
+ elif field.name.startswith("outgoing"):
+ if outgoing is None:
+ outgoing = ET.SubElement(emailProvider, "outgoingServer")
+ outgoing.attrib["type"] = "smtp"
+ name = field.name
+ currParent = outgoing
+ else:
+ name = field.name
+ currParent = emailProvider
+ if (name == "incoming_username_form") and (
+ data.incoming_username_form == ""):
+ data.incoming_username_form = data.outgoing_username_form
+ name = desiredOutput[name]
+ e = ET.SubElement(currParent, name)
+ text = getattr(data, field.name)
+
+ if type(text) is bool:
+ # Force boolean values to use lowercase.
+ text = unicode(text).lower()
+ else:
+ # Force other values to be converted into unicode strings.
+ text = unicode(text)
+
+ # Fix up the data to make it 1.0 compliant.
+ if name == "authentication":
+ text = authentication_values.get(text, text)
+
+ e.text = text
retval = StringIO("w")
xml = ET.ElementTree(config)
@@ -174,13 +179,13 @@ def xmlOneDotZero(data):
# that breaks old clients, which are hopefully exceptional cases.
# In other words, this does *not* change with every TB major release.
_serializers = {
- "1.0" : xmlOneDotZero,
- "1.1" : xmlOneDotOne,
+ "1.0": xmlOneDotZero,
+ "1.1": xmlOneDotOne,
}
+
def get(version):
# If there is no version requested, return the most recent version.
if version == None:
- return xmlOneDotZero
+ return xmlOneDotZero
return _serializers.get(version, None)
-
View
7 ispdb/config/templatetags/custom_filters.py
@@ -4,6 +4,7 @@
register = template.Library()
+
@register.filter
def data_verbose(boundField, attr_name="value"):
"""
@@ -18,10 +19,11 @@ def data_verbose(boundField, attr_name="value"):
data = boundField[attr_name]
field = boundField
rv = data
- if field.has_key("choices") and dict(field['choices']).get(data, ''):
+ if 'choices' in field and dict(field['choices']).get(data, ''):
rv = dict(field["choices"]).get(data, "")
return rv
+
@register.filter
def data_verbose_field(boundField, attr_name="value"):
"""
@@ -34,9 +36,10 @@ def data_verbose_field(boundField, attr_name="value"):
"""
data = boundField.value()
field = boundField.field
- return hasattr(field, 'choices') and dict(field.choices).get(data,'') or \
+ return hasattr(field, 'choices') and dict(field.choices).get(data, '') or \
data
+
@register.filter
def is_undo_available(self):
delta = timezone.now() - self.deleted_datetime
View
60 ispdb/config/views.py
@@ -32,12 +32,14 @@
from ispdb.config.configChecks import do_domain_checks
from ispdb.config.configChecks import do_config_checks
+
@login_required
def comment_post_wrapper(request):
if not request.method == 'POST':
return HttpResponseRedirect('/')
return post_comment(request)
+
@permission_required("comments.can_moderate")
def delete_comment(request, id):
comment = get_object_or_404(comments.get_model(), pk=id,
@@ -47,6 +49,7 @@ def delete_comment(request, id):
redir = request.META.get('HTTP_REFERER', None) or '/'
return HttpResponseRedirect(redir)
+
def login(request):
redirect_url = '/'
if 'next' in request.GET:
@@ -54,15 +57,18 @@ def login(request):
return render_to_response("config/login.html", {'redir_url': redirect_url},
context_instance=RequestContext(request))
+
def logout_view(request):
logout(request)
return HttpResponseRedirect('/')
+
def intro(request):
domains = Domain.objects.all()
return render_to_response("config/intro.html", {'domains': domains},
context_instance=RequestContext(request))
+
def list(request, format="html"):
if format == "xml":
providers = ET.Element("providers")
@@ -70,8 +76,8 @@ def list(request, format="html"):
for config in configs:
provider = ET.SubElement(providers, "provider")
ET.SubElement(provider, "id").text = unicode(config.id)
- ET.SubElement(provider, "export").text = reverse("ispdb_export_xml",
- kwargs={"id": config.id})
+ ET.SubElement(provider, "export").text = reverse(
+ "ispdb_export_xml", kwargs={"id": config.id})
ET.SubElement(provider, "lastUpdated").text = unicode(
config.last_update_datetime)
xml = ET.ElementTree(providers)
@@ -85,6 +91,7 @@ def list(request, format="html"):
return render_to_response("config/list.html", {'configs': configs},
context_instance=RequestContext(request))
+
def details(request, id, error=None):
config = get_object_or_404(Config, pk=id)
other_fields = []
@@ -124,6 +131,7 @@ def details(request, id, error=None):
'enableurls': enableurl_formset},
context_instance=RequestContext(request))
+
def export_xml(request, version=None, id=None, domain=None):
config = None
if id is not None:
@@ -136,6 +144,7 @@ def export_xml(request, version=None, id=None, domain=None):
data = serialize(config)
return HttpResponse(data, mimetype='text/xml')
+
@login_required
def edit(request, config_id):
config = get_object_or_404(Config, pk=config_id)
@@ -205,7 +214,6 @@ def edit(request, config_id):
}, context_instance=RequestContext(request))
-
@login_required
def add(request, domain=None):
DomainFormSet = modelformset_factory(DomainRequest, extra=1, max_num=10,
@@ -217,7 +225,7 @@ def add(request, domain=None):
action = 'add'
has_errors = False
- if request.method == 'POST': # If the form has been submitted...
+ if request.method == 'POST': # If the form has been submitted...
data = request.POST
docurl_formset = DocURLFormSet(request.POST, request.FILES,
queryset=DocURL.objects.none(), meta=request.META)
@@ -243,7 +251,7 @@ def add(request, domain=None):
domain.save()
else:
form.save()
- return HttpResponseRedirect('/') # Redirect after POST
+ return HttpResponseRedirect('/') # Redirect after POST
else:
config = Config(owner=request.user, status='requested')
# A form bound to the POST data
@@ -277,6 +285,7 @@ def add(request, domain=None):
'has_errors': has_errors
}, context_instance=RequestContext(request))
+
@permission_required("config.can_approve")
def queue(request):
domains = DomainRequest.objects.filter(config=None).order_by('-votes')
@@ -289,10 +298,12 @@ def queue(request):
'invalid_configs': invalid_configs,
}, context_instance=RequestContext(request))
+
def policy(request):
return render_to_response('config/policy.html', {},
context_instance=RequestContext(request))
+
@permission_required("config.can_approve")
def approve(request, id):
config = get_object_or_404(Config, pk=id)
@@ -302,7 +313,7 @@ def approve(request, id):
return details(request, config.id, error=error)
old_status = config.status
message = ''
- if request.method == 'POST': # If the form has been submitted...
+ if request.method == 'POST': # If the form has been submitted...
data = request.POST
if data.get('approved', False):
# check if domains and domains requests are null
@@ -340,7 +351,7 @@ def approve(request, id):
message = data['comment']
config.status = 'invalid'
else:
- raise ValueError, "shouldn't get here"
+ raise ValueError("shouldn't get here")
config.save()
comment = Comment(user_name='ISPDB System',
site_id=settings.SITE_ID)
@@ -352,7 +363,8 @@ def approve(request, id):
comment.object_pk = config.pk
comment.save()
- return HttpResponseRedirect('/details/' + id) # Redirect after POST
+ return HttpResponseRedirect('/details/' + id) # Redirect after POST
+
@permission_required("config.can_approve")
def sanity(request, id):
@@ -360,11 +372,12 @@ def sanity(request, id):
domains = config.domains.all() or config.domainrequests.all()
domain_errors, domain_warnings = do_domain_checks(domains)
config_errors, config_warnings = do_config_checks(config)
- data = simplejson.dumps({"errors" : domain_errors + config_errors,
- "warnings" : domain_warnings + config_warnings,
+ data = simplejson.dumps({"errors": domain_errors + config_errors,
+ "warnings": domain_warnings + config_warnings,
})
return HttpResponse(data, mimetype='application/json')
+
@login_required
def delete(request, id):
config = get_object_or_404(Config, pk=id)
@@ -378,16 +391,18 @@ def delete(request, id):
return details(request, id, error=error)
if request.method == 'POST':
data = request.POST
- if data.has_key('delete') and data['delete'] == "delete":
+ if 'delete' in data and data['delete'] == "delete":
if not config.status in ("invalid", "requested"):
- return HttpResponseRedirect(reverse('ispdb_details',args=[id]))
+ return HttpResponseRedirect(reverse('ispdb_details',
+ args=[id]))
config.last_status = config.status
config.deleted_datetime = timezone.now()
config.status = 'deleted'
config.save()
- elif data.has_key('delete') and data['delete'] == "undo":
+ elif 'delete' in data and data['delete'] == "undo":
if not config.status == 'deleted':
- return HttpResponseRedirect(reverse('ispdb_details',args=[id]))
+ return HttpResponseRedirect(reverse('ispdb_details',
+ args=[id]))
delta = timezone.now() - config.deleted_datetime
if not delta.days > 0:
config.status = config.last_status
@@ -395,6 +410,7 @@ def delete(request, id):
config.save()
return HttpResponseRedirect(reverse('ispdb_details', args=[id]))
+
@login_required
def report(request, id):
config = get_object_or_404(Config, pk=id, status='approved')
@@ -507,6 +523,7 @@ def report(request, id):
'has_errors': has_errors,
}, context_instance=RequestContext(request))
+
def show_issue(request, id):
issue = get_object_or_404(Issue, pk=id)
incoming = []
@@ -576,8 +593,8 @@ def show_issue(request, id):
if not issue.config.status == "approved":
error = "Can't merge into non approved configurations."
if issue.config.locked:
- error = ("This configuration is locked. Only admins can unlock "
- "it.")
+ error = ("This configuration is locked. Only admins can unlock"
+ " it.")
elif issue.updated_config:
# check if any of the added domains are already bound to some
# approved configuration
@@ -586,9 +603,9 @@ def show_issue(request, id):
if Domain.objects.filter(name=domain,
config__status="approved"):
error = """Can't approve this configuration.
- Domain %s is already used by another approved
- configuration.""" % (domain)
- break;
+ Domain %s is already used by another
+ approved configuration.""" % (domain)
+ break
if not error:
# update domains
for domain in removed_domains:
@@ -606,8 +623,9 @@ def show_issue(request, id):
# update config fields
all_fields = base + incoming + outgoing
for field in all_fields:
- if field.has_key('new_value'):
- setattr(issue.config, field['name'], field['new_value'])
+ if 'new_value' in field:
+ setattr(issue.config, field['name'],
+ field['new_value'])
# update docurls
docurl_formset.delete()
for docurl in new_docurl_formset:
View
124 ispdb/convert.py
@@ -1,7 +1,9 @@
-import os, sys
+import os
+import sys
+import lxml.etree as ET
from ispdb.config.models import Domain, Config
-import lxml.etree as ET
+
def get_username(element):
rv = element.find('username')
@@ -13,6 +15,7 @@ def get_username(element):
rv = rv.text
return rv
+
def get_authentication(element):
rv = element.find('authentication').text
if (rv == "plain"):
@@ -21,68 +24,71 @@ def get_authentication(element):
rv = "password-encrypted"
return rv
+
def main():
- datadir = os.environ.get("AUTOCONFIG_DATA", "../autoconfig_data")
- for fname in os.listdir(datadir):
- fullpath = os.path.join(datadir, fname)
- if fname.startswith('.') or os.path.isdir(fullpath) or fname == "README":
- continue
- print "PROCESSING", fname
- et = ET.parse(fullpath)
- root = et.getroot()
- #print root
- incoming = root.find('.//incomingServer')
- outgoing = root.find('.//outgoingServer')
- id = root.find(".//emailProvider").attrib['id']
- # if we have one for this id, skip it
- if Config.objects.filter(email_provider_id=id):
- continue
+ datadir = os.environ.get("AUTOCONFIG_DATA", "../autoconfig_data")
+ for fname in os.listdir(datadir):
+ fullpath = os.path.join(datadir, fname)
+ if fname.startswith('.') or os.path.isdir(fullpath) or (fname ==
+ "README"):
+ continue
+ print "PROCESSING", fname
+ et = ET.parse(fullpath)
+ root = et.getroot()
+ #print root
+ incoming = root.find('.//incomingServer')
+ outgoing = root.find('.//outgoingServer')
+ id = root.find(".//emailProvider").attrib['id']
+ # if we have one for this id, skip it
+ if Config.objects.filter(email_provider_id=id):
+ continue
+
+ # Handle the older forms of XML.
+ incoming_username_form = get_username(incoming)
+ outgoing_username_form = get_username(outgoing)
+ incoming_authentication = get_authentication(incoming)
+ outgoing_authentication = get_authentication(outgoing)
- # Handle the older forms of XML.
- incoming_username_form = get_username(incoming)
- outgoing_username_form = get_username(outgoing)
- incoming_authentication = get_authentication(incoming)
- outgoing_authentication = get_authentication(outgoing)
-
- if not incoming_authentication:
- continue
-
- try:
- addThisServer = outgoing.find('addThisServer').text == "true"
- except:
- addThisServer = False
+ if not incoming_authentication:
+ continue
- try:
- useGlobalPreferredServer = outgoing.find('useGlobalPreferredServer').text == "true"
- except:
- useGlobalPreferredServer = False
+ try:
+ addThisServer = outgoing.find('addThisServer').text == "true"
+ except:
+ addThisServer = False
- c = Config(id=None,
- email_provider_id = id,
- display_name = root.find('.//displayName').text,
- display_short_name = root.find('.//displayShortName').text,
- incoming_type = incoming.attrib['type'],
- incoming_hostname = incoming.find('hostname').text,
- incoming_port = int(incoming.find('port').text),
- incoming_socket_type = incoming.find('socketType').text,
- incoming_authentication = incoming_authentication,
- incoming_username_form = incoming_username_form,
+ try:
+ useGlobalPreferredServer = \
+ outgoing.find('useGlobalPreferredServer').text == "true"
+ except:
+ useGlobalPreferredServer = False
- outgoing_hostname = outgoing.find('hostname').text,
- outgoing_port = int(outgoing.find('port').text),
- outgoing_socket_type = outgoing.find('socketType').text,
- outgoing_username_form = outgoing_username_form,
- outgoing_authentication = outgoing_authentication,
- outgoing_add_this_server = addThisServer,
- outgoing_use_global_preferred_server = useGlobalPreferredServer,
+ c = Config(id=None,
+ email_provider_id=id,
+ display_name=root.find('.//displayName').text,
+ display_short_name=root.find('.//displayShortName').text,
+ incoming_type=incoming.attrib['type'],
+ incoming_hostname=incoming.find('hostname').text,
+ incoming_port=int(incoming.find('port').text),
+ incoming_socket_type=incoming.find('socketType').text,
+ incoming_authentication=incoming_authentication,
+ incoming_username_form=incoming_username_form,
+ outgoing_hostname=outgoing.find('hostname').text,
+ outgoing_port=int(outgoing.find('port').text),
+ outgoing_socket_type=outgoing.find('socketType').text,
+ outgoing_username_form=outgoing_username_form,
+ outgoing_authentication=outgoing_authentication,
+ outgoing_add_this_server=addThisServer,
+ outgoing_use_global_preferred_server=(
+ useGlobalPreferredServer),
+ status='approved',
+ )
+ domains = root.findall('.//domain')
+ c.save()
+ ds = [Domain(id=None, name=d.text, config=c) for d in domains]
+ for d in ds:
+ d.save()
- status = 'approved',
- )
- domains = root.findall('.//domain')
- c.save()
- ds = [Domain(id=None, name=domain.text, config=c) for domain in domains]
- for d in ds:
- d.save()
if __name__ == "__main__":
- main()
+ main()
View
9 ispdb/tests/test_add.py
@@ -9,6 +9,7 @@
from ispdb.tests.common import adding_domain_form, asking_domain_form
from ispdb.tests.common import success_code, fail_code
+
class AddTest(TestCase):
fixtures = ['login_testdata.json']
@@ -150,7 +151,7 @@ def test_add_lots_of_domains(self):
assert_equal(res.status_code, success_code)
for i in range(num_domains):
assert_true(models.DomainRequest.objects.filter(
- name="test%d.com"%i))
+ name="test%d.com" % i))
# Tests correct handling of a leading zero in form-TOTAL_FORMS
def test_add_09_domains(self):
@@ -164,7 +165,7 @@ def test_add_09_domains(self):
assert_equal(res.status_code, success_code)
for i in range(num_domains):
assert_true(models.DomainRequest.objects.filter(
- name="test%d.com"%i))
+ name="test%d.com" % i))
def test_add_aaa_domains(self):
self.client.login(username='test', password='test')
@@ -190,7 +191,7 @@ def test_add_bad_number_domains(self):
assert_equal(res.status_code, fail_code)
for i in range(num_domains):
assert_false(models.DomainRequest.objects.filter(
- name="test%d.com"%i))
+ name="test%d.com" % i))
def test_add_invalid_domains(self):
self.client.login(username='test', password='test')
@@ -198,7 +199,7 @@ def test_add_invalid_domains(self):
'test',
'test_test.com',
'test.c',
- 100*'test'+'.com',
+ 100 * 'test' + '.com',
'127.0.0.1',
'example.',
'com.',
View
1  ispdb/tests/test_approve.py
@@ -7,6 +7,7 @@
from urllib import quote_plus
from nose.tools import *
+
class ApproveTest(TestCase):
fixtures = ['xml_testdata', 'login_testdata']
View
17 ispdb/tests/test_delete.py
@@ -13,6 +13,7 @@
# Return with form errors if form is invalid
fail_code = httplib.OK
+
class DeleteTest(TestCase):
fixtures = ['login_testdata', 'domain_testdata']
@@ -21,14 +22,14 @@ def delete_domain(self, id):
config = models.Config.objects.get(pk=id)
assert_true(config.status != 'deleted')
res = self.client.post(reverse("ispdb_delete", args=[id]),
- {'delete':'delete'})
+ {'delete': 'delete'})
assert_equal(res.status_code, success_code)
self.client.logout()
def test_delete_no_superuser_domainrequest(self):
self.client.login(username='test', password='test')
res = self.client.post(reverse("ispdb_delete", args=[1]),
- {'delete':'delete'})
+ {'delete': 'delete'})
assert_equal(res.status_code, success_code)
config = models.Config.objects.get(pk=1)
assert_equal(config.status, 'deleted')
@@ -37,7 +38,7 @@ def test_delete_no_superuser_domainrequest(self):
def test_delete_no_superuser_domain(self):
self.client.login(username='test', password='test')
res = self.client.post(reverse("ispdb_delete", args=[2]),
- {'delete':'delete'}, follow=True)
+ {'delete': 'delete'}, follow=True)
# Make sure it redirects to login page
goodRedirect = "/login/"
self.assertRedirects(res, goodRedirect)
@@ -50,7 +51,7 @@ def test_delete_locked_config(self):
config = models.Config.objects.get(pk=1)
assert_true(config.status != 'deleted')
res = self.client.post(reverse("ispdb_delete", args=[1]),
- {'delete':'delete'})
+ {'delete': 'delete'})
self.client.logout()
config = models.Config.objects.get(pk=1)
assert_true(config.status != 'deleted')
@@ -78,7 +79,7 @@ def test_undo_invalid_domain_normal_user(self):
assert_equal(config.status, 'deleted')
self.client.login(username='test', password='test')
res = self.client.post(reverse("ispdb_delete", args=[4]),
- {'delete':'undo'},
+ {'delete': 'undo'},
follow=True)
# Make sure it redirects to login page
goodRedirect = "/login/"
@@ -91,7 +92,7 @@ def test_undo_request_normal_user(self):
assert_equal(config.status, 'deleted')
self.client.login(username='test', password='test')
res = self.client.post(reverse("ispdb_delete", args=[1]),
- {'delete':'undo'},
+ {'delete': 'undo'},
follow=True)
config = models.Config.objects.get(pk=1)
assert_equal(config.status, 'requested')
@@ -102,7 +103,7 @@ def test_undo_invalid_request_superuser(self):
assert_equal(config.status, 'deleted')
self.client.login(username='test_admin', password='test')
res = self.client.post(reverse("ispdb_delete", args=[3]),
- {'delete':'undo'},
+ {'delete': 'undo'},
follow=True)
config = models.Config.objects.get(pk=3)
assert_equal(config.status, 'invalid')
@@ -116,7 +117,7 @@ def test_undo_old_config(self):
config.save()
self.client.login(username='test_admin', password='test')
res = self.client.post(reverse("ispdb_delete", args=[4]),
- {'delete':'undo'},
+ {'delete': 'undo'},
follow=True)
config = models.Config.objects.get(pk=4)
assert_equal(config.status, 'deleted')
View
15 ispdb/tests/test_edit.py
@@ -8,6 +8,7 @@
from ispdb.tests.common import adding_domain_form
from ispdb.tests.common import success_code, fail_code
+
class EditTest(TestCase):
fixtures = ['login_testdata.json']
@@ -25,7 +26,7 @@ def add_domain(self, name='test.com'):
def test_edit_no_login(self):
self.add_domain()
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
adding_domain_form(),
follow=True)
# Make sure it redirects to login page
@@ -35,7 +36,7 @@ def test_edit_no_login(self):
def test_edit_invalid_user(self):
self.add_domain()
self.client.login(username='test2', password='test')
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
adding_domain_form(),
follow=True)
# Make sure it redirects to login page
@@ -50,7 +51,7 @@ def test_edit_locked_config(self):
form = adding_domain_form()
form["display_name"] = "testing2"
self.client.login(username='test', password='test')
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
form,
follow=True)
config = models.Config.objects.get(pk=1)
@@ -118,7 +119,7 @@ def test_edit_staff_user(self):
form["inst_0-INITIAL_FORMS"] = "1"
form["inst_0-0-id"] = "1"
form["inst_0-0-description"] = "test1"
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
form,
follow=True)
goodRedirect = "/details/1/"
@@ -144,7 +145,7 @@ def test_edit_duplicated_names(self):
form = adding_domain_form()
form["domain-1-name"] = "test.com"
form["domain-1-DELETE"] = "False"
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
form,
follow=True)
assert_equal(res.status_code, fail_code)
@@ -160,7 +161,7 @@ def test_add_existing_domain(self):
form = adding_domain_form()
form["domain-1-name"] = "approved.com"
form["domain-1-DELETE"] = "False"
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
form,
follow=True)
assert_equal(res.status_code, fail_code)
@@ -176,7 +177,7 @@ def test_owner_edit_approved_domain(self):
self.client.login(username='test', password='test')
form = adding_domain_form()
form["incoming_hostname"] = "bar"
- res = self.client.post(reverse("ispdb_edit",args=[1]),
+ res = self.client.post(reverse("ispdb_edit", args=[1]),
form,
follow=True)
goodRedirect = "/login/"
View
23 ispdb/tests/test_issue.py
@@ -8,6 +8,7 @@
from ispdb.tests.common import adding_domain_form
from ispdb.tests.common import success_code, fail_code
+
def adding_issue_form():
form = adding_domain_form()
form['show_form'] = 'False'
@@ -50,7 +51,7 @@ def add_issue(self, updated_config=False, config_id=1):
self.client.logout()
def test_add_issue_no_login(self):
- res = self.client.post(reverse("ispdb_report",args=[2]),
+ res = self.client.post(reverse("ispdb_report", args=[2]),
adding_issue_form(),
follow=True)
# Make sure it redirects to login page
@@ -66,8 +67,8 @@ def test_add_issue_with_updated_config(self):
def test_close_issue_normal_user(self):
self.add_issue()
self.client.login(username='test', password='test')
- res = self.client.post(reverse("ispdb_show_issue",args=[1]),
- {"action":"close"},
+ res = self.client.post(reverse("ispdb_show_issue", args=[1]),
+ {"action": "close"},
follow=True)
issue = models.Issue.objects.get(title='Test')
assert_equal(issue.status, "open")
@@ -78,8 +79,8 @@ def test_close_issue_normal_user(self):
def test_close_issue_superuser(self):
self.add_issue()
self.client.login(username='test_admin', password='test')
- res = self.client.post(reverse("ispdb_show_issue",args=[1]),
- {"action":"close"},
+ res = self.client.post(reverse("ispdb_show_issue", args=[1]),
+ {"action": "close"},
follow=True)
issue = models.Issue.objects.get(title='Test')
assert_equal(issue.status, "closed")
@@ -87,8 +88,8 @@ def test_close_issue_superuser(self):
def test_merge_issue_normal_user(self):
self.add_issue(updated_config=True)
self.client.login(username='test', password='test')
- res = self.client.post(reverse("ispdb_show_issue",args=[1]),
- {"action":"merge"},
+ res = self.client.post(reverse("ispdb_show_issue", args=[1]),
+ {"action": "merge"},
follow=True)
issue = models.Issue.objects.get(title='Test')
assert_equal(issue.status, "open")
@@ -102,8 +103,8 @@ def test_merge_locked_config(self):
issue.config.locked = True
issue.config.save()
self.client.login(username='test_admin', password='test')
- res = self.client.post(reverse("ispdb_show_issue",args=[1]),
- {"action":"merge"},
+ res = self.client.post(reverse("ispdb_show_issue", args=[1]),
+ {"action": "merge"},
follow=True)
issue = models.Issue.objects.get(title='Test')
assert_equal(issue.status, "open")
@@ -114,8 +115,8 @@ def test_merge_locked_config(self):
def test_merge_issue_superuser(self):
self.add_issue(updated_config=True)
self.client.login(username='test_admin', password='test')
- res = self.client.post(reverse("ispdb_show_issue",args=[1]),
- {"action":"merge"},
+ res = self.client.post(reverse("ispdb_show_issue", args=[1]),
+ {"action": "merge"},
follow=True)
issue = models.Issue.objects.get(title='Test')
assert_equal(issue.status, "closed")
View
33 ispdb/tests/test_sanity.py
@@ -67,11 +67,13 @@ def tearDown(self):
def test_dns_methods(self):
# Set up our mock expectations.
name = dns.name.from_text('test.org.')
- message = dns.message.from_text(ns_message_text('test.org','test.org'))
+ message = dns.message.from_text(ns_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.NS,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'NS').AndReturn(answer)
- message = dns.message.from_text(mx_message_text('test.org','test.org'))
+ message = dns.message.from_text(mx_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.MX,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'MX').AndReturn(answer)
@@ -259,22 +261,26 @@ def test_sanity_view_no_errors(self):
# Set up our mock expectations.
# _do_domain_checks for test.org and test.com
name = dns.name.from_text('test.org.')
- message = dns.message.from_text(ns_message_text('test.org','test.org'))
+ message = dns.message.from_text(ns_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.NS,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'NS').AndReturn(answer)
name = dns.name.from_text('test.com.')
- message = dns.message.from_text(ns_message_text('test.com','test.org'))
+ message = dns.message.from_text(ns_message_text('test.com',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.NS,
dns.rdataclass.IN, message)
dns.resolver.query('test.com', 'NS').AndReturn(answer)
name = dns.name.from_text('test.org.')
- message = dns.message.from_text(mx_message_text('test.org','test.org'))
+ message = dns.message.from_text(mx_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.MX,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'MX').AndReturn(answer)
name = dns.name.from_text('test.com.')
- message = dns.message.from_text(mx_message_text('test.com','test.org'))
+ message = dns.message.from_text(mx_message_text('test.com',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.MX,
dns.rdataclass.IN, message)
dns.resolver.query('test.com', 'MX').AndReturn(answer)
@@ -313,12 +319,14 @@ def test_do_domain_checks_errors(self):
# with name servers of other domains. It will create a warning if we
# return different name servers
name = dns.name.from_text('test.org.')
- message = dns.message.from_text(ns_message_text('test.org','test.org'))
+ message = dns.message.from_text(ns_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.NS,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'NS').AndReturn(answer)
name = dns.name.from_text('test.com.')
- message = dns.message.from_text(ns_message_text('test.com','test.com'))
+ message = dns.message.from_text(ns_message_text('test.com',
+ 'test.com'))
answer = dns.resolver.Answer(name, dns.rdatatype.NS,
dns.rdataclass.IN, message)
dns.resolver.query('test.com', 'NS').AndReturn(answer)
@@ -327,17 +335,18 @@ def test_do_domain_checks_errors(self):
# and outgoing server. If they are different it will create an error
# for each server
name = dns.name.from_text('test.org.')
- message = dns.message.from_text(mx_message_text('test.org','test.org'))
+ message = dns.message.from_text(mx_message_text('test.org',
+ 'test.org'))
answer = dns.resolver.Answer(name, dns.rdatatype.MX,
dns.rdataclass.IN, message)
dns.resolver.query('test.org', 'MX').AndReturn(answer)
name = dns.name.from_text('test.com.')
- message = dns.message.from_text(mx_message_text('test.com','test.com'))
+ message = dns.message.from_text(mx_message_text('test.com',
+ 'test.com'))
answer = dns.resolver.Answer(name, dns.rdatatype.MX,
dns.rdataclass.IN, message)
dns.resolver.query('test.com', 'MX').AndReturn(answer)
-
self.mox.ReplayAll()
#Test methods
@@ -487,5 +496,3 @@ def test_do_config_checks_capabilities_errors(self):
self.mox.VerifyAll()
assert_equal(len(errors), 2)
assert_equal(len(warnings), 2)
-
-
View
101 ispdb/tests/test_view.py
@@ -19,25 +19,25 @@
def make_config(value):
"Get the dictionary for a sample config."
return {
- "asking_or_adding":"adding",
- "domain-TOTAL_FORMS":"1",
- "domain-INITIAL_FORMS":"0",
- "domain-0-id":"",
- "domain-0-name":"test%s.com" % value,
- "domain-0-DELETE":"False",
- "display_name":"test%s" % value,
- "display_short_name":"test%s" % value,
- "incoming_type":"imap",
- "incoming_hostname":"foo",
- "incoming_port":"22%s" % value,
- "incoming_socket_type":"plain",
- "incoming_authentication":"password-cleartext",
- "incoming_username_form":"%EMAILLOCALPART%",
- "outgoing_hostname":"bar",
- "outgoing_port":"22%s" % value,
- "outgoing_socket_type":"STARTTLS",
- "outgoing_username_form":"%EMAILLOCALPART%",
- "outgoing_authentication":"password-cleartext",
+ "asking_or_adding": "adding",
+ "domain-TOTAL_FORMS": "1",
+ "domain-INITIAL_FORMS": "0",
+ "domain-0-id": "",
+ "domain-0-name": "test%s.com" % value,
+ "domain-0-DELETE": "False",
+ "display_name": "test%s" % value,
+ "display_short_name": "test%s" % value,
+ "incoming_type": "imap",
+ "incoming_hostname": "foo",
+ "incoming_port": "22%s" % value,
+ "incoming_socket_type":