# Analyse d'un email suspect

Le vecteur initial d'intrusion le plus courant dans un incident cyber est un mail de phishing. \
C'est pour cela qu'il est très important de pouvoir analyser facilement un email reçu afin d'identifier très facilement un mail légitime d'un mail malveillant.

Il existe plusieurs format de mail ".msg" et ".eml". \
Les outils pour réaliser les analyses sont différents. Nous allons donc créer des sections distinctes.

Voici quelques ressources utilisées : 
- https://www.intezer.com/blog/incident-response/automate-analysis-phishing-email-files/


Ce playbook a pour objectif d'extraire et analyser les en-têtes des Emails pour répondre aux questions suivantes : 
- Quand a été envoyé ce mail ?
- Quelle l'@ip du serveur SMTP à l'origine de l'envoi ?
- Quelle est l'adresse email de l'expéditeur ?
- Quelle est l'adresse email de la personne destinataire du mail ?
- Est-ce que le contenu est malveillant (url, pièce jointe)?


Pour ce faire, voici les plus importants :
- Expéditeur du mail, plusieurs valeurs permettent de l'identifier. Si ces valeurs diffèrent, nous sommes très certainement face à une tentative d'usurpation d'identité.
    - **From**
    - **envelope-from**
    - **Return-Path**, **reply-to**, correspond à l'adresse mail à laquelle une réponse au mail sera envoyée;
    - **smtp.mailfrom**
    - **smtp.pra**, 
    - **x-sender**
- Informations relatives à la sécurité :
    - **x-spam status**, affiche une qualification du mail 

In [None]:
# Load python modules
from colorama import init, Fore, Back, Style
from datetime import datetime
from defang import defang
import eml_parser
import json
import msticpy as mp
import os
import pandas as pd
import msticpy.sectools as sectools
import re
import magic
from msg_parser import MsOxMessage
 

#Expand the width of the cells
from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

!export MSTICPYCONFIG=./msticpyconfig.yaml
mp.init_notebook(globals(), verbosity=0)
ti = mp.TILookup()
ioc_extractor = sectools.IoCExtract()

# Regex 
regex_email = r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b"
regex_message_id = r"[^@]+@([^'>]+)"
regex_url = r"((https?|ftp|ssh|mailto):\/\/[a-zA-Z0-9\/:%_+.#?!@&=-]+)"

# Observables
observables_lst = []

# Chemin du fichier EML ou MSG à analyser
emailFile = {}
emailFile['path'] = "/home/secubian/Documents/Cases/payloads_and_dumps/email_eml/QUEIXA_CONTRA_SI.eml"  


# Determine the file type

Ce controle permet d'identifier s'il s'agit d'un fichier EML ou MSG Outlook, car les outils d'analyse sont différents.

In [None]:
import os.path
if os.path.exists(emailFile['path']):
    fileType = magic.Magic().id_filename(emailFile['path'])
    if "Microsoft Outlook Message" in fileType and ".msg" in emailFile['path']:
        print("This is a MSG file")
        emailType = "msg"
    elif "text" in fileType and ".eml" in emailFile['path']:
        print("This is an EML file")
        emailType = "eml"
    else:
        print("File Type not supported!")
else:
    print(f"File [{emailFile['path']}] doesn't exist")

## Extraction d'observables utils

Que le mail soit au format EML ou MSG, il est toujours interessant d'extraire les observables du type Email, URLs, présents dans le corps du mail. \
Une fois extraits, nous pourrons les rechercher dans les bases de connaissances des menaces.

In [None]:
# https://enjoylifescience.com/2020/11/05/analyzing-emails-in-python/
from email import policy
from email.parser import BytesParser

with open(emailFile['path'], 'rb') as fp:
    name = fp.name  # Get file name
    msg = BytesParser(policy=policy.default).parse(fp)
body_str = msg.get_body(preferencelist=('html','plain')).get_content()

# URLs research
urls_lst = []
matches = re.finditer(regex_url, body_str, re.MULTILINE)
for matchNum, match in enumerate(matches, start=1):
    urls_lst.append(match.groups()[0])
    observables_lst.append(match.groups()[0])
