# Data Extraction

In [1]:
import io
import os
import re
import tarfile
from pathlib import Path

import chardet
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

23/06/17 11:31:32 WARN Utils: Your hostname, mr.local resolves to a loopback address: 127.0.0.1; using 192.168.15.9 instead (on interface en0)
23/06/17 11:31:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/17 11:31:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
APACHE_SPAM_ASSASSIN = "https://spamassassin.apache.org/old/publiccorpus"
DATA_FOLDER = '../data'

In [4]:
def pull_data():
    response = requests.get(APACHE_SPAM_ASSASSIN)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'html.parser')
    file_paths = [link.get('href') for link in soup.find_all('a')]
    file_paths = [path for path in file_paths if path.split('.')[-1] == 'bz2']

    available_data = set(os.listdir(DATA_FOLDER))
    file_paths = [path for path in file_paths if path not in available_data and path != 'corpus.parquet']

    if len(file_paths) == 0:
        print('No data to pull')

    for file_path in file_paths:
        print(f'Pulling {file_path}')
        response = requests.get(f"{APACHE_SPAM_ASSASSIN}/{file_path}")
        response.raise_for_status()

        file_object = io.BytesIO(response.content)
        tar = tarfile.open(fileobj=file_object, mode="r:bz2")

        extract_dir = Path(DATA_FOLDER)
        extract_path = extract_dir.joinpath(Path(file_path))

        extract_dir.mkdir(exist_ok=True)
        tar.extractall(extract_path)

        tar.close()

    size = get_directory_size("../data")
    print(f"Data directory size: {size} bytes")


def get_directory_size(directory):
    total = 0
    for dirpath, dirnames, filenames in os.walk(directory):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            if not os.path.islink(fp):
                total += os.path.getsize(fp)

    return total

In [10]:
pull_data()

No data to pull
Data directory size: 121604826 bytes


In [5]:
def parse_data():
    successful_files = []
    failed_files = []
    corpus_df = pd.DataFrame(columns=['date', 'difficulty', 'class', 'collection', 'body'])
    for dirpath, dirnames, filenames in os.walk(DATA_FOLDER):
        print(dirpath)
        for filename in filenames:
            corpus_path = os.path.join(dirpath, filename)
            corpus_df, successful_files, failed_files = incorporate(df, corpus_path, dirpath, successful_files,
                                                                    failed_files)
    return corpus_df, successful_files, failed_files


def incorporate(df, corpus_path, dirpath, successful_files, failed_files):
    rawdata = open(corpus_path, 'rb').read()
    result = chardet.detect(rawdata)
    encoding = result['encoding']

    with open(corpus_path, 'r', encoding=encoding) as f:
        [(date, *ids)] = re.findall(r'([\d]{8})_([a-z]*)_?([a-z]*)_?([\d]*)\.tar\.bz2.*',
                                    dirpath)
        [difficulty, cls, collection] = parse_ids(ids)
        try:
            body = f.read()
            row = pd.DataFrame({'date': date,
                                'difficulty': difficulty,
                                'class': cls,
                                'collection': collection,
                                'body': body}, index=[0, 1, 2, 3, 4])
            df = pd.concat([df, row], ignore_index=True)
            successful_files.append(corpus_path)
        except Exception as E:
            print(E)
            failed_files.append(corpus_path)
    return df, successful_files, failed_files


def parse_ids(ids):
    cls, difficulty, collection = None, None, None
    match ids:
        case [cls, '', '']:
            difficulty = None
            collection = None
        case [difficulty, cls, '']:
            collection = None
        case [difficulty, cls, collection]:
            pass
        case _:
            pass

    return [difficulty, cls, collection]


def save_as_parquet(df):
    spark_df = spark.createDataFrame(df)
    spark_df.write.parquet(f'{DATA_FOLDER}/corpus.parquet')

In [6]:
if 'corpus.parquet' in set(os.listdir(DATA_FOLDER)):
    df = spark.read.parquet(f'{DATA_FOLDER}/corpus.parquet')
    corpus_df = df.toPandas()
else:
    corpus_df, successful_files, failed_files = parse_data()
    save_as_parquet(corpus_df)

                                                                                

In [7]:
corpus_df

Unnamed: 0,date,difficulty,class,collection,body
0,20030228,hard,ham,,Return-Path: <bounce-lghtml-2534368@sprocket.l...
1,20030228,hard,ham,,Return-Path: <bounce-lghtml-2534368@sprocket.l...
2,20030228,hard,ham,,Return-Path: <bounce-lghtml-2534368@sprocket.l...
3,20030228,hard,ham,,Return-Path: <bounce-lghtml-2534368@sprocket.l...
4,20030228,hard,ham,,Return-Path: <bounce-lghtml-2534368@sprocket.l...
...,...,...,...,...,...
53650,20030228,easy,ham,,From exmh-workers-admin@redhat.com Tue Aug 27...
53651,20030228,easy,ham,,From exmh-workers-admin@redhat.com Tue Aug 27...
53652,20030228,easy,ham,,From exmh-workers-admin@redhat.com Tue Aug 27...
53653,20030228,easy,ham,,From exmh-workers-admin@redhat.com Tue Aug 27...


