In [0]:
# Confirm the upload: listing the DBFS path returns a FileInfo entry
# (path, name, size in bytes, modification time) for emails.csv
dbutils.fs.ls("/FileStore/tables/emails.csv")

[FileInfo(path='dbfs:/FileStore/tables/emails.csv', name='emails.csv', size=1426122219, modificationTime=1739876309000)]

In [0]:
# Inspect the start of emails.csv: dbutils.fs.head returns the first bytes of the
# file as a string. The absolute "/FileStore/..." path matches the other cells.
dbutils.fs.head("/FileStore/tables/emails.csv")

[Truncated to first 65536 bytes]


'"file","message"\n"allen-p/_sent_mail/1.","Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>\nDate: Mon, 14 May 2001 16:39:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: tim.belden@enron.com\nSubject: \nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: Tim Belden <Tim Belden/Enron@EnronXGate>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\\'Sent Mail\nX-Origin: Allen-P\nX-FileName: pallen (Non-Privileged).pst\n\nHere is our forecast\n\n "\n"allen-p/_sent_mail/10.","Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>\nDate: Fri, 4 May 2001 13:51:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: john.lavorato@enron.com\nSubject: Re:\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_

In [0]:
# Print the start of emails.csv line by line so the CSV layout is readable:
# a "file","message" header, then one quoted pair per email whose message
# itself spans many lines
preview_text = dbutils.fs.head("/FileStore/tables/emails.csv")
for preview_line in preview_text.splitlines():
    print(preview_line)

[Truncated to first 65536 bytes]
"file","message"
"allen-p/_sent_mail/1.","Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>
Date: Mon, 14 May 2001 16:39:00 -0700 (PDT)
From: phillip.allen@enron.com
To: tim.belden@enron.com
Subject: 
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
X-From: Phillip K Allen
X-To: Tim Belden <Tim Belden/Enron@EnronXGate>
X-cc: 
X-bcc: 
X-Folder: \Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Sent Mail
X-Origin: Allen-P
X-FileName: pallen (Non-Privileged).pst

Here is our forecast

 "
"allen-p/_sent_mail/10.","Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>
Date: Fri, 4 May 2001 13:51:00 -0700 (PDT)
From: phillip.allen@enron.com
To: john.lavorato@enron.com
Subject: Re:
Mime-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
X-From: Phillip K Allen
X-To: John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>
X-cc: 
X-bcc: 
X-Folder: \Phillip_Allen_Jan2002_1\All

In [0]:
# Read emails.csv into a DataFrame with one email per row. Email bodies contain
# newlines inside a quoted field, so multiLine=True is required to stop them being
# split across rows.
# An explicit schema is given instead of inferSchema=True: inference needs an extra
# full pass over the ~1.4 GB file, and both columns are plain strings anyway.
df = spark.read.csv(
    "/FileStore/tables/emails.csv",
    schema="file STRING, message STRING",
    header=True,     # first row holds the column names; it is skipped, not read as data
    quote='"',       # fields are wrapped in double quotes
    escape='"',      # a doubled quote ("") inside a quoted field is a literal quote
    multiLine=True,  # allow quoted fields (the message) to span several lines
)

In [0]:
# Preview the first 5 rows; long values are cut short in the default display
df.show(n=5)

+--------------------+--------------------+
|                file|             message|
+--------------------+--------------------+
|allen-p/_sent_mai...|Message-ID: <1878...|
|allen-p/_sent_mai...|Message-ID: <1546...|
|allen-p/_sent_mai...|Message-ID: <2421...|
|allen-p/_sent_mai...|Message-ID: <1350...|
|allen-p/_sent_mai...|Message-ID: <3092...|
+--------------------+--------------------+
only showing top 5 rows



In [0]:
# Preview the first 2 rows with every column value printed in full
df.show(n=2, truncate=False)

+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Show the column names, types and nullability Spark assigned to df
df.printSchema()

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [0]:
# Number of rows = number of emails, since each row holds one email
df.count()

517401

In [0]:
# Keep only the message column and switch to the RDD API;
# each RDD element is a Row holding one message
rdd = df.select(df["message"]).rdd

In [0]:
# Inspect the first element of the RDD (a Row wrapping one raw email message)
rdd.take(1)

[Row(message="Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>\nDate: Mon, 14 May 2001 16:39:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: tim.belden@enron.com\nSubject: \nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: Tim Belden <Tim Belden/Enron@EnronXGate>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail\nX-Origin: Allen-P\nX-FileName: pallen (Non-Privileged).pst\n\nHere is our forecast\n\n ")]

In [0]:
# Sanity check: the RDD should hold exactly one element per DataFrame row,
# so this should equal df.count()
rdd.count()

517401

In [0]:
"""
This splits the message column of the df dataframe by newline characters (\n) but limits the splitting to 15 parts. This avoids splitting the body of the emails, which contain many newline characters. The resulting list is then converted into an RDD, which is then flatMapped so each element in x (the list created by splitting the message) will be returned as a separate element in the RDD called split_rdd.
"""
import pyspark.sql.functions as F

split_rdd = df.select(F.split(df.message,"\n", 15)).rdd.flatMap(lambda x: x)

In [0]:
# Check the code in the cell above worked by inspecting the first 2 elements of split_rdd
split_rdd.take(2)

[['Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>',
  'Date: Mon, 14 May 2001 16:39:00 -0700 (PDT)',
  'From: phillip.allen@enron.com',
  'To: tim.belden@enron.com',
  'Subject: ',
  'Mime-Version: 1.0',
  'Content-Type: text/plain; charset=us-ascii',
  'Content-Transfer-Encoding: 7bit',
  'X-From: Phillip K Allen',
  'X-To: Tim Belden <Tim Belden/Enron@EnronXGate>',
  'X-cc: ',
  'X-bcc: ',
  "X-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail",
  'X-Origin: Allen-P',
  'X-FileName: pallen (Non-Privileged).pst\n\nHere is our forecast\n\n '],
 ['Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>',
  'Date: Fri, 4 May 2001 13:51:00 -0700 (PDT)',
  'From: phillip.allen@enron.com',
  'To: john.lavorato@enron.com',
  'Subject: Re:',
  'Mime-Version: 1.0',
  'Content-Type: text/plain; charset=us-ascii',
  'Content-Transfer-Encoding: 7bit',
  'X-From: Phillip K Allen',
  'X-To: John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>',
  'X-cc: ',
  'X-bcc: 

In [0]:
def splitter(r: list[str]) -> list[str]:
    """
    Turn one email's raw header lines into their values.

    For each string in r, everything after the FIRST colon is kept and stripped of
    surrounding whitespace, so "Subject: Re: test" becomes "Re: test". A string with
    no colon is replaced by the literal string "None" (not the None object). This
    lets a later check count lines that did not look like "Name: value" headers.

    Args:
        r: the list of strings produced for one email in split_rdd.

    Returns:
        A new list, the same length as r, holding the cleaned values.
    """
    new_row = []
    for string in r:
        # partition splits at the first colon only; `colon` is "" when there is none
        _, colon, value = string.partition(":")
        new_row.append(value.strip() if colon else "None")
    return new_row

In [0]:
# Run splitter over every element of split_rdd. Each list of raw header lines
# becomes a list of header values (text after the first colon, stripped), with
# the string "None" wherever a line had no colon.
mapped_rdd = split_rdd.map(f=splitter)

# Sanity check for splitter: splitter marks a line with no colon using the string
# "None". Each element of mapped_rdd is a LIST of strings, so the predicate has to
# look inside the list; comparing the whole list to "None" would always be False
# and the count would always be 0.
def none_filter(s: "str | list[str]") -> bool:
    """
    Return True if s marks a line that had no colon.

    Args:
        s: either a single cleaned value, or a whole row (list of cleaned values)
           as stored in mapped_rdd.

    Returns:
        For a string, whether it equals "None". For a row, whether any of its
        values equals "None".
    """
    if isinstance(s, str):
        return s == "None"
    return "None" in s
# Uncomment to count rows of mapped_rdd with at least one colon-less line:
#mapped_rdd.filter(none_filter).count()

In [0]:
# Look at the first 2 cleaned rows of mapped_rdd to confirm splitter
# stripped the header names and kept only the values
mapped_rdd.take(num=2)

[['<18782981.1075855378110.JavaMail.evans@thyme>',
  'Mon, 14 May 2001 16:39:00 -0700 (PDT)',
  'phillip.allen@enron.com',
  'tim.belden@enron.com',
  '',
  '1.0',
  'text/plain; charset=us-ascii',
  '7bit',
  'Phillip K Allen',
  'Tim Belden <Tim Belden/Enron@EnronXGate>',
  '',
  '',
  "\\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail",
  'Allen-P',
  'pallen (Non-Privileged).pst\n\nHere is our forecast'],
 ['<15464986.1075855378456.JavaMail.evans@thyme>',
  'Fri, 4 May 2001 13:51:00 -0700 (PDT)',
  'phillip.allen@enron.com',
  'john.lavorato@enron.com',
  'Re:',
  '1.0',
  'text/plain; charset=us-ascii',
  '7bit',
  'Phillip K Allen',
  'John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>',
  '',
  '',
  "\\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail",
  'Allen-P',
  "pallen (Non-Privileged).pst\n\nTraveling to have a business meeting takes the fun out of the trip.  Especially if you have to prepare a presentation.  I would suggest holding the business plan m

In [0]:
# Name the 15 positional values produced by splitter and turn mapped_rdd back into
# a DataFrame. The last value holds the X-FileName header together with the body.
header_columns = [
    'Message-ID', 'Date', 'From', 'To', 'Subject',
    'Mime-Version', 'Content-Type', 'Content-Transfer-Encoding',
    'X-From', 'X-To', 'X-cc', 'X-bcc', 'X-Folder', 'X-Origin',
    'X-FileName and body',
]
mapped_df = mapped_rdd.toDF(header_columns)

In [0]:
# Keep only the five columns the assignment needs (selected by name rather than
# position), then preview 5 rows in full to confirm the conversion worked
relevant_mapped_df = mapped_df.select("Message-ID", "Date", "From", "To", "Subject")
relevant_mapped_df.show(n=5, truncate=False)

+---------------------------------------------+-------------------------------------+-----------------------+-----------------------+---------+
|Message-ID                                   |Date                                 |From                   |To                     |Subject  |
+---------------------------------------------+-------------------------------------+-----------------------+-----------------------+---------+
|<18782981.1075855378110.JavaMail.evans@thyme>|Mon, 14 May 2001 16:39:00 -0700 (PDT)|phillip.allen@enron.com|tim.belden@enron.com   |         |
|<15464986.1075855378456.JavaMail.evans@thyme>|Fri, 4 May 2001 13:51:00 -0700 (PDT) |phillip.allen@enron.com|john.lavorato@enron.com|Re:      |
|<24216240.1075855687451.JavaMail.evans@thyme>|Wed, 18 Oct 2000 03:00:00 -0700 (PDT)|phillip.allen@enron.com|leah.arsdall@enron.com |Re: test |
|<13505866.1075863688222.JavaMail.evans@thyme>|Mon, 23 Oct 2000 06:13:00 -0700 (PDT)|phillip.allen@enron.com|randall.gay@enron.com  |   

In [0]:
# Selecting columns must not change the row count: expect the same total as df
relevant_mapped_df.count()

517401

In [0]:
# List each column next to its Spark SQL type name
print([(field.name, field.dataType.simpleString()) for field in relevant_mapped_df.schema.fields])

[('Message-ID', 'string'), ('Date', 'string'), ('From', 'string'), ('To', 'string'), ('Subject', 'string')]


In [0]:
# Convert the 'Date' column from string to date type in three columns:
#   cleaned_date   - the Date string with the leading weekday (e.g. "Fri, ") removed
#   timestamp      - cleaned_date parsed as a timestamp ("d" accepts 1- or 2-digit days)
#   formatted_date - the calendar date of that timestamp, as a date-typed column
# to_date() is applied to the timestamp directly. Formatting it to a "yyyy-MM-dd"
# string with date_format() (which returns a string, not a date) and parsing it
# back gives the same value with an extra step.
# NOTE(review): the calendar date is taken in the Spark session time zone
# (spark.sql.session.timeZone), not the sender's "-0700"-style offset. Late-evening
# emails may therefore land on the next day; confirm this is acceptable.
# NOTE(review): consider checking how many rows ended up with a null 'timestamp'.
relevant_mapped_df = (
    relevant_mapped_df
    .withColumn("cleaned_date",
                F.regexp_replace(F.col("Date"), r"^[a-zA-Z]{3},\s*", ""))
    .withColumn("timestamp",
                F.to_timestamp(F.col("cleaned_date"), "d MMM yyyy HH:mm:ss Z (zzz)"))
    .withColumn("formatted_date", F.to_date(F.col("timestamp")))
)
print(relevant_mapped_df.dtypes)


[('Message-ID', 'string'), ('Date', 'string'), ('From', 'string'), ('To', 'string'), ('Subject', 'string'), ('cleaned_date', 'string'), ('timestamp', 'timestamp'), ('formatted_date', 'date')]


In [0]:
# Build the final frame: drop the raw 'Date' string and the intermediate
# 'cleaned_date' and 'timestamp' columns, keeping 'formatted_date'. Then confirm
# 'formatted_date' is date-typed (not string) and preview 5 rows.
superseded_columns = ["Date", "cleaned_date", "timestamp"]
five_column_df = relevant_mapped_df.drop(*superseded_columns)
print(five_column_df.dtypes)
five_column_df.show(n=5, truncate=False)

[('Message-ID', 'string'), ('From', 'string'), ('To', 'string'), ('Subject', 'string'), ('formatted_date', 'date')]
+---------------------------------------------+-----------------------+-----------------------+---------+--------------+
|Message-ID                                   |From                   |To                     |Subject  |formatted_date|
+---------------------------------------------+-----------------------+-----------------------+---------+--------------+
|<18782981.1075855378110.JavaMail.evans@thyme>|phillip.allen@enron.com|tim.belden@enron.com   |         |2001-05-14    |
|<15464986.1075855378456.JavaMail.evans@thyme>|phillip.allen@enron.com|john.lavorato@enron.com|Re:      |2001-05-04    |
|<24216240.1075855687451.JavaMail.evans@thyme>|phillip.allen@enron.com|leah.arsdall@enron.com |Re: test |2000-10-18    |
|<13505866.1075863688222.JavaMail.evans@thyme>|phillip.allen@enron.com|randall.gay@enron.com  |         |2000-10-23    |
|<30922949.1075863688243.JavaMail.eva

In [0]:
# Dropping columns must not change the row count: expect the same total as df
five_column_df.count()

In [0]:
# Count rows whose 'To' value has no '@' sign, i.e. does not look like an email address

to_has_at_sign = F.col("To").contains("@")
invalid_email_in_To_df = five_column_df.where(~to_has_at_sign)

invalid_email_in_To_df.count()

22354

In [0]:
# Count rows whose 'From' value has no '@' sign, i.e. does not look like an email address

from_has_at_sign = F.col("From").contains("@")
invalid_email_in_From_df = five_column_df.where(~from_has_at_sign)

invalid_email_in_From_df.count()

1

In [0]:
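# The 2 cells above show that 22,354 rows don't contain an '@' sign in the 'To' column and 1 row doesn't contain an '@' sign in the 'From' column.
# Out of 517,401 rows, this equates to about 4.32% and 0.0002% of the dataset, respectively.
# We consider these instances rare enough not to have a big impact on the assignment.
# NOTE: because columns are assigned by header position, a row without an '@' in 'To' may be an email whose headers did not follow the expected layout
# (e.g. no To: line). Its other columns may then be shifted too, so it is worth spot-checking a few of these rows.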