<a href="https://colab.research.google.com/github/sanjaysathish2000/Cloud-Assignment/blob/main/Spam_vs_Ham.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
pip install pyspark



In [14]:
from pyspark.sql import SparkSession

# Step 1: Create a Spark session
spark = SparkSession.builder.appName("Spam vs Ham").getOrCreate()

# Step 2: Read a dataset
file_path = "/content/spam_email_dataset.csv"

# Read the CSV file into a DataFrame
# You can adjust options based on your CSV file's format
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Step 3: Perform operations on the DataFrame
df.printSchema()
df.show(5)



root
 |-- Email: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Sender: string (nullable = true)
 |-- Recipient: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Attachments: integer (nullable = true)
 |-- Link Count: integer (nullable = true)
 |-- Word Count: integer (nullable = true)
 |-- Uppercase Count: integer (nullable = true)
 |-- Exclamation Count: integer (nullable = true)
 |-- Question Count: integer (nullable = true)
 |-- Dollar Count: integer (nullable = true)
 |-- Punctuation Count: integer (nullable = true)
 |-- HTML Tags Count: integer (nullable = true)
 |-- Spam Indicator: integer (nullable = true)

+--------------------+--------------------+--------------------+--------------------+----------+-------------------+-----------+----------+----------+---------------+-----------------+--------------+------------+-----------------+---------------+--------------+
|               Email|             Su

In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, lower, when
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, explode
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
from math import log
from collections import Counter

# Select relevant columns
selected_columns = ["Sender", "Subject", "`Spam Indicator`"]
df = df.select(*selected_columns)

df.show(5)

# Check for null values in "Subject," "Sender," and "Spam Indicator" columns
null_check_columns = ["Subject", "Sender", "`Spam Indicator`"]
for column in null_check_columns:
    null_count = df.filter(col(column).isNull()).count()
    print(f"Number of null values in {column}: {null_count}")

# Convert "Subject" to lowercase
df = df.withColumn("Subject", lower(col("Subject")))

# Handle null values in "Spam Indicator" column (assuming 0 for null)
df = df.withColumn("`Spam Indicator`", when(col("`Spam Indicator`").isNull(), 0).otherwise(col("`Spam Indicator`")))

# Tokenize the "Subject" column
tokenizer = Tokenizer(inputCol="Subject", outputCol="words")
df = tokenizer.transform(df)

# Remove stop words from the tokenized words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df = remover.transform(df)

# Select only specified columns
selected_columns = ["Subject", "Sender", "`Spam Indicator`", "filtered"]
df = df.select(*selected_columns)

# Show the updated DataFrame
df.show(5)


+--------------------+--------------------+--------------+
|              Sender|             Subject|Spam Indicator|
+--------------------+--------------------+--------------+
|emilyscott@exampl...|Even hotel commun...|             1|
|annwhite@example.net|Try themselves gu...|             1|
| david88@example.net|Environmental com...|             1|
|lindaalvarez@exam...|Smile real TV fat...|             0|
|vstafford@example...|Fast stage he oil...|             1|
+--------------------+--------------------+--------------+
only showing top 5 rows

