In [None]:
import ipaddress
import re
from email.parser import BytesParser
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup



In [None]:
import sys
# Print the path of the Python interpreter running this kernel (environment sanity check).
print(sys.executable)


In [None]:
# Hand-picked messages used for a first look at the parsing logic.
paths = [
    '/data/dataset/Phishing_-_3rd_Party/0a0e0cab473ff110072fbf12516d43c9/0_message.eml',
    '/data/dataset/Phishing_–_UBC/0ad4904a1bda559024c255b62a4bcbc3/0_message.eml',
    '/data/dataset/CEO_Fraud_-_Wire_Transfers/3a55d04d473bc290efc6767b416d43db/0_message.eml',
    '/data/dataset/CEO_Fraud_-_Gift_Cards/b9ed6f671bc6155024c255b62a4bcb1b/0_message.eml',
    '/data/dataset/Phishing_UBC_-_Outbound/0aeaad25938502105a9f30edfaba102e/0_message.eml'
]

# Parallel lists, one entry per path, later assembled into a DataFrame.
emails = []
payloads = []
text_html = []
text_plain = []
text_clean = []

for path in paths:
    with open(path, 'rb') as fp:
        msg = BytesParser().parse(fp)
    emails.append(msg)

    # Map content type -> decoded payload bytes. Multipart containers yield None, and a
    # later part overwrites an earlier part that has the same content type.
    payload = {part.get_content_type(): part.get_payload(decode=True) for part in msg.walk()}
    payloads.append(payload)

    html = payload.get('text/html')
    plain = payload.get('text/plain')
    text_html.append(html)

    if plain is not None:
        # Decoded as UTF-8; undecodable bytes become U+FFFD instead of aborting the loop.
        body = plain.decode(errors='replace')
    elif html is not None:
        # Explicit parser keeps the extracted text independent of which parsers are installed.
        body = BeautifulSoup(html, 'html.parser').get_text()
    else:
        # Neither a plain-text nor an HTML part: keep the row with an empty body.
        body = ''
    text_plain.append(body)

    # Collapse every run of whitespace into a single space.
    text_clean.append(' '.join(body.split()))


In [None]:
# Combine the parallel per-message lists into one table keyed by the source file path.
data_df_old = pd.DataFrame(
    dict(
        path=paths,
        email=emails,
        payload=payloads,
        text_html=text_html,
        text_plain=text_plain,
        text_clean=text_clean,
    )
)
data_df_old = data_df_old.set_index('path')

data_df_old

# filter eml

In [None]:
# Label folders expected directly under the dataset root.
# NOTE: some names use an en dash ("–") instead of a hyphen; they must match the
# on-disk folder names exactly or the folder is reported as missing below.
target_dirs = [
    "CEO_Fraud_-_Gift_Cards",
    "CEO_Fraud_-_Payroll_Update",
    "CEO_Fraud_-_Wire_Transfers",
    "Legitimate_Email_Confirmed",
    "Phishing_-_3rd_Party",
    "Phishing_-_Outbound",
    "Phishing_–_UBC",
    "Phishing_UBC_-_Outbound",
    "Reply_Chain_Attack",
    "Self-Phishing",
    "Spam_-_False_Positives",
    "Spam_–_Inbound",
    "Spam_–_Outbound",
    "Spearphishing"
]

root_dir = Path("/data/dataset")

eml_files = []
missing_dirs = []
for folder in target_dirs:
    folder_path = root_dir / folder
    if not folder_path.is_dir():
        missing_dirs.append(folder)
        continue
    # rglob yields entries in filesystem order; sorting keeps df_eml (and any seeded
    # sampling done on it) reproducible across runs and machines.
    for eml_path in sorted(str(p) for p in folder_path.rglob("*_message.eml")):
        eml_files.append((folder, eml_path))

if missing_dirs:
    # Surface skipped labels instead of silently dropping a whole class.
    print(f"Label folders not found under {root_dir}: {missing_dirs}")

df_eml = pd.DataFrame(eml_files, columns=["label", "path"])

df_eml

# sample 30 from each folder

In [None]:
SAMPLES_PER_LABEL = 30   # upper bound on messages drawn from each label folder
SAMPLE_SEED = 42         # fixed seed so the draw is repeatable


