In [41]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import nltk




In [42]:
# Initialize Spark Session
spark = SparkSession.builder.appName("MyGlueNotebook").getOrCreate()




In [43]:
# Read data from S3
df = spark.read.csv("s3://the-enron-email-dataset/combined_data.csv", header=True, inferSchema=True)




In [44]:
df.show()

+--------------------+--------------------+
|               label|                text|
+--------------------+--------------------+
|                   1|ounce feather bow...|
|                   1|wulvob get your m...|
|                   0| computer connect...|
|                   1|university degree...|
|                   0|thanks for all yo...|
|                   0|larry king live a...|
|                   0|michael pobega wr...|
|                   0|hi i have this er...|
|                   1|works gateway wor...|
|                   1|upon this account...|
|                   1|my dear fellow do...|
|                   1|dear valued membe...|
|                   1|oem software mean...|
|                   1|hello , welcome t...|
|- one of the lead...|                null|
|       coruscation v|                null|
|          melinite g|                null|
|    a collaborator l|                null|
|          flaming ll|                null|
|        l prestige a|          

In [45]:
pandas_df = df.toPandas()
print(pandas_df.head())

  label                                               text
0     1  ounce feather bowl hummingbird opec moment ala...
1     1  wulvob get your medircations online qnb ikud v...
2     0   computer connection from cnn com wednesday es...
3     1  university degree obtain a prosperous future m...
4     0  thanks for all your answers guys i know i shou...


In [46]:
# Define the preprocessing function
def preprocess_text(text):
    if text is None:
        return ""  # Return empty string if text is None
    
    stemmer = PorterStemmer()

    text = text.lower()  # Convert to lowercase
    text = re.sub(r'\d+', '', text)  # Remove numbers
    text = re.sub(r'\s+', ' ', text)  # Replace multiple spaces with a single space
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    
    # Stem the words (without stopwords removal)
    words = text.split()
    filtered_words = [stemmer.stem(word) for word in words]
    return ' '.join(filtered_words)





In [47]:
# Register UDF
preprocess_udf = udf(preprocess_text, StringType())




In [48]:
# Apply preprocessing function to the 'text' column
df_processed = df.withColumn("processed_text", preprocess_udf(df["text"]))




In [49]:
# Show results
df_processed.show()

+--------------------+--------------------+--------------------+
|               label|                text|      processed_text|
+--------------------+--------------------+--------------------+
|                   1|ounce feather bow...|ounc feather bowl...|
|                   1|wulvob get your m...|wulvob get your m...|
|                   0| computer connect...|comput connect fr...|
|                   1|university degree...|univers degre obt...|
|                   0|thanks for all yo...|thank for all you...|
|                   0|larry king live a...|larri king live a...|
|                   0|michael pobega wr...|michael pobega wr...|
|                   0|hi i have this er...|hi i have thi err...|
|                   1|works gateway wor...|work gateway worl...|
|                   1|upon this account...|upon thi account ...|
|                   1|my dear fellow do...|my dear fellow do...|
|                   1|dear valued membe...|dear valu member ...|
|                   1|oem

In [50]:
# Save processed data back to S3 as CSV
df_processed.write.mode('overwrite').csv("s3://the-enron-email-dataset/output-data/", header=True)




In [51]:
# Convert to Pandas DataFrame
pandas_df = df_processed.toPandas()

# Show results as Pandas DataFrame
print(pandas_df.head())

  label  ...                                     processed_text
0     1  ...  ounc feather bowl hummingbird opec moment alab...
1     1  ...  wulvob get your medirc onlin qnb ikud viagra e...
2     0  ...  comput connect from cnn com wednesday escapenu...
3     1  ...  univers degre obtain a prosper futur money ear...
4     0  ...  thank for all your answer guy i know i should ...

[5 rows x 3 columns]
