Skip to content

Commit

Permalink
major update
Browse files Browse the repository at this point in the history
  • Loading branch information
un33k committed Jan 12, 2013
1 parent 2a2d39e commit 73a781a
Showing 1 changed file with 113 additions and 115 deletions.
228 changes: 113 additions & 115 deletions emailahoy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,46 @@
# -*- coding: utf-8 -*-

__version__ = '0.0.5'
__version__ = '0.0.6'

import re
import sys
import smtplib
import socket
import re
import popen2
import smtplib as _smtp

DEBUG = True

__all__ = ['VerifyEmail', 'verify_email_address', 'query_mx']

mx_re = re.compile('mail\sexchanger\s=\s(\d+)\s(.*)\.')
MX_RE = re.compile('mail\sexchanger\s=\s(\d+)\s(.*)\.')
EMAIL_RE = re.compile('([\w\-\.+]+@\w[\w\-]+\.+[\w\-]+)')
NOT_FOUND_KEYWORDS = [
"does not exist",
"doesn't exist",
"doesn't have",
"doesn't handle",
"unknown user",
"user unknown",
"rejected",
"disabled",
"discontinued",
"unavailable",
"unknown",
"invalid",
"typos",
"unnecessary spaces",
]

UNVERIFIABLE_KEYWORDS = [
"block",
"block list",
"spam",
"spammer",
"isp",
"weren't sent",
"not accepted",
]


def query_mx(host):
""" Returns all MX records of a given domain name """
Expand All @@ -23,7 +53,7 @@ def query_mx(host):
fout, fin = popen2.popen2('%s -query=mx %s' % (cmd, host))
line = fout.readline()
while line <> '':
m = mx_re.search(line.lower())
m = MX_RE.search(line.lower())
if m:
mx.append((eval(m.group(1)), m.group(2)))
line = fout.readline()
Expand All @@ -36,128 +66,88 @@ def query_mx(host):
class VerifyEmail(object):
""" Verify if email exists """

EMAIL_RE = re.compile('([\w\-\.+]+@\w[\w\-]+\.+[\w\-]+)')
default_response = (550, 'Reason not known')

# given a hostname, all mx records will be returned
def get_mx_for_hostname(self, hostname):
mx = []
if self.is_hostname_valid(hostname):
try:
mx = query_mx(hostname)
except:
pass
return mx


# given a host name, returns True if valid, else False
def is_hostname_valid(self, hostname):
""" if hostname is valid """
try:
socket.gethostbyname(hostname)
except:
return False
return True

EMAIL_FOUND = 1
EMAIL_NOT_FOUND = 2
UNABLE_TO_VERIFY = 3

# given an email address, returns True if email matches a valid pattern
def is_email_valid(self, email):
""" if a given email maches the email pattern """
return self.EMAIL_RE.search(email)

def connect(self, hostname, timeout=10):
""" Returns a server connection or None given a hostname """

# given an email, hostname is returned
def get_hostname_from_email(self, email):
try:
hostname = email.strip().split('@')[1]
except:
hostname = None
return hostname


# given a hostname, a smtp server connection is returned or None
def get_smtp_connection(self, hostname):
""" returns a server with valid connection if possible """
resp = self.default_response
connection_success = lambda x: x[0] == 220
if self.is_hostname_valid(hostname):
server = smtplib.SMTP()
try:
resp = server.connect(hostname)
except:
pass
if connection_success(resp):
socket.gethostbyname(hostname)
server = _smtp.SMTP(timeout=timeout)
code, resp = server.connect(hostname)
if code == 220:
return server
except:
pass
return None


# given a response tuple, it returns True if status was success
def was_found(self, resp):
""" email WAS found """
return resp[0] == 250
def unverifiable(self, resp):
""" Return true if email is not verifiable """
return any(a in resp.lower() for a in UNVERIFIABLE_KEYWORDS)