def _load_email_record(path):
    """Parse one .eml file and return its fields as a dict.

    Keys: 'email' (parsed message), 'payload' (content type -> decoded bytes),
    'text_html' (raw HTML bytes or None), 'text_plain' (str), 'text_clean' (str with
    whitespace collapsed). Errors opening or parsing the file propagate to the caller.
    """
    with open(path, 'rb') as fp:
        msg = BytesParser().parse(fp)

    payload = {}
    for part in msg.walk():
        ctype = part.get_content_type()
        try:
            payload[ctype] = part.get_payload(decode=True)
        except Exception:
            # Best effort: keep the content type visible even if its body cannot be decoded.
            payload[ctype] = None

    html = payload.get("text/html")
    plain = payload.get("text/plain")

    if plain:
        # Decoded as UTF-8; undecodable bytes become U+FFFD rather than discarding the body.
        decoded_plain = plain.decode(errors="replace")
    elif html:
        # No usable plain-text part: fall back to the visible text of the HTML part.
        try:
            decoded_plain = BeautifulSoup(html, "html.parser").get_text()
        except Exception:
            decoded_plain = ""
    else:
        decoded_plain = ""

    return {
        "email": msg,
        "payload": payload,
        "text_html": html,
        "text_plain": decoded_plain,
        "text_clean": " ".join(decoded_plain.split()),
    }


# Draw up to SAMPLES_PER_LABEL rows per label. Iterating the groups (instead of
# groupby.apply) keeps the "label" column regardless of pandas version.
sampled_groups = [
    group.sample(n=min(SAMPLES_PER_LABEL, len(group)), random_state=SAMPLE_SEED)
    for _, group in df_eml.groupby("label")
]
if sampled_groups:
    sample_df = pd.concat(sampled_groups).reset_index(drop=True)
else:
    sample_df = df_eml.iloc[0:0].copy()

# Exactly one record per sampled path, so every column stays aligned with sample_df
# even when a file fails part-way through parsing.
records = []
load_errors = {}
for path in sample_df["path"]:
    try:
        records.append(_load_email_record(path))
    except Exception as exc:
        load_errors[path] = repr(exc)
        records.append({
            "email": None,
            "payload": {},
            "text_html": None,
            "text_plain": "",
            "text_clean": "",
        })

if load_errors:
    print(f"{len(load_errors)} message(s) could not be loaded; see `load_errors` for details.")

emails = [record["email"] for record in records]
payloads = [record["payload"] for record in records]
text_html = [record["text_html"] for record in records]
text_plain = [record["text_plain"] for record in records]
text_clean = [record["text_clean"] for record in records]

data_df = pd.DataFrame({
    "label": sample_df["label"].values,
    "path": sample_df["path"].values,
    "email": emails,
    "payload": payloads,
    "text_html": text_html,
    "text_plain": text_plain,
    "text_clean": text_clean
}).set_index("path")

data_df

In [None]:
data_df["label"].value_counts()


In [None]:
df_eml["label"].value_counts().sort_index()


# Extract URLs from .eml File
*Note: the `urlextract` package is a possible alternative to the extraction approach used below.*