Number of null values in Subject: 0
Number of null values in Sender: 0
Number of null values in `Spam Indicator`: 0
+--------------------+--------------------+--------------+--------------------+
|             Subject|              Sender|Spam Indicator|            filtered|
+--------------------+--------------------+--------------+--------------------+
|even hotel commun...|emilyscott@exampl...|             1|[even, hotel, com...|
|try the

In [16]:
# Separate values based on "Spam Indicator" (0 for ham, 1 for spam)
ham_df = df.filter("`Spam Indicator` == 0")
spam_df = df.filter("`Spam Indicator` == 1")

# Collect unique words used in ham and spam
ham_words = ham_df.select("filtered").rdd.flatMap(lambda x: x["filtered"]).distinct().collect()
spam_words = spam_df.select("filtered").rdd.flatMap(lambda x: x["filtered"]).distinct().collect()

# Display unique words in ham and spam
print("Unique words in ham emails:")
print(ham_words)

print("\nUnique words in spam emails:")
print(spam_words)

# Show the updated DataFrame
ham_df.show(5)
spam_df.show(5)


Unique words in ham emails:
['smile', 'real', 'tv', 'father', 'commercial', 'day', 'increase.', 'sort', 'prepare', 'skin', 'story', 'glass', 'use.', 'cell', 'pass', 'perform', 'popular.', 'particular', 'suffer', 'possible.', 'control', 'remain', 'morning', 'general', 'discuss.', 'represent', 'speak', 'weight', 'pretty', 'discover', 'room', 'food.', 'policy', 'energy', 'truth', 'issue.', 'method', 'candidate', 'happy.', 'become', 'pick', 'under.', 'town', 'defense', 'young', 'return', 'matter', 'discover.', 'boy', 'away', 'cold', 'may', 'control.', 'five', 'enough', 'exist', 'author', 'community.', 'science', 'western', 'mr', 'often', 'blue.', 'need', 'senior', 'sound', 'citizen.', 'go', 'situation', 'traditional', 'practice', 'east', 'wind.', 'alone', 'public', 'help', 'great.', 'listen', 'cup', 'southern', 'laugh.', 'yet', 'answer', 'goal', 'reality', 'piece', 'long', 'stop.', 'relationship', 'husband', 'maintain', 'nor.', 'sister', 'indeed', 'give', 'writer', 'effort', 'actually.', '

In [17]:
# Collect all the filtered words used in ham emails
ham_words = ham_df.select("filtered").rdd.flatMap(lambda x: x["filtered"]).collect()

# Use Counter to count occurrences of each word in ham emails
word_counts_ham = Counter(ham_words)

# Convert the Counter to a DataFrame
word_counts_ham_df = spark.createDataFrame(word_counts_ham.items(), ["word", "count"])

# Show the top 10 ham words
print("Top 10 Ham Words:")
word_counts_ham_df.orderBy(F.desc("count")).show(10)

Top 10 Ham Words:
+---------+-----+
|     word|count|
+---------+-----+
|    exist|   28|
| whatever|   26|
|    field|   26|
|    black|   26|
|    thing|   25|
|     move|   25|
|education|   25|
|  science|   24|
|  section|   24|
|      lay|   23|
+---------+-----+
only showing top 10 rows



In [18]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col

# Assuming you have a DataFrame named spam_df with columns "Sender", "Subject", and "filtered" containing tokenized words

# Select the top 10 ham words
top_ham_words = word_counts_ham_df.orderBy(F.desc("count")).limit(10).select("word").rdd.flatMap(lambda x: x).collect()

# Explode the array column to separate rows in ham emails
ham_df_exploded = ham_df.select("Sender", "Subject", explode("filtered").alias("word"))

# Filter based on the top ham words
top_ham_senders = (
    ham_df_exploded
    .filter(col("word").isin(top_ham_words))
    .groupBy("Sender", "Subject")
    .count()
    .orderBy(F.desc("count"))
    .limit(10)
)

# Show the top 10 ham senders and subjects
print("Top 10 ham Senders and Subjects:")
top_ham_senders.show(truncate=False)


Top 10 ham Senders and Subjects:
+-----------------------------+-----------------------------------------------------------------+-----+
|Sender                       |Subject                                                          |count|
+-----------------------------+-----------------------------------------------------------------+-----+
|ndelgado@example.com         |choose move force dream method book few.                         |2    |
|meaganlarson@example.com     |us lose southern thing whatever response.                        |2    |
|jessicaallen@example.net     |exist imagine game record science skill mother.                  |2    |
|laurenmcdonald@example.com   |method item rule skin deep hold thing paper.                     |2    |
|mossjoshua@example.com       |enter exist help section than.                                   |2    |
|meganfrye@example.org        |certain step field top field see past.                           |2    |
|fmacdonald@example.net       |

In [19]:
# Collect all the filtered words used in spam emails
spam_words = spam_df.select("filtered").rdd.flatMap(lambda x: x["filtered"]).collect()

# Use Counter to count occurrences of each word in spam emails
word_counts_spam = Counter(spam_words)

# Convert the Counter to a DataFrame
word_counts_spam_df = spark.createDataFrame(word_counts_spam.items(), ["word", "count"])

# Show the top 10 spam words
print("Top 10 Spam Words:")
word_counts_spam_df.orderBy(F.desc("count")).show(10)

Top 10 Spam Words:
+-------+-----+
|   word|count|
+-------+-----+
|western|   30|
|produce|   29|
|control|   27|
|    buy|   25|
|   like|   24|
|    try|   24|
|federal|   24|
| return|   24|
|   girl|   24|
|address|   24|
+-------+-----+
only showing top 10 rows



In [20]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col

# Select the top 10 spam words
top_spam_words = word_counts_spam_df.orderBy(F.desc("count")).limit(10).select("word").rdd.flatMap(lambda x: x).collect()

# Explode the array column to separate rows in spam emails
spam_df_exploded = spam_df.select("Sender", "Subject", explode("filtered").alias("word"))

# Filter based on the top spam words
top_spam_senders = (
    spam_df_exploded
    .filter(col("word").isin(top_spam_words))
    .groupBy("Sender", "Subject")
    .count()
    .orderBy(F.desc("count"))
    .limit(10)
)

# Show the top 10 spam senders and subjects
print("Top 10 Spam Senders and Subjects:")
top_spam_senders.show(truncate=False)


Top 10 Spam Senders and Subjects:
+--------------------------+-------------------------------------------------------+-----+
|Sender                    |Subject                                                |count|
+--------------------------+-------------------------------------------------------+-----+
|hollandkatelyn@example.net|control control receive per.                           |2    |
|bryanconner@example.com   |wonder wonder appear pull.                             |2    |
|jimmy09@example.com       |board behind buy accept operation federal enjoy.       |2    |
|james78@example.org       |each girl yet address pick hotel.                      |2    |
|hooperdakota@example.org  |federal produce peace explain you certainly everything.|2    |
|lisa57@example.org        |produce kitchen quickly mother learn return face.      |2    |
|veronicaburke@example.org |address similar produce street oil.                    |2    |
|david33@example.net       |buy job why western interest

In [21]:
# Select relevant columns from top_ham_senders
df = top_ham_senders.select("Sender", "Subject")

# Tokenize the "Subject" column
tokenizer_udf = udf(lambda text: text.split(), ArrayType(StringType()))
df = df.withColumn("words", tokenizer_udf(df["Subject"]))

# Calculate Term Frequencies (TF)
calculate_tf_udf = udf(lambda word_list: {word: word_list.count(word) / len(word_list) for word in set(word_list)}, StringType())
df = df.withColumn("tf", calculate_tf_udf(df["words"]))

# Extract unique words
unique_words = list(set(df.selectExpr("explode(words) as word").select("word").distinct().rdd.flatMap(lambda x: x).collect()))

# Calculate Inverse Document Frequencies (IDF)
total_documents = df.count()
document_frequency = df.select("Sender", "words").rdd.flatMap(lambda x: [(word, 1) for word in set(x[1])]).reduceByKey(lambda x, y: x + y)
idf_values = document_frequency.map(lambda x: (x[0], log(total_documents / x[1])))

# Broadcast IDF values
idf_broadcast = spark.sparkContext.broadcast(dict(idf_values.collect()))

# Calculate TF-IDF for each document
def calculate_tfidf(row):
    user_name, words = row
    tfidf_values = {word: words.count(word) * idf_broadcast.value.get(word, 0.0) for word in words}
    return user_name, words, tfidf_values

tfidf_data = df.select("Sender", "words").rdd.map(calculate_tfidf)

# Display the result
tfidf_df = spark.createDataFrame(tfidf_data, ["Sender", "words", "tfidf"])
tfidf_df.show(10, truncate = False)


+-----------------------------+--------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Sender                       |words                                                                     |tfidf                                                                                                                                                                                                                                               |
+-----------------------------+--------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
# Select relevant columns from top_spam_senders
df = top_spam_senders.select("Sender", "Subject")

# Tokenize the "Subject" column
tokenizer_udf = udf(lambda text: text.split(), ArrayType(StringType()))
df = df.withColumn("words", tokenizer_udf(df["Subject"]))

# Calculate Term Frequencies (TF)
calculate_tf_udf = udf(lambda word_list: {word: word_list.count(word) / len(word_list) for word in set(word_list)}, StringType())
df = df.withColumn("tf", calculate_tf_udf(df["words"]))

# Extract unique words
unique_words = list(set(df.selectExpr("explode(words) as word").select("word").distinct().rdd.flatMap(lambda x: x).collect()))

# Calculate Inverse Document Frequencies (IDF)
total_documents = df.count()
document_frequency = df.select("Sender", "words").rdd.flatMap(lambda x: [(word, 1) for word in set(x[1])]).reduceByKey(lambda x, y: x + y)
idf_values = document_frequency.map(lambda x: (x[0], log(total_documents / x[1])))

# Broadcast IDF values
idf_broadcast = spark.sparkContext.broadcast(dict(idf_values.collect()))

# Calculate TF-IDF for each document
def calculate_tfidf(row):
    user_name, words = row
    tfidf_values = {word: words.count(word) * idf_broadcast.value.get(word, 0.0) for word in words}
    return user_name, words, tfidf_values

tfidf_data = df.select("Sender", "words").rdd.map(calculate_tfidf)

# Display the result
tfidf_df = spark.createDataFrame(tfidf_data, ["Sender", "words", "tfidf"])
tfidf_df.show(10, truncate = False)



+--------------------------+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Sender                    |words                                                          |tfidf                                                                                                                                                                                                               |
+--------------------------+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|hollandkatelyn@example.net|[control, control, receive, per.]                     