if len(urls_lst) > 0:
    urls_lst = set(urls_lst)
    print(Fore.RED + f"Some urls has been found : {urls_lst}")

# Emails research
emails_lst = []
matches = re.finditer(regex_email, body_str, re.MULTILINE)
for matchNum, match in enumerate(matches, start=1):
    emails_lst.append(match.group())
    observables_lst.append(match.group())
if len(emails_lst) > 0:
    emails_lst = set(emails_lst)
    print(Fore.RED + f"Some emails has been found : {emails_lst}")

## Analyse des mails au format EML

La première étape consiste à extraire les informations utiles telles que les entêtes. \
Ces informations seront placées dans des variables pour être analysées ci-dessous.

In [None]:
mail_headers = {}
headers = {}

eml = eml_parser.parser.decode_email(emailFile['path'], include_attachment_data=True)
headers = eml['header']
mail_headers['From'] = headers['from']
mail_headers['Subject'] = headers['subject']
observables_lst.append(mail_headers['From'])

print(Fore.RED + f"Le mail semble avoir été envoyé par [{mail_headers['From']}] \navec le sujet suivant : [{mail_headers['Subject']}]")

### Analyse de l'expéditeur

Il est facile de falcifier l'adresse mail à l'origine de l'envoi du mail. \
Sans rentrer dans les détails, nous allons analyser les entêtes comportant l'adresse mail de l'expéditeur. \
Si l'analyse recense plus d'une adresse mail, alors nous sommes potentiellement face à une usurpation d'identité.

In [None]:
# Analyse senders adresses
senders_headers = ['from','envelope-from','smtp.mailfrom','smtp.pra','reply-to','return-path','x-sender']
senders_email = []
match_from = None

for sender in senders_headers:
    if sender in headers['header']:
        match_from = re.search(regex_email, headers['header'][sender][0])
    if sender in headers:
        match_from = re.search(regex_email, headers[sender])
    if match_from is not None:
        senders_email.append(match_from.group())
        observables_lst.append(match_from.group())

if len(senders_email) > 1: senders_email = set(senders_email)

if (mail_headers['From'] in str(senders_email)) and len(senders_email) == 1:
    print(Fore.GREEN + "[✓] No Potentially identity usurpation")
    print(Fore.GREEN + f"[✓] Only one email address has been identified : {mail_headers['From']}")
else:
    print(Fore.RED + "[!] Potentially identity usurpation during headers analysis (from, envelope-from, x-sender, smtp.mailfrom, return-path, reply-to)")
    print(f"Email addresses specified in From value : {mail_headers['From']}")
    print(f"All email addresses identified in headers : {senders_email}")

### Analyse de/des destinataire(s) du mail

En cas d'incident de sécurité, il est opportun de connaitre l'ensemble des personnes ayant reçu le mail malveillant pour réaliser le confinement de l'attaque (isolation du poste de travail, exécution d'un scan antivirus, etc...)

In [None]:
# Analyse "to" adresses
to_headers = ['to','delivered-to','envelope-to','received_foremail']
to_email = []
match_to = None

for sender in to_headers:
    if sender in headers['header'] and len(headers['header'][sender][0]) > 0:
        match_to = re.search(regex_email, headers['header'][sender][0])
    else:
        if sender in headers and len(headers[sender]) > 0:
            match_to = re.search(regex_email, headers[sender][0])

    if match_to is not None:
        to_email.append(match_to.group())

if len(to_email) > 1: to_email = set(to_email)

print(f"Email has been delivered to : {to_email}")

### Analyse de l'entête "Message ID"

Il peut être observé durant l'analyse de l'entête "Message ID" un nom de domaine différent de celui présent dans l'adresse mail de l'expéditeur. Cette situation peut aider à identifier un mail malveillant.

In [None]:
if 'message-id' in headers['header']:
    result = re.search(regex_message_id, str(headers['header']['message-id']))
    message_id_domain = result.group(1)
    observables_lst.append(message_id_domain)