In [None]:
def extract_urls(text=None, html=None):
    """
    Extract URLs from plain text and/or HTML.

    Parameters
    ----------
    text : str or pd.Series, optional
        Plain text (cleaned). URLs are found with a regular expression; matches that
        start with ``www.`` get ``http://`` prepended.
    html : str, bytes or pd.Series, optional
        Raw HTML content. The ``href`` of every ``<a>`` tag is collected as-is.

    Returns
    -------
    list or pd.Series
        For scalar input, a de-duplicated list of URLs in order of first appearance
        (text matches first, then HTML links). If either argument is a Series, a
        Series of such lists that keeps the index of the input Series (``text``'s
        index when both are Series). A non-Series argument passed next to a Series
        is ignored, and two Series are paired positionally.

    Notes
    -----
    Trailing sentence punctuation (``. , ; : ! ?``) is stripped from text matches so
    that "see https://ubc.ca." yields "https://ubc.ca".
    """

    url_pattern = re.compile(r'(https?://[^\s\)\]\<\>\"\'\,]+|www\.[^\s\)\]\<\>\"\'\,]+)')
    trailing_punctuation = '.,;:!?'

    def _extract_combined(txt, html_str):
        urls = []

        if isinstance(txt, str):
            for match in url_pattern.findall(txt):
                candidate = match.rstrip(trailing_punctuation)
                # Drop matches that were nothing but a scheme/prefix plus punctuation.
                if not url_pattern.fullmatch(candidate):
                    continue
                if not candidate.startswith(('http://', 'https://')):
                    candidate = 'http://' + candidate
                urls.append(candidate)

        # bytes are accepted too: decoded MIME payloads are bytes, and BeautifulSoup
        # handles both.
        if isinstance(html_str, (str, bytes)):
            try:
                soup = BeautifulSoup(html_str, "html.parser")
                urls.extend(a.get("href") for a in soup.find_all("a") if a.get("href"))
            except Exception:
                # Deliberate best effort: unparsable markup contributes no links.
                pass

        # dict.fromkeys de-duplicates while keeping first-seen order (set() would not).
        return list(dict.fromkeys(urls))

    if isinstance(text, pd.Series) or isinstance(html, pd.Series):
        reference = text if isinstance(text, pd.Series) else html
        size = len(reference)
        texts = text if isinstance(text, pd.Series) else [None] * size
        htmls = html if isinstance(html, pd.Series) else [None] * size
        results = [_extract_combined(t, h) for t, h in zip(texts, htmls)]
        # Keep the caller's index so results can be aligned back to the source frame.
        return pd.Series(results, index=reference.index[:len(results)], dtype=object)

    return _extract_combined(text, html)



In [None]:
# One list of URLs per sampled message, extracted from the whitespace-normalised text.
# NOTE(review): only text_clean is passed, so links that exist solely as href attributes
# in text_html are not collected here — confirm whether html should be passed as well.
url_lists = extract_urls(data_df.text_clean)
url_lists

# Count the number of URLs

In [None]:
def get_url_count(text):
    """
    Return how many URLs `extract_urls` finds in the given text(s).

    Parameters
    ----------
    text : str or pd.Series
        A single email text, or a pandas Series of email texts.

    Returns
    -------
    int or pd.Series
        The URL count for a single text, or a Series of per-element counts when the
        extraction result is a Series.

    Example
    -------
    >>> get_url_count("Visit https://ubc.ca or http://example.com")
    2

    >>> pd.Series(["https://a.com", "no link"]).pipe(get_url_count)
    0    1
    1    0
    dtype: int64
    """
    extracted = extract_urls(text)
    # A Series holds one URL list per row; anything else is a single list.
    return extracted.apply(len) if isinstance(extracted, pd.Series) else len(extracted)
    


In [None]:
url_count = get_url_count(data_df.text_clean)

url_count

# Check if URLs are accessible (True if at least one URL is accessible; checking stops at the first accessible URL to save resources)

In [None]:
def has_accessible_url(urls, timeout=3):
    """
    Check whether at least one URL in a list is accessible (i.e., returns HTTP 200).

    Parameters
    ----------
    urls : list of str or str
        A list of URL strings (or a single URL string) to check. Non-string and
        blank entries are skipped.
    timeout : int, optional (default=3)
        Timeout in seconds for each request.

    Returns
    -------
    bool
        True as soon as one URL answers with status 200 (after following redirects);
        False if all are unreachable or invalid, or if `urls` is not a str or list.

    Notes
    -----
    - A HEAD request is tried first. If the server answers 405 or 501 (HEAD not
      supported), the URL is retried with a streamed GET so the body is not downloaded.
    - This function performs live network requests. NOTE(review): when the URLs come
      from untrusted emails, run it only from an environment where contacting those
      hosts is acceptable.

    Example
    -------
    >>> has_accessible_url(["https://ubc.ca", "http://invalid.domain"])
    True
    """
    if isinstance(urls, str):
        urls = [urls]
    if not isinstance(urls, list):
        return False

    head_unsupported = (405, 501)

    for url in urls:
        # Skip junk entries instead of failing on `.startswith`.
        if not isinstance(url, str) or not url.strip():
            continue
        if not url.startswith(('http://', 'https://')):
            url = "http://" + url
        try:
            resp = requests.head(url, allow_redirects=True, timeout=timeout)
            status = resp.status_code
            resp.close()
            if status in head_unsupported:
                resp = requests.get(url, allow_redirects=True, timeout=timeout, stream=True)
                status = resp.status_code
                # Close right away: only the status is needed, never the body.
                resp.close()
        except requests.RequestException:
            continue
        if status == 200:
            return True
    return False




