In [4]:
#importing required libraries
!pip install python-whois
!pip install dnspython
import smtplib
import ssl
import socket
import whois
import dns.resolver



In [5]:
#email security tool class
class EmailSecurityTool:
 def __init__(self, domain, server, ip):
   self.domain = domain
   self.server = server
   self.ip = ip

def run_checks(self):
  self.check_smtps_port()
  self.verify_ssl_certificate()
  self.check_domain_age()
  self.check_domain_reputation()
  self.get_mx_records()
  self.verify_mx_records()
  self.reverse_dns_lookup()
  self.get_mta_sts_policy()
  self.get_tls_rpt_policy()


In [7]:
#SMTPS ValidationPort Verification
def check_smtps_port(self):
  try:
    server = smtplib.SMTP_SSL(self.server, 465)
    print(f"{self.server} is using SMTPS on port 465")
  except:
      try:
        server = smtplib.SMTP(self.server, 587)
        server.starttls()
        print(f"{self.server} is using SMTPS on port 587")
      except:
        print(f"{self.server} is not using SMTPS on ports 465 or 587")

#SSL/TLS Certificate Verification
def verify_ssl_certificate(self):
  context = ssl.create_default_context()
  conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=self.server)
  conn.connect((self.server, 465))
  cert = conn.getpeercert()
  ssl.match_hostname(cert, self.server)
  print(f"SSL/TLS certificate for {self.server} is valid and not expired.")


In [8]:
#whois lookups
def check_domain_age(self):
  w = whois.whois(self.domain)
  creation_date = w.creation_date
  print(f"Domain {self.domain} was registered on {creation_date}")

In [9]:
#reputation assessment
def check_domain_reputation(self):
  known_malicious_domains = ['malicious.com', 'phishing.com']
  if self.domain in known_malicious_domains:
    print(f"Domain {self.domain} is known for malicious activity.")
  else:
    print(f"Domain {self.domain} is not listed as malicious.")

In [10]:
#mx (mail exchange) records
#dns lookups
def get_mx_records(self):
  try:
    answers = dns.resolver.resolve(self.domain, 'MX')
    for rdata in answers:
      print(f"MX record for {self.domain}: {rdata.exchange} with priority {rdata.preference}")
  except dns.resolver.NoAnswer:
    print(f"No MX records found for {self.domain}")

#MX Record Verification
def verify_mx_records(self):
  valid_mx_servers = ['mx1.valid.com', 'mx2.valid.com']
  try:
    answers = dns.resolver.resolve(self.domain, 'MX')
    for rdata in answers:
      mx_record = str(rdata.exchange)
      if mx_record in valid_mx_servers:
        print(f"MX record {mx_record} for {self.domain} is legitimate.")
      else:
        print(f"MX record {mx_record} for {self.domain} is suspicious.")
  except dns.resolver.NoAnswer:
    print(f"No MX records found for {self.domain}")


In [11]:
#reverse dns lookup
def reverse_dns_lookup(self):
  try:
    result = socket.gethostbyaddr(self.ip)
    print(f"IP address {self.ip} resolves to {result[0]}")
  except socket.herror:
    print(f"IP address {self.ip} does not resolve to a valid domain")

In [12]:
#TLS Reporting
#MTA-STS Policy
def get_mta_sts_policy(self):
  try:
    answers = dns.resolver.resolve('_mta-sts.' + self.domain, 'TXT')
    for rdata in answers:
      print(f"MTA-STS policy for {self.domain}: {rdata.strings}")
  except dns.resolver.NoAnswer:
    print(f"No MTA-STS policy found for {self.domain}")

In [15]:
#TLS-RPT Implementation
def get_tls_rpt_policy(self):
  try:
    answers = dns.resolver.resolve('_smtp._tls.' + self.domain, 'TXT')
    for rdata in answers:
      print(f"TLS-RPT policy for {self.domain}: {rdata.strings}")
  except dns.resolver.NoAnswer:
    print(f"No TLS-RPT policy found for {self.domain}")

tool = EmailSecurityTool(domain='google.com', server='smtp.google.com', ip='8.8.8.8')
tool.run_checks()


Domain google.com is not listed as malicious.
MX record for google.com: smtp.google.com. with priority 10
MX record smtp.google.com. for google.com is suspicious.
IP address 8.8.8.8 resolves to dns.google
MTA-STS policy for google.com: (b'v=STSv1; id=20210803T010101;',)
TLS-RPT policy for google.com: (b'v=TLSRPTv1;rua=mailto:sts-reports@google.com',)