if message_id_domain not in mail_headers['From']:
    print(Fore.RED + f"[!] Domain in Message-ID ({message_id_domain}) doesn't equal with 'From' email address domain ({mail_headers['From']})")
else:
    print(Fore.GREEN + f"[✓] Message-ID domain ({message_id_domain}) is similar with 'From' email address domain ({mail_headers['From']})")

### Analyse des serveurs ayant émis le mail
La prochaine étape consiste à identifier le serveur ayant émis le mail.

In [None]:
email_servers_ips_lst = []

if headers['received'][len(headers['received'])-1]['by'][0]:
    email_servers_ips_lst.append(headers['received'][len(headers['received'])-1]['by'][0])
    observables_lst.append(headers['received'][len(headers['received'])-1]['by'][0])

if len(email_servers_ips_lst) > 0:
    print(Fore.GREEN + f"Email was sent by : {email_servers_ips_lst}")
else:
    print(Fore.RED + f"[!] There is an issue in  headers analyse.")


### Analyse de la durée nécessaire à l'envoi du mail

Un mail n'a besoin que de quelques secondes pour transiter entre le client de messagerie de l'expéditeur et le serveur de messagerie du destinataire. \
Il est donc plutot facile d'identifier des mails envoyés à des groupes de diffusion, ou spam en masse qui demande plus de temps de traitement aux serveurs intermédiaires durant le transite du mail. Nous avons décidé ici d'établir un seuil à 60 secondes.

In [None]:
from datetime import timezone
# Analyse the delta between the sending of the email by the sender and its reception by the recipient. 
mail_date_sent = headers['date'].replace(tzinfo=timezone.utc)
mail_date_received = headers['received'][0]['date'].replace(tzinfo=timezone.utc)
delta = mail_date_received - mail_date_sent

print(f"Sending date : {mail_date_sent}")
print(f"Receiving date : {mail_date_received}")

if delta.total_seconds() > 60:
    print(Fore.RED + f"[!] It seems to be a SPAM : {delta.total_seconds()} seconds ")
else:
    print(Fore.GREEN + f"[✓] It seems to be a Legit mail : {delta.total_seconds()} seconds ")
    
mail_headers['mail_time_delay (seconds)'] = delta.total_seconds()

### Extraction des pièces jointes

L'analyse du fichier EML continue en réalisant une extraction des pièces jointes. \
Ces dernières peuvent contenir du code malveillant. Il est donc nécessaire d'être vigilant.

Une des premières actions pourrait être l'analyse des pièces jointes via un antivirus, ou rechercher leur hash.

In [None]:
import base64
outpath = f"{emailFile['path']}_attachments/"
if 'attachment' in eml:
    if len(eml['attachment']) > 0:
        for attachment in eml['attachment']:
            filename = attachment['filename']
            filename = os.path.join(outpath, filename)
            if not os.path.exists(os.path.dirname(filename)):
                try:
                    os.makedirs(os.path.dirname(filename))
                except OSError as exc:  # Guard against race condition
                    if exc.errno != errno.EEXIST:
                        raise

            print(Fore.RED + f"[!] Writing attachment: {filename}")

            with open(filename, 'wb') as a_out:
                a_out.write(base64.b64decode(attachment['raw']))
    else:
        print(Fore.GREEN + f"[✓] No attachment included")
else:
    print(Fore.GREEN + f"[✓] No attachment included")

La commande ci-dessous va générer les empreintes numériques des fichiers extraits. \
Il est tout à fait possible à l'issue de réaliser une recherche en sources ouvertes de ces valeurs (ex: https://www.virustotal.com/gui/home/search)

In [None]:
outpath_hash = outpath.replace(' ','\ ')
!hashdeep -r $outpath_hash

## Analyse des mails au format MSG Outlook

La première étape consiste à extraire les informations utiles telles que les entêtes. \
Ces informations seront placées dans des variables pour être analysées ci-dessous.

In [None]:
import extract_msg