In [None]:
# One boolean per message: True if any of its extracted URLs answered with HTTP 200.
# This issues live HTTP requests for the extracted URLs (up to the default 3-second
# timeout each), so the cell can be slow and its result depends on network state.
check_accessible = url_lists.apply(has_accessible_url)
check_accessible

In [None]:
check_accessible[check_accessible]

# Check if redirection happened (True if redirection happened)

In [None]:
def has_redirected_url(urls, timeout=3):
    """
    Check whether any URL in the list performs a redirection to a different destination.

    Parameters
    ----------
    urls : list of str or str
        A list of URLs or a single URL string. Non-string and blank entries are skipped.
    timeout : int, default 3
        Timeout for each request in seconds.

    Returns
    -------
    bool
        True if any URL redirects to a different destination;
        False if none redirect, all fail to connect, or `urls` is not a str or list.

    Notes
    -----
    - The requested and final URLs are compared after lower-casing and stripping a
      trailing slash, so the comparison is case-insensitive over the whole URL and a
      change of scheme (http -> https) counts as a redirection.
    - A HEAD request is tried first; if it raises, a streamed GET is used instead.
      Every response is closed as soon as its final URL has been read.
    - This function performs live network requests.
    """

    # Normalize to list
    if isinstance(urls, str):
        urls = [urls]
    if not isinstance(urls, list):
        return False

    headers = {"User-Agent": "Mozilla/5.0"}

    def normalize(url):
        parsed = urlparse(url)
        return parsed.geturl().rstrip('/').lower()

    def resolve_final_url(url):
        """Return the URL reached after following redirects, or None if unreachable."""
        attempts = (
            (requests.head, {}),
            (requests.get, {"stream": True}),  # fallback when HEAD fails
        )
        for send, extra in attempts:
            try:
                response = send(url, allow_redirects=True, timeout=timeout, headers=headers, **extra)
            except requests.RequestException:
                continue
            try:
                return response.url
            finally:
                # Release the connection; a streamed body is otherwise left open.
                response.close()
        return None

    for url in urls:
        if not isinstance(url, str) or not url.strip():
            continue
        if not url.startswith(('http://', 'https://')):
            url = "http://" + url

        final_url = resolve_final_url(url)
        if final_url is None:
            continue

        if normalize(final_url) != normalize(url):
            return True

    return False



In [None]:
# One boolean per message: True if any of its extracted URLs ended up at a different URL.
# This issues live HTTP requests for the extracted URLs, so the cell can be slow and its
# result depends on network state.
check_redirection = url_lists.apply(has_redirected_url)
check_redirection

In [None]:
check_redirection[check_redirection]

# Check if URLs contain an IP address, e.g. http://111.123.1.1
- hide real domain
- Rapid deployment
- avoid blacklist filter

In [None]:
import re

def has_ip_url(urls):
    """
    Check whether any of the given URLs uses an IP address as its host.

    Parameters
    ----------
    urls : str or list of str
        A single URL string or a list of URL strings to be checked.

    Returns
    -------
    bool
        True if the host part of any URL is a valid IPv4 or IPv6 address.
        Returns False for invalid input types or if no IP address host is found.

    Notes
    -----
    - Non-string and blank elements in a list are skipped.
    - If a URL doesn't start with 'http://' or 'https://', 'http://' is prepended before parsing.
    - Only the parsed host is examined, so user info and ports are ignored
      ('http://user@10.0.0.1:8080' is detected) and hostnames that merely begin with
      digits ('http://1.2.3.4.example.com') are not flagged.
    - Integer- or hex-encoded hosts (e.g. 'http://3232235521/') are not recognised.

    Examples
    --------
    >>> has_ip_url("http://192.168.0.1/index.html")
    True

    >>> has_ip_url(["https://example.com", "http://172.16.0.1"])
    True

    >>> has_ip_url("http://1.2.3.4.example.com/login")
    False

    >>> has_ip_url(["https://example.com", "not a url"])
    False

    >>> has_ip_url(12345)
    False
    """
    # Reject unsupported input types early
    if not isinstance(urls, (list, str)):
        return False

    # Normalize to list for uniform processing
    if isinstance(urls, str):
        urls = [urls]

    for u in urls:
        if not isinstance(u, str) or not u.strip():
            continue  # Skip non-string or empty entries
        candidate = u.strip()
        if not candidate.startswith(("http://", "https://")):
            candidate = "http://" + candidate
        try:
            host = urlparse(candidate).hostname
            if host:
                # Raises ValueError unless `host` is a well-formed IPv4/IPv6 literal.
                ipaddress.ip_address(host)
                return True
        except ValueError:
            # Malformed URL or a regular hostname: not an IP-based URL.
            continue

    return False