In [16]:
#url analysis and regex crafting
#feature extraction function
import re
from urllib.parse import urlparse, parse_qs

def extract_features(url):
  parsed_url = urlparse(url)
  features = {'url_length': len(url),'hostname_length': len(parsed_url.hostname)
  if parsed_url.hostname else 0,
              'path_length': len(parsed_url.path),
              'query_length': len(parsed_url.query),
              'fragment_length': len(parsed_url.fragment),
              'num_special_chars': len(re.findall(r'[@\-_.]', url)),
              'num_subdomains': parsed_url.hostname.count('.')
              if parsed_url.hostname else 0,
              'uses_https': int(parsed_url.scheme == 'https'),
              'has_ip': int(bool(re.search(r'\d+\.\d+\.\d+\.\d+',parsed_url.hostname)))
              if parsed_url.hostname else 0,
              'num_query_params': len(parse_qs(parsed_url.query)),
              'contains_suspicious_words':int(bool(re.search(r'login|verify|bank|secure', url, re.IGNORECASE)))
}
  return features

In [None]:
#model training and loading
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

# Load the pre-trained model
model = joblib.load('phishing_url_model.pkl')

# Example: Function to train and save the model (run this part separately as needed)
def train_and_save_model(dataset_path):
  dataset = pd.read_csv(dataset_path)
  dataset['features'] = dataset['url'].apply(extract_features)
  features_df = pd.json_normalize(dataset['features'])
  X_train, X_test, y_train, y_test = train_test_split(features_df, dataset['label'], test_size=0.2, random_state=42)
  model = RandomForestClassifier(n_estimators=100, random_state=42)
  model.fit(X_train, y_train)
  joblib.dump(model, 'phishing_url_model.pkl')
  accuracy = accuracy_score(y_test, model.predict(X_test))
  print(f'Model accuracy: {accuracy}')

# Uncomment to train and save the model
# train_and_save_model('url_dataset.csv')


In [None]:
#email parsing and url extraction
import email
from email.policy import default

def extract_urls(msg):
  urls = []
  for part in msg.walk():
    if part.get_content_type() == 'text/plain':
      text = part.get_payload()
      urls.extend(re.findall(r'http[s]?://\S+', text))
  return urls

In [None]:
#email analysis and flagging
def analyze_email(raw_email):
  msg = email.message_from_string(raw_email, policy=default)
  urls = extract_urls(msg)
  results = []
  for url in urls:
    features = extract_features(url)
    features_df = pd.json_normalize(features)
    prediction = model.predict(features_df)
    results.append((url, prediction[0]))
  return results

# Example: Analyze a raw email
raw_email = """<raw email content>""" # Replace with actual raw email content
analysis_results = analyze_email(raw_email)
print(analysis_results)

# Function to flag or block emails based on analysis def flag_or_block_email(analysis_results):
for url, is_phishing in analysis_results:
  if is_phishing:
    print(f"Flagged phishing URL: {url}")

# Here, you could move the email to a spam folder, mark it as suspicious, or block it.
  else:
    print(f"Legitimate URL: {url}")
# Run the flagging/blocking process
flag_or_block_email(analysis_results)


In [None]:
#Connecting to Email Server and Processing Emails
# Connect to the email server
def connect_to_email(username, password, server='imap.gmail.com'):
  mail = imaplib.IMAP4_SSL(server)
  mail.login(username, password)
  mail.select('inbox')
  return mail

# Fetch and process emails
def fetch_and_process_emails(mail):
  status, messages = mail.search(None, 'ALL')
  email_ids = messages[0].split()
  for e_id in email_ids:
    status, data = mail.fetch(e_id, '(RFC822)')
    raw_email = data[0][1].decode('utf-8')
    analysis_results = analyze_email(raw_email)
    flag_or_block_email(analysis_results)

# Example usage: Connect to email server and process emails username = 'your-email@example.com'
password = 'your-password'
mail = connect_to_email(username, password)
fetch_and_process_emails(mail)

In [None]:
#certificate verification and validation code
import ssl # Provides SSL/TLS functionality
import OpenSSL # Python bindings for OpenSSL library
from cryptography import x509 # Tools for parsing X.509 certificates
from cryptography.hazmat.backends import default_backend # Default backend for cryptography module
from cryptography.x509.oid import NameOID # Object identifiers (OIDs) for X.509 certificates
from datetime import datetime      # Date and time manipulation
from email.parser import BytesParser # Parse bytes into email objects
from email.policy import default # Default parsing policy for email module
import requests # HTTP library for making requests to URLs