mail_headers = {}
headers = {}

email_msg = extract_msg.openMsg(emailFile['path'])
headers = json.loads(json.dumps(email_msg.headerDict))
mail_headers['From'] = headers['From']
mail_headers['Subject'] = headers['Subject']
mail_headers['To'] = headers['To']
observables_lst.append(mail_headers['From'])

print(Fore.RED + f"Le mail semble avoir été envoyé par [{mail_headers['From']}] \navec le sujet suivant : [{mail_headers['Subject']}]")


### Analyse de l'expéditeur

Il est facile de falcifier l'adresse mail à l'origine de l'envoi du mail. \
Sans rentrer dans les détails, nous allons analyser les entêtes comportant l'adresse mail de l'expéditeur. \
Si l'analyse recense plus d'une adresse mail, alors nous sommes potentiellement face à une usurpation d'identité.

In [None]:
# Analyse senders adresses
senders_email = []
senders_headers = ['From','envelope-from','smtp.mailfrom','smtp.pra','reply-to','return-path','x-sender']

regex = [
            {"name":"From (envelope-from)", "value":r'envelope-from=\"([^;,]+)\"', "regexGroupID":0},
            {"name":"From (smtp.mailfrom)", "value":r'smtp.mailfrom=([^;,]+)', "regexGroupID":0},
            {"name":"From (smtp.mailfrom)", "value":r'smtp.mfrom=([^;,]+)', "regexGroupID":0},
            {"name":"From (smtp.pra)", "value":r'smtp.pra=([^;,]+)', "regexGroupID":0},
            {"name":"From (return-path)", "value":r'Return-Path[^\:]+: ([^\\\r,;]+)', "regexGroupID":0},
            #{"name":"From (return-path)", "value":r'Return-Path: ([^\\\r]+)', "regexGroupID":0},
            {"name":"From (x-sender)", "value":r'X-Sender: ([^\\\r]+)', "regexGroupID":0},
            {"name":"From (reply-to)", "value":r'Reply-To: ([^\\\r]+)', "regexGroupID":0},  # A valider
        ]

for r in regex:
    result = re.search(r['value'], str(headers))
    if result is not None:
        match = re.search(regex_email, result.group(1))
        if match is not None:
            senders_email.append(match.group())
            observables_lst.append(match.group())

senders_email = list(set(senders_email))


# Extract email from From header
match_email_from = re.search(regex_email, mail_headers['From'])
match_email_from = match_email_from.group()

if (match_email_from in str(senders_email)) and len(senders_email) == 1:
    print(Fore.GREEN + "[✓] No Potentially identity usurpation")
    print(Fore.GREEN + f"[✓] Only one email address has been identified : {mail_headers['From']}")
else:
    print(Fore.RED + "[!] Potentially identity usurpation during headers analysis (from, envelope-from, x-sender, smtp.mailfrom, return-path)")
    print(f"Email addresses specified in From value : {mail_headers['From']}")
    print(f"All email addresses identified in headers : {senders_email}")



### Analyse de l'entête "Message ID"

Il peut être observé durant l'analyse de l'entête "Message ID" un nom de domaine différent de celui présent dans l'adresse mail de l'expéditeur. Cette situation peut aider à identifier un mail malveillant.

In [None]:
regex_message_id = r"<[^@]+@([^>]+)>"

result = re.search(regex_message_id, headers['Message-ID'])
message_id_domain = result.group(1)
if message_id_domain not in mail_headers['From']:
    print(Fore.RED + f"[!] Domain in Message-ID ({message_id_domain}) doesn't equal with 'From' email address domain ({mail_headers['From']})")
else:
    print(Fore.GREEN + f"[✓] Message-ID domain ({message_id_domain}) is similar with 'From' email address domain ({mail_headers['From']})")


### Analyse des serveurs ayant émis le mail
La prochaine étape consiste à identifier le serveur ayant émis le mail.