In [None]:
check_ip = url_lists.apply(has_ip_url)

# Only the last expression of a cell is rendered, so the flagged rows must be
# displayed explicitly; as a bare expression they were evaluated and discarded.
display(check_ip[check_ip == True])

check_ip

# Check if URLs start with http:// (not https://)
- Plain HTTP is older, cheaper, and faster to set up than HTTPS

In [None]:
def has_http_only(urls):
    """
    Tell whether at least one URL begins with the plain 'http://' scheme.

    Parameters
    ----------
    urls : str or list of str
        One URL, or a list of URLs. List entries that are not strings are ignored.

    Returns
    -------
    bool
        True when some URL starts with exactly 'http://' (an 'https://' URL does not
        count). False when none does, or when `urls` is neither a str nor a list.

    Examples
    --------
    >>> has_http_only("http://example.com")
    True

    >>> has_http_only(["https://secure.com", "http://open.com"])
    True

    >>> has_http_only(["https://secure.com", "www.example.com"])
    False
    """
    if isinstance(urls, str):
        candidates = [urls]
    elif isinstance(urls, list):
        candidates = urls
    else:
        return False

    return any(
        isinstance(candidate, str) and candidate.startswith("http://")
        for candidate in candidates
    )


In [None]:
check_http = url_lists.apply(has_http_only)

check_http

In [None]:
check_http[check_http == True]

# Check if URLs have an @ symbol
- An @ can hide the real link: everything before the @ is treated as login information, so http://UBC.com@bad.com actually takes you to bad.com

In [None]:
def has_at_symbol(urls):
    """
    Tell whether at least one URL contains the character '@' anywhere in it.

    Parameters
    ----------
    urls : str or list of str
        One URL, or a list of URLs. List entries that are not strings are ignored.

    Returns
    -------
    bool
        True when some URL contains '@'. False when none does, or when `urls` is
        neither a str nor a list.

    Examples
    --------
    >>> has_at_symbol("http://user@example.com")
    True

    >>> has_at_symbol(["https://example.com", "http://admin@evil.com"])
    True

    >>> has_at_symbol(["https://safe.com", "http://normal.com"])
    False

    >>> has_at_symbol(12345)
    False
    """
    if not isinstance(urls, (str, list)):
        return False

    entries = [urls] if isinstance(urls, str) else urls
    return any("@" in entry for entry in entries if isinstance(entry, str))



In [None]:
check_at = url_lists.apply(has_at_symbol)
check_at[check_at == True]


In [None]:
check_at = url_lists.apply(has_at_symbol)

check_at

# Check if URLs have an explicit port number
- Normal websites do not include a port number in the URL (the defaults are implied: 80 for http, 443 for https)
- easy setup
- avoid firewall filter

In [None]:
def has_port_number(urls):
    """
    Check whether any of the given URLs includes an explicit port number.

    Parameters
    ----------
    urls : str or list of str
        A single URL string or a list of URL strings. Non-string and blank entries
        are skipped.

    Returns
    -------
    bool
        True if any URL contains an explicit, valid port number (e.g., ':8080').
        False for invalid input or if no port is specified.

    Notes
    -----
    - 'http://' is prepended to URLs without a scheme so that they parse correctly.
    - URLs that cannot be parsed, or whose port is not an integer in the range
      0-65535, are skipped instead of raising, so one malformed URL cannot abort a
      whole `Series.apply`.

    Examples
    --------
    >>> has_port_number("http://example.com:8080")
    True

    >>> has_port_number(["https://abc.com", "http://site.org:8000/page"])
    True

    >>> has_port_number(["https://example.com", "http://abc.com"])
    False

    >>> has_port_number("http://example.com:abc/path")
    False

    >>> has_port_number(12345)
    False
    """
    if not isinstance(urls, (str, list)):
        return False

    if isinstance(urls, str):
        urls = [urls]

    for u in urls:
        if not isinstance(u, str) or not u.strip():
            continue
        # Add scheme if missing for proper parsing
        full_url = u if u.startswith(("http://", "https://")) else "http://" + u
        try:
            # Both urlparse() and the .port property can raise ValueError on bad input.
            port = urlparse(full_url).port
        except ValueError:
            continue
        if port is not None:
            return True

    return False