# Function to load and parse a certificate from file or data
def load_certificate(cert_data=None, cert_file=None):
  if cert_file:
    with open(cert_file, "rb") as f:
      cert_data = f.read() # Read certificate data from file
      if not cert_data:
        raise ValueError("Certificate data or file must be provided.")

# Ensure certificate data is provided
  cert = x509.load_pem_x509_certificate(cert_data, default_backend()) # Parse PEM encoded certificate
  return cert

# Function to verify certificate chain using system's trusted CAs
def verify_certificate_chain(cert):
  try:
    trusted_cas = ssl.create_default_context().get_ca_certs() # Load system's trusted CA certificates
    store = OpenSSL.crypto.X509Store() # Create X.509 certificate store
    for ca in trusted_cas:
      ca_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, ca) # Load CA certificate
      store.add_cert(ca_cert) # Add CA certificate to store
      store_ctx = OpenSSL.crypto.X509StoreContext(store, OpenSSL.crypto.X509.from_cryptography(cert)) # Create store context
      store_ctx.verify_certificate() # Verify certificate against trusted CAs
      print("Certificate chain is valid.") # Print success message if valid
  except Exception as e:
    print(f"Certificate chain verification failed: {e}") # Print error message if verification fails

# Function to check if certificate is expired
def check_expiration(cert):
  not_after = cert.not_valid_after # Get certificate expiration date
  if not_after < datetime.utcnow(): # Compare expiration date with current time
    print("Certificate is expired.") # Print message if certificate is expired
  else:
    print("Certificate is valid.") # Print message if certificate is still valid


# Function to verify domain name against certificate's common name
def verify_domain(cert, domain_name):
  try:
    common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value # Get common name from certificate
    if domain_name == common_name:
      print("Domain name matches the certificate.") # Print message if domain matches certificate
    else:
      print("Domain name does not match the certificate.") # Print message if domain does not match certificate
  except Exception as e:
    print(f"Domain verification failed: {e}") # Print error message if domain verification fails

# Placeholder function to check certificate revocation status using OCSP
def check_revocation_status(cert):
  try:
    print("Checking revocation status via OCSP...") # Placeholder message for OCSP status check
    print("Certificate is not revoked.") # Placeholder message assuming certificate is not revoked
  except Exception as e:
    print(f"OCSP check failed: {e}") # Print error message if OCSP check fails

# Function to read and parse email from file or data
def parse_email(email_data=None, email_file=None):
  if email_file:
    with open(email_file, 'rb') as f:
      email_data = f.read() # Read email data from file
  if not email_data:
    raise ValueError("Email data or file must be provided.") # Ensure email data is provided

  email = BytesParser(policy=default).parsebytes(email_data) # Parse email bytes into email object
  return email


# Function to fetch and verify SSL certificate of a URL
def fetch_ssl_certificate(url):
  try:
    response = requests.get(url) # Send HTTP GET request to URL
    print(f"HTTP Status Code: {response.status_code}") #Print HTTP status code of the response
    cert_data = ssl.get_server_certificate((url, 443)) # Fetch SSL certificate data from server
    cert = load_certificate(cert_data=cert_data.encode('utf-8')) # Load and parse SSL certificate
    return cert # Return parsed SSL certificate
  except Exception as e:
    print(f"Failed to fetch SSL certificate: {e}") # Print error message if fetching SSL certificate fails
    return None # Return None if fetching SSL certificate fails


# Predefined paths and URLs for testing (replace with actual paths and URLs)
cert_file = "path/to/certificate.pem" # Replace with actual path to certificate file
domain_name = "example.com" # Replace with actual domain name for certificate verification
email_file = "path/to/email.eml" # Replace with actual path to email file for parsing
url = "https://example.com" # Replace with actual URL for fetching SSL certificate

# Load and parse certificate if cert_file is provided
if cert_file:
  try:
    cert = load_certificate(cert_file=cert_file) # Load certificate from file
    print("Certificate loaded successfully.") # Print success message if certificate loaded successfully
  except ValueError as e:
    print(f"Error loading certificate: {e}") # Print error message if loading certificate fails


# Verify certificate chain if cert is loaded
if cert:
  verify_certificate_chain(cert) # Verify certificate chain against trusted CAs
  check_expiration(cert) # Check if certificate is expired
  verify_domain(cert, domain_name) # Verify domain name against certificate
  check_revocation_status(cert) # Check certificate revocation status (placeholder)