In [None]:
email_servers_lst = []
regex_senders_email_server = [
            {"name":"X-Source-IP", "value":r'X-Source-IP[^\:]+: ([^\\\r,;]+)', "regexGroupID":1},
            {"name":"X-Source-Sender", "value":r'X-Source-Sender[^\:]+: ([^\\\r,;]+)', "regexGroupID":1},
            {"name":"X-SenderID", "value":r'X-SenderID[^\:]+: ([^\\\r,;]+)', "regexGroupID":1},
        ]

for r in regex_senders_email_server:
    result = re.search(r['value'], str(headers))
    if result is not None:
        email_servers_lst.append(result.group(r['regexGroupID']).replace("'",""))
        observables_lst.append(result.group(r['regexGroupID']).replace("'",""))

email_servers_lst = list(set(email_servers_lst))


if len(email_servers_lst) > 0:
    print(Fore.GREEN + f"Email was sent by servers : {email_servers_lst}")
else:
    print(Fore.RED + f"[!] There is an issue in  headers analyse.")


### Analyse de la durée nécessaire à l'envoi du mail

Un mail n'a besoin que de quelques secondes pour transiter entre le client de messagerie de l'expéditeur et le serveur de messagerie du destinataire. \
Il est donc plutot facile d'identifier des mails envoyés à des groupes de diffusion, ou spam en masse qui demande plus de temps de traitement aux serveurs intermédiaires durant le transite du mail. Nous avons décidé ici d'établir un seuil à 60 secondes.

In [None]:
# Analyse the delta between the sending of the email by the sender and its reception by the recipient. 
delta = email_msg.receivedTime - datetime.strptime(email_msg.date, '%a, %d %b %Y %H:%M:%S %z')
if delta.total_seconds() > 60:
    print(Fore.RED + f"[!] It seems to be a SPAM : {delta.total_seconds()} seconds ")
else:
    print(Fore.GREEN + f"[✓] It seems to be a Legit mail : {delta.total_seconds()} seconds ")
    
mail_headers['mail_time_delay (seconds)'] = delta.total_seconds()

### Analyse de l'outil ayant envoyé le mail

Comme pour les requetes HTTP avec l'en-tête "User-Agent", certains clients de messagerie laissent leur empreinte lors de l'envoi. \
Cette empreinte peut se trouver dans l'en-tête : "X-Mailer".

In [None]:
if 'X-Mailer' in headers:
    print(Fore.RED + f"[!] L'outil utilisé pour l'envoi du mail est : {headers['X-Mailer']}")
else:
    print(Fore.GREEN + f"[✓] L'outil utilisé pour l'envoi du mail n'a pas pu être identifié.")


## Analyse en Threat Intel des observables identifiés

La dernière étape est commune aux deux types de fichiers EML, et MSG. \
Elle consiste à rechercher dans les bases de connaissance de Threat Intel, les observables identifiés durant toute l'analyse des en-têtes.

In [None]:
# Retrieve all observables list and concat them in one DataFrame
df_emails_servers_ips = pd.DataFrame(email_servers_lst, columns=['servers'])
df_urls = pd.DataFrame(urls_lst, columns=['servers'])
df_observables_lst = pd.DataFrame(observables_lst, columns=['servers'])
resource = [df_emails_servers_ips, df_urls, df_observables_lst]
df_observable = pd.concat(resource)

# Analyse all observable and extract domains,ips,urls
df_observable = ioc_extractor.extract(data = df_observable, columns=['servers'])
df_observable = df_observable.drop_duplicates(subset=['Observable'])

# Research observables in theat intel database
df_observable_ti = ti.lookup_iocs(data=df_observable, ioc_col="Observable", providers=["VirusTotal", "OTX"])
df_observable_ti = pd.json_normalize(data=df_observable_ti[['Ioc','Provider','Details']].to_dict(orient='records'))

if (df_observable_ti.empty):
    print(Fore.GREEN + "[✓] No Potentially Suspicious Network connections identified")
else:
    print(Fore.RED + "[!] Potentially Suspicious Network connections identified")
    display(df_observable_ti.sort_values(by="Details.pulse_count", ascending=False))