In [21]:
import pandas as pd
import re
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy.sparse import hstack, csr_matrix
from sklearn.base import BaseEstimator, TransformerMixin
import joblib

In [22]:
# Load CEAS_08 dataset CSV
# NOTE(review): this is an absolute, machine-specific path; the notebook will
# only run where this exact file exists. Consider a configurable data directory.
CEAS08_CSV_PATH = 'C:/Users/D.Sathiya Pandi/Downloads/CEAS_08.csv'
df = pd.read_csv(CEAS08_CSV_PATH)

# Plain-text preview of the first rows.
preview = df.head()
print(preview)

                                              sender  \
0                   Young Esposito <Young@iworld.de>   
1                       Mok <ipline's1983@icable.ph>   
2  Daily Top 10 <Karmandeep-opengevl@universalnet...   
3                 Michael Parker <ivqrnai@pobox.com>   
4  Gretchen Suggs <externalsep1@loanofficertool.com>   

                                         receiver  \
0                     user4@gvc.ceas-challenge.cc   
1                   user2.2@gvc.ceas-challenge.cc   
2                   user2.9@gvc.ceas-challenge.cc   
3  SpamAssassin Dev <xrh@spamassassin.apache.org>   
4                   user2.2@gvc.ceas-challenge.cc   

                              date  \
0  Tue, 05 Aug 2008 16:31:02 -0700   
1  Tue, 05 Aug 2008 18:31:03 -0500   
2  Tue, 05 Aug 2008 20:28:00 -1200   
3  Tue, 05 Aug 2008 17:31:20 -0600   
4  Tue, 05 Aug 2008 19:31:21 -0400   

                                             subject  \
0                          Never agree to be a loser   
1  

In [23]:
# Remove exact duplicate rows first, then any row that lacks one of the key
# columns (the e-mail body or its label).
key_columns = ['body', 'label']
df = df.drop_duplicates().dropna(subset=key_columns)

In [None]:
# Text cleaning function
def clean_text(text):
    """Normalize one e-mail body.

    Strings are lower-cased and stripped of leading/trailing whitespace.
    Any non-string value is replaced by the empty string.
    """
    if not isinstance(text, str):
        return ''
    lowered = text.lower()
    return lowered.strip()

# Cleaned copy of the body text; the raw 'body' column is left untouched.
df['body_clean'] = df['body'].apply(clean_text)

# Label encoding
# Maps the textual / numeric label spellings onto integer classes:
# 0 = legitimate, 1 = spam, 2 = phishing.
label_mapping = {'0': 0, 'legitimate': 0, 'ham': 0,
                 '1': 1, 'spam': 1,
                 '2': 2, 'phishing': 2}

# Compare labels as trimmed, lower-cased strings so every spelling in
# label_mapping can match.
label_text = df['label'].astype(str).str.strip().str.lower()
# A numeric label column that was read as float stringifies as '0.0' / '1.0',
# which would match no key above and silently drop every row. Strip a trailing
# '.0' so those values map like their integer spelling.
label_text = label_text.str.replace(r'\.0+$', '', regex=True)

df['label'] = label_text.map(label_mapping)
# Rows whose label is not in label_mapping became NaN; discard them before the
# integer cast.
df = df.dropna(subset=['label'])
df['label'] = df['label'].astype(int)

# Custom feature extractor class
class AdditionalFeaturesExtractor(BaseEstimator, TransformerMixin):
    """Stateless transformer producing three hand-crafted features per document.

    Output columns, in order:
      0. total number of occurrences of the spam keywords (substring matches),
      1. number of ``http://`` / ``https://`` occurrences,
      2. 1 if any suspicious domain fragment occurs in the text, else 0.

    All matching is done on the lower-cased text.
    """

    # Fixed lookup data, kept at class level so it is built once rather than
    # on every transform() call.
    SPAM_KEYWORDS = ('congratulations', 'winner', 'prize', 'urgent', 'bank', 'claim', 'lottery')
    SUSPICIOUS_DOMAINS = ('.ru', 'xn--', '.xyz', '.top', '.biz', '.info')
    URL_PATTERN = re.compile(r"http[s]?://")
    N_FEATURES = 3

    def fit(self, X, y=None):
        """Nothing is learned from the data; returns ``self``."""
        return self

    def transform(self, X):
        """Compute the feature matrix for an iterable of documents.

        Parameters
        ----------
        X : iterable
            Documents to featurize. Entries that are not ``str`` are treated
            as empty text.

        Returns
        -------
        scipy.sparse.csr_matrix
            One row per document and ``N_FEATURES`` columns.
        """
        rows = [self._featurize(text) for text in X]
        if not rows:
            # csr_matrix([]) would have shape (1, 0); build an explicitly
            # shaped empty matrix so the column count stays consistent.
            return csr_matrix((0, self.N_FEATURES), dtype=int)
        return csr_matrix(rows)

    def _featurize(self, text):
        """Return ``[spam_word_count, url_count, suspicious_domain_flag]`` for one document."""
        text_lower = text.lower() if isinstance(text, str) else ''
        spam_word_count = sum(text_lower.count(word) for word in self.SPAM_KEYWORDS)
        url_count = len(self.URL_PATTERN.findall(text_lower))
        suspicious_domain_flag = int(any(domain in text_lower for domain in self.SUSPICIOUS_DOMAINS))
        return [spam_word_count, url_count, suspicious_domain_flag]

# TF-IDF vectorization
tfidf = TfidfVectorizer(stop_words='english', max_features=10000, ngram_range=(1, 2), max_df=0.9, min_df=5)
X_tfidf = tfidf.fit_transform(df['body_clean'])

# Additional features extraction
add_features_extractor = AdditionalFeaturesExtractor()
X_add_features = add_features_extractor.transform(df['body_clean'])

# Combine features
X_combined = hstack([X_tfidf, X_add_features])

In [None]:
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X_combined, df['label'], test_size=0.2, random_state=42, stratify=df['label'])

# Train model
model = MultinomialNB()
model.fit(X_train, y_train)

# Save model and vectorizers
joblib.dump(model, 'model_ceas08_enhanced.pkl')
joblib.dump(tfidf, 'vectorizer_ceas08.pkl')
joblib.dump(add_features_extractor, 'additional_features_extractor.pkl')


In [None]:
# Evaluate
# Predict on the held-out split, then report accuracy followed by the
# support-weighted precision, recall and F1.
y_pred = model.predict(X_test)

weighted_metrics = (
    ("Precision", precision_score),
    ("Recall   ", recall_score),
    ("F1 Score ", f1_score),
)

print(f"Accuracy : {accuracy_score(y_test, y_pred):.4f}")
for metric_label, metric_fn in weighted_metrics:
    print(f"{metric_label}: {metric_fn(y_test, y_pred, average='weighted'):.4f}")

In [17]:
%%writefile email.py
import streamlit as st
import joblib

# Load CEAS_08-trained model & vectorizer
model = joblib.load('model_ceas08.pkl')
vectorizer = joblib.load('vectorizer_ceas08.pkl')

label_map = {0: "Legitimate", 1: "Spam", 2: "Phishing"}

st.title("📧 CEAS_08 Email Spam & Phishing Detector")

email_text = st.text_area("Paste your email content here:")

if st.button("Predict"):
    if not email_text.strip():
        st.warning("Please enter email content to predict.")
    else:
        # Preprocess & vectorize input
        X_input = vectorizer.transform([email_text.lower().strip()])

        # Predict with the loaded model
        pred = model.predict(X_input)[0]

        # Display result
        st.success(f"Prediction: {label_map[pred]}")


Writing email.py