# Parse and analyze email if email_file is provided
if email_file:
  try:
    email = parse_email(email_file=email_file) # Parse email from file
    print("Email parsed successfully.") # Print success message if email parsed successfully
  except ValueError as e:
    print(f"Error parsing email: {e}") # Print error message if parsing email fails


# Fetch and verify SSL certificate from URL if URL is provided
if url:
  url_cert = fetch_ssl_certificate(url) # Fetch SSL certificate from URL
  if url_cert:
    verify_certificate_chain(url_cert) # Verify certificate chain against trusted CAs
    check_expiration(url_cert) # Check if certificate is expired
    verify_domain(url_cert, domain_name) # Verify domain name against certificate
    check_revocation_status(url_cert) # Check certificate revocation status (placeholder)



In [None]:
# Additional steps: Integrate with ML models, update systems, and handle incident response as per above steps
#IP, DOMAIN & DNS BLACKLISTING
#email retrieval from the server
def fetch_latest_email(server, username, password):
       mail = imaplib.IMAP4_SSL(server)# Connect to the server and login using the login creds
       mail.login(username, password)
       mail.select('inbox')
       status, messages = mail.search(None, 'ALL')  # Search for the latest email in the inbox
       email_ids = messages[0].split()
       latest_email_id = email_ids[-1] #Get the latest email ID
       status, msg_data = mail.fetch(latest_email_id, '(RFC822)') #Fetching Email Data of the latest email
       for response_part in msg_data:
                    if isinstance(response_part, tuple):
                      raw_email = response_part[1]
                      mail.close()
                      mail.logout()
                      return raw_email

#Parsing Email data to Extract Source IP and Domain Name and checking against a Blacklist
def parse_and_check(raw_email):
       parsed_mail = mailparser.parse_from_bytes(raw_email) # Parse the raw email using mailparser
       sender_domain = parsed_mail.from_sender_ip = parsed_mail.get_server_ipaddress() #Extract sender's domain and IP address
       with open('ip_blacklist.txt', 'r') as f: #checking against IP Blacklist
             for line in f:
                           if sender_ip == line.strip():
                             print(f'Sender IP {sender_ip} found in IP blacklist!')
       with open('domain_blacklist.txt', 'r') as f: #checking against Domain Blacklist

                   for line in f:
                    if sender_domain == line.strip():
                      print(f'Sender Domain {sender_domain} found in domain blacklist!')



In [None]:
#implementation of SPF Verification
import dns.resolver
import ipaddress

def get_spf_record(domain):
  try:
    answers = dns.resolver.resolve(domain, 'TXT')
    for record in answers:
      for txt_string in record.strings:
        if txt_string.startswith(b'v=spf1'):
          return txt_string.decode('utf-8')
  except dns.resolver.NoAnswer:
    return None
  except dns.resolver.NXDOMAIN:
    return None


def parse_spf_record(spf_record):
  terms = spf_record.split()
  mechanisms = [term for term in terms if term != 'v=spf1' and term != '-all']
  return mechanisms

def check_ip_in_spf(ip_address, mechanisms):
  for mechanism in mechanisms:
    if mechanism.startswith('ip4:'):
      ip_range = mechanism.split(':')[1]
      if ipaddress.ip_address(ip_address) in ipaddress.ip_network(ip_range):
        return True
    elif mechanism.startswith('include:'):
      included_domain = mechanism.split(':')[1]
      included_spf_record = get_spf_record(included_domain)
      if included_spf_record:
        included_mechanisms = parse_spf_record(included_spf_record)
        if check_ip_in_spf(ip_address, included_mechanisms):
          return True
  return False


def verify_spf(email_address, sending_ip):
  domain = email_address.split('@')[-1]
  spf_record = get_spf_record(domain)
  if not spf_record:
    print(f"No SPF record found for domain {domain}.")
    return False

  mechanisms = parse_spf_record(spf_record)
  if check_ip_in_spf(sending_ip, mechanisms):
    print(f"IP address {sending_ip} is authorized to send email for {domain}.")
    return True
  else:
    print(f"IP address {sending_ip} is not authorized to send email for {domain}.")
    return False

# Example Usage
email_address = "[email protected]"
sending_ip = "192.0.2.1"
result = verify_spf(email_address, sending_ip)
print(f"SPF Verification Result: {result}")