# given a response tuple, it returns True if it can tell if email was not found
def not_found(self, resp):
""" email was NOT found """
not_found_words = [
"does not exist",
"doesn't exist",
"rejected",
"disabled",
"discontinued",
"unavailable",
"unknown",
"invalid",
"doesn't handle",
]
return resp[0] != 250 and any(a in resp[1].lower() for a in not_found_words)
def nonexistent(self, resp):
""" Return true if email is not verifiable """
return any(a in resp.lower() for a in NOT_FOUND_KEYWORDS)


# given a response tuple, it returns true if it couldn't tell, if email found or not
def could_not_verify_status(self, resp):
""" email unverifiable """
return not (self.was_found(resp) or self.not_found(resp))


# returns a response tuple indicating the existance of an email address
def verify_email_smtp(
def verify(
self,
email,
from_host='example.com',
from_email='verify@example.com'
):
""" if an email does exsit """

cmd_success = lambda x: x[0] == 250
found = False
resp = self.default_response
if self.is_email_valid(email):
hostname = self.get_hostname_from_email(email)
mx = self.get_mx_for_hostname(hostname)
if not mx:
return (550, 'No-Mx-Found')
for m in mx:
server = self.get_smtp_connection(m[1])
if server:
try:
resp = server.docmd('HELO %s' % from_host)
except:
continue
if cmd_success(resp):
try:
resp = server.docmd('MAIL FROM: <%s>' % from_email)
except:
continue
if cmd_success(resp):
try:
resp = server.docmd('RCPT TO: <%s>' % email)
except:
continue
break
""" verifies wether an email address does exsit """

if not EMAIL_RE.search(email):
return self.EMAIL_NOT_FOUND

try:
hostname = email.strip().split('@')[1]
socket.gethostbyname(hostname)
mail_exchangers = query_mx(hostname)
except:
return self.UNABLE_TO_VERIFY

for mx in mail_exchangers:
mx_name = mx[1]
server = self.connect(mx_name)
if not server:
return (550, 'No-Server-Connection')
return resp
continue
if DEBUG:
server.set_debuglevel(1)
code, resp = server.helo(mx_name)
if code != 250:
if not self.unverifiable(resp):
return self.UNABLE_TO_VERIFY
continue
code, resp = server.mail(from_email)
if code != 250:
if not self.unverifiable(resp):
return self.UNABLE_TO_VERIFY
continue
code, resp = server.rcpt(email)
if code != 250:
if self.nonexistent(resp):
return self.EMAIL_NOT_FOUND
elif self.unverifiable(resp):
return self.UNABLE_TO_VERIFY
else:
continue
code, resp = server.data('Ahoy. Are you there?{0}.{0}'.format(_smtp.CRLF))
if code != 250:
if self.nonexistent(resp):
return self.EMAIL_NOT_FOUND
elif self.unverifiable(resp):
return self.UNABLE_TO_VERIFY
elif code == 250:
return self.EMAIL_FOUND

return self.UNABLE_TO_VERIFY


# given an email it returns True if it can tell it exist or False
Expand All @@ -168,9 +158,17 @@ def verify_email_address(
):
""" A quick email verification fuction """
e = VerifyEmail()
status = e.verify_email_smtp(email=email, from_host='example.com', from_email='verify@example.com')
if e.was_found(status):
return True
return False


status = e.verify(email, from_host, from_email)
print status
if status == e.EMAIL_NOT_FOUND:
return False
return True

# if __name__ == "__main__":
# if verify_email_address('un33kvu@gmail.com', 'djanguru.com', 'verify@djanguru.com'):
# if verify_email_address('un33ksssddsdsd333vu@gmail.com', 'djanguru.com', 'verify@djanguru.com'):
# if verify_email_address('un33kvu@yahoo.com', 'djanguru.com', 'verify@djanguru.com'):
# if verify_email_address('un33ksssddsdsd333vu@yahoo.com', 'djanguru.com', 'verify@djanguru.com'):
# if verify_email_address('un33ksssddsdsd333vu@cnn.com', 'djanguru.com', 'verify@djanguru.com'):
# if verify_email_address('vinnie@cnn.com', 'djanguru.com', 'verify@djanguru.com'):
# print "found"

0 comments on commit 73a781a

Please sign in to comment.