# Preprocessing

In [66]:
def extract_headers(row):
    try:
        [headers, body] = row['body'].split('\n\n', maxsplit=1)
        header_rows = headers.split('\n')
        current_key = None
        for h in header_rows:
            row, current_key = parse_header(row, h, current_key)
        row['body'] = body
    except Exception as e:
        row['header_error'] = e
        row['has_headers'] = False
    return row


def parse_header(row, h, current_key):
    match re.split(r"([\w\-]+)\:\s*", h):
        case ['', key, value]:
            row[key.lower()] = value
            current_key = key.lower()
        case [value]:
            if current_key:
                row[current_key] += value
        case _:
            pass
    return row, current_key

In [207]:
preproc_df = corpus_df.drop_duplicates(subset='body')
preproc_df.loc[:, 'has_headers'] = True
preproc_df = preproc_df.apply(extract_headers, axis=1)

In [235]:
reduced_preproc_df = preproc_df.dropna(thresh=1000, axis=1)

In [236]:
def extract_email_address(row, cols):
    for col in cols:
        if str(row[col]) and type(row[col]) != list:
            row[col] = re.findall(r"([^\s\<]+\@[^\s\>]+)", str(row[col])) or None
    return row

In [238]:
envelope_cols = ['delivered-to', 'errors-to', 'from', 'in-reply-to', 'list-id', 'message-id', 'received', 'references',
                 'reply-to', 'return-path', 'sender', 'to', 'x-beenthere']
extracted_emails_df = reduced_preproc_df.apply(lambda r: extract_email_address(r, envelope_cols), axis=1)

In [239]:
matching_cols = [
    ['delivered-to', 'to'],
    ['errors-to', 'from'],
    ['errors-to', 'return-path'],
    ['from', 'reply-to'],
    ['from', 'return-path'],
    ['from', 'sender'],
    ['x-beenthere', 'list-id']
]

def envelope_cols_match(row):
    for [col1, col2] in matching_cols:
        values_exist = row[col1] and row[col2]
        if not values_exist:
            return row
        row[f'feat-match-{col1}-{col2}'] = 1 if set(row[col1]) == set(row[col2]) else 0
    return row

In [240]:
matched_envelopes_df = extracted_emails_df.apply(envelope_cols_match, axis=1)

In [241]:
feat_cols = [col for col in matched_envelopes_df.columns if re.match(r"^feat\-.*$", col) is not None]
matched_envelopes_df[feat_cols].describe()

Unnamed: 0,feat-match-delivered-to-to,feat-match-errors-to-from,feat-match-errors-to-return-path,feat-match-from-reply-to,feat-match-from-return-path,feat-match-from-sender
count,7760.0,4569.0,4568.0,1662.0,1662.0,1654.0
mean,0.147938,0.003502,0.992557,0.29302,0.010229,0.00786
std,0.355062,0.059079,0.085961,0.455285,0.100648,0.088333
min,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,1.0,0.0,0.0,0.0
50%,0.0,0.0,1.0,0.0,0.0,0.0
75%,0.0,0.0,1.0,1.0,0.0,0.0
max,1.0,1.0,1.0,1.0,1.0,1.0


In [259]:
message_ids = {}

def duplicate_message_id(row):
    if not row['message-id']:
        matched_envelopes_df.loc[row.name, 'feat-duplicate-message-id'] = 0
        return row

    for message_id in row['message-id']:
        if message_id not in message_ids:
            message_ids[message_id] = row.name
            matched_envelopes_df.loc[row.name, 'feat-duplicate-message-id'] = 0
        else:
            matched_envelopes_df.loc[message_ids[message_id], 'feat-duplicate-message-id'] = 1
    return row

matched_envelopes_df.apply(duplicate_message_id, axis=1)
duplicate_messages_df = matched_envelopes_df

count    6109.000000
mean        0.478147
std         0.499563
min         0.000000
25%         0.000000
50%         0.000000
75%         1.000000
max         1.000000
Name: feat-duplicate-message-id, dtype: float64

In [None]:
embedding_cols = ['cc', 'content-type', 'subject', 'x-mailer']
categorical_cols = ['content-transfer-encoding', 'mime-version', 'precedence', 'x-mailman-version', 'x-msmail-priority',
                    'ax-priority', 'x-spam-level', 'x-spam-status']

0                                                      NaN
1295     No, hits=-32.7 required=5.0\ttests=ALL_CAP_POR...
1300     Yes, hits=5.2 required=5.0 tests=SUBJ_HAS_SPAC...
1330     Yes, hits=11.5 required=5.0 tests=NO_REAL_NAME...
1350     No, hits=-9.1 required=5.0\ttests=AWL,FORGED_R...
                               ...                        
50542    No, hits=-5.0 required=5.0\ttests=AWL,EMAIL_AT...
50552    No, hits=-3.1 required=5.0\ttests=AWL,X_LOOP,X...
50557    No, hits=-10.2 required=7.0\ttests=AWL,EMAIL_A...
50572    No, hits=-645.4 required=5.0\ttests=AWL\tversi...
50577    No, hits=-14.5 required=5.0\ttests=AWL,DEAR_SO...
Name: x-spam-status, Length: 1786, dtype: object