In [None]:
#Implementation of Email Hop count And MIME version Analysis
import imaplib
import email
from email import policy
from email.parser import BytesParser

def count_hops(email_content):
  msg = BytesParser(policy=policy.default).parsebytes(email_content)
  hop_count = 0
  for header in msg.items():
    if header[0].lower() == 'received':
      hop_count += 1
  return hop_count


def analyze_mime_headers(msg):
  mime_version = msg.get('MIME-Version')
  if mime_version:
    print(f"MIME-Version: {mime_version}")
    if mime_version != '1.0':
      print("Alert: Unusual MIME-Version detected! This email may be suspicious.")

  content_type = msg.get('Content-Type')
  if content_type:
    print(f"Content-Type: {content_type}")


def get_gmail_emails():
  raw_email = b"""Received: by 2002:a05:7000:1fa1:b0:596:762d:281d with SMTP id hr33csp1783068mab;
Tue, 25 Jun 2024 04:47:44 -0700 (PDT)
Received: from mta-86-140.sparkpostmail.com (mta-86-140.sparkpostmail.com. [192.174.86.140])
by mx.google.com with ESMTPS id d2e1a72fcca58-70677f7881dsi4685954b3a.361.2024.06.25.04.47.43
for <devsh8681@gmail.com>
(version=TLS1_2 cipher=ECDHE-ECDSA-AES128-GCM-SHA256 bits=128/128); Tue, 25 Jun 2024 04:47:43 -0700 (PDT)
From: Simplilearn <updates@simplilearnmailer.com> To: "devsh8681@gmail.com" <devsh8681@gmail.com> Subject:
=?utf-8?B?RmluYWwgUmVtaW5kZXIgfCDwn5OiIEV0aGljYWwgSGFja2luZyBXZWJpbmFy?= =?utf-8?Q?_Tomorrow?=
Thread-Topic:
=?utf-8?B?RmluYWwgUmVtaW5kZXIgfCDwn5OiIEV0aGljYWwgSGFja2luZyBXZWJpbmFy?= =?utf-8?Q?_Tomorrow?=
Thread-Index: ATZBLjNGBO9J08+wm8WapqQeCHGq5Q== X-MS-Exchange-MessageSentRepresentingType: 1
Date: Tue, 25 Jun 2024 17:17:43 +0530
Message-ID: <E8.FA.23409.F5EAA766@kn.mta1vrest.cc.prd.sparkpost> List-Unsubscribe:
<mailto:unsubscribe@unsub.spmta.com?subject=unsubscribe:Lp8Wo6n23WcGo1ahBt2xw

  SLnCaaf0X6eejSgGZ30d6g~|eyAicmNwdF90byI6ICJkZXZzaDg2ODFAZ21haWwuY29tIiwgInRlb mFudF9pZCI6ICJzcGMiLCAiY3VzdG9tZXJfaWQiOiAiMjkxODU5IiwgInN1YmFjY291bnRfaWQiOi AiMCIsICJtZXNzYWdlX2lkIjogIjY2NzU1ZmFlN2E2NjgyNGIyZjhlIiB9>,<https://unsubscr ibe.spmta.com/u/DCwsJixhzLMHhohyjnQWLQ~~/AAR0EwA~/RgRoXTNfPFcDc3BjQgpmdV-uema CSy-OUhNkZXZzaDg2ODFAZ21haWwuY29tWAQAAAAA>
Content-Language: en-US
X-Hashtags: #Newsletters,#Commercial
X-MS-Has-Attach:
X-MS-Exchange-Organization-SCL: -1
X-MS-TNEF-Correlator: X-MS-Exchange-Organization-RecordReviewCfmType: 0
received-spf: pass (google.com: domain of msprvs1=19906gr4oiu_i=bounces-291859@spmailtechnol.com designates 192.174.86.140 as permitted sender) client-ip=192.174.86.140; Content-Type: multipart/alternative;
boundary="_000_E8FA23409F5EAA766knmta1vrestccprdsparkpost_" MIME-Version: 1.0
"""
  msg = email.message_from_bytes(raw_email, policy=policy.default)
  print(msg)
  hop_count = count_hops(raw_email)
  print(f"The email hop count is: {hop_count}")
  unusual_hop_count_threshold = 3
  if hop_count >= unusual_hop_count_threshold:
    print("Alert: Unusual hop count detected! This email may be suspicious.")

  analyze_mime_headers(msg)

get_gmail_emails()