In [None]:
check_port_number = url_lists.apply(has_port_number)

check_port_number

In [None]:
check_port_number[check_port_number == True]


# Check if URLs are too long (default threshold = 75 characters)
- confuse user
- most normal link are simple and short

In [None]:
def has_long_url(urls, threshold=75):
    """
    Tell whether at least one URL is longer than `threshold` characters.

    Parameters
    ----------
    urls : str or list of str
        One URL, or a list of URLs. List entries that are not strings are ignored.
    threshold : int, optional (default=75)
        Length limit; a URL counts as "long" only when its length is strictly
        greater than this value.

    Returns
    -------
    bool
        True when some URL has more than `threshold` characters. False when all
        are within the limit, or when `urls` is neither a str nor a list.

    Examples
    --------
    >>> has_long_url("http://short.com")
    False

    >>> has_long_url("http://verylongurl.com/" + "a"*80)
    True

    >>> has_long_url(["http://a.com", "http://b.com/" + "x"*100], threshold=90)
    True

    >>> has_long_url(None)
    False
    """
    if isinstance(urls, str):
        candidates = [urls]
    elif isinstance(urls, list):
        candidates = urls
    else:
        return False

    return any(
        isinstance(candidate, str) and len(candidate) > threshold
        for candidate in candidates
    )


In [None]:
check_long_url = url_lists.apply(has_long_url)

check_long_url

# Check if URLs have multiple subdomains
- confuse user

In [None]:
def has_multiple_subdomains(urls):
    """
    Check whether any of the given URLs has more than 2 dots in its host name,
    indicating the presence of multiple subdomains (e.g., a.b.c.com).

    Parameters
    ----------
    urls : str or list of str
        A single URL string or a list of URL strings. Non-string and blank entries
        are skipped.

    Returns
    -------
    bool
        True if the host of any URL consists of more than three dot-separated labels.
        False for invalid input or normal domain structures like 'www.example.com'.

    Notes
    -----
    - Only the parsed host is inspected: user info ('user.name@') and ports are ignored.
    - Hosts that are IP addresses are skipped; their dots do not denote subdomains.
    - Empty labels (e.g. from a trailing root dot, 'www.example.com.') are not counted.
    - Multi-part public suffixes are not special-cased, so 'www.example.co.uk' is
      still reported as having multiple subdomains.

    Examples
    --------
    >>> has_multiple_subdomains("http://a.b.c.com")
    True

    >>> has_multiple_subdomains(["example.com", "x.y.z.domain.com"])
    True

    >>> has_multiple_subdomains("http://www.example.com")
    False

    >>> has_multiple_subdomains("http://192.168.0.1/login")
    False

    >>> has_multiple_subdomains(12345)
    False
    """
    if not isinstance(urls, (str, list)):
        return False

    if isinstance(urls, str):
        urls = [urls]

    def _is_ip_literal(host):
        """Return True if `host` is a well-formed IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(host)
        except ValueError:
            return False
        return True

    for u in urls:
        if not isinstance(u, str) or not u.strip():
            continue
        candidate = u.strip()
        if not candidate.startswith(("http://", "https://")):
            candidate = "http://" + candidate
        try:
            host = urlparse(candidate).hostname
        except ValueError:
            continue  # malformed URL
        if not host or _is_ip_literal(host):
            continue
        labels = [label for label in host.split(".") if label]
        # e.g. 'a.b.c.com' → ['a', 'b', 'c', 'com'] → 4 labels → True
        if len(labels) > 3:
            return True

    return False



In [None]:
check_subdomains = url_lists.apply(has_multiple_subdomains)

check_subdomains