### Context-Aware Text Analysis of Emails

In [1]:
# Load libraries
import email
from email.policy import default
from email.parser import Parser
from datetime import timezone
from dateutil.parser import parse
from collections import namedtuple 

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
import altair as alt


In [2]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook
spark = (
    SparkSession.builder
    .appName("EnronEmailAnalysis")
    .getOrCreate()
)

In [4]:
# Schema used to read the source file: two nullable string columns
columnList = [
    StructField(name, StringType(), True)
    for name in ("file", "message")
]
fileDir, emailMsg = columnList
emailDfSchema = StructType(columnList)

In [5]:
# Generate the first Spark dataframe from the CSV export.
# multiLine lets a quoted field span several lines (the message text contains
# newlines); quote and escape are both '"'.
emailDf = (
    spark.read
    .schema(emailDfSchema)
    .option("header", True)
    .option("multiLine", True)
    .option("quote", '"')
    .option("escape", '"')
    .csv("emails.csv")
)

In [6]:
# Report the corpus size, then preview the first rows and the schema
row_count = emailDf.count()
print('Total row count: ', row_count)
emailDf.show(n=5, truncate=True)
emailDf.printSchema()


Total row count:  517401
+--------------------+--------------------+
|                file|             message|
+--------------------+--------------------+
|allen-p/_sent_mai...|Message-ID: <1878...|
|allen-p/_sent_mai...|Message-ID: <1546...|
|allen-p/_sent_mai...|Message-ID: <2421...|
|allen-p/_sent_mai...|Message-ID: <1350...|
|allen-p/_sent_mai...|Message-ID: <3092...|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [7]:
# Intermediate Step 1
# Split the file column on "/" into an array column named filelist
(
    emailDf
    .select("file", f.split("file", "/").alias("filelist"), "message")
    .show()
)

+--------------------+--------------------+--------------------+
|                file|            filelist|             message|
+--------------------+--------------------+--------------------+
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <1878...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <1546...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <2421...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <1350...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <3092...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <3096...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <1625...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <1718...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <2064...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <3079...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <3307...|
|allen-p/_sent_mai...|[allen-p, _sent_m...|Message-ID: <2545...|
|allen-p/_sent_mai...|[al

In [8]:
# Intermediate Step 2
# One row per path component: `users` is "username<position>" and `val` is the
# component found at that position in the split file path.
(
    emailDf
    .select(
        "file",
        f.split("file", "/").alias("filelist"),
        f.posexplode(f.split("file", "/")).alias("pos", "val"),
        "message",
    )
    .drop("val")
    .select(
        "file",
        f.concat(f.lit("username"), f.col("pos").cast("string")).alias("users"),
        f.expr("filelist[pos]").alias("val"),
        "message",
    )
    .show()
)

+--------------------+---------+----------+--------------------+
|                file|    users|       val|             message|
+--------------------+---------+----------+--------------------+
|allen-p/_sent_mai...|username0|   allen-p|Message-ID: <1878...|
|allen-p/_sent_mai...|username1|_sent_mail|Message-ID: <1878...|
|allen-p/_sent_mai...|username2|        1.|Message-ID: <1878...|
|allen-p/_sent_mai...|username0|   allen-p|Message-ID: <1546...|
|allen-p/_sent_mai...|username1|_sent_mail|Message-ID: <1546...|
|allen-p/_sent_mai...|username2|       10.|Message-ID: <1546...|
|allen-p/_sent_mai...|username0|   allen-p|Message-ID: <2421...|
|allen-p/_sent_mai...|username1|_sent_mail|Message-ID: <2421...|
|allen-p/_sent_mai...|username2|      100.|Message-ID: <2421...|
|allen-p/_sent_mai...|username0|   allen-p|Message-ID: <1350...|
|allen-p/_sent_mai...|username1|_sent_mail|Message-ID: <1350...|
|allen-p/_sent_mai...|username2|     1000.|Message-ID: <1350...|
|allen-p/_sent_mai...|use

In [9]:
# Intermediate Step 3
# Build the long-format frame: one row per (email, path component) with the
# columns users / val / message. No grouping happens in this cell.
#
# posexplode already yields each path component as `val`, so it is used
# directly instead of being dropped and looked up again via filelist[pos];
# the `file` and `filelist` columns are not carried along because they are
# not part of the result.
df1 = (
    emailDf
    .select(
        f.posexplode(f.split("file", "/")).alias("pos", "val"),
        "message",
    )
    .select(
        f.concat(f.lit("username"), f.col("pos").cast("string")).alias("users"),
        "val",
        "message",
    )
)

df1.show()


+---------+----------+--------------------+
|    users|       val|             message|
+---------+----------+--------------------+
|username0|   allen-p|Message-ID: <1878...|
|username1|_sent_mail|Message-ID: <1878...|
|username2|        1.|Message-ID: <1878...|
|username0|   allen-p|Message-ID: <1546...|
|username1|_sent_mail|Message-ID: <1546...|
|username2|       10.|Message-ID: <1546...|
|username0|   allen-p|Message-ID: <2421...|
|username1|_sent_mail|Message-ID: <2421...|
|username2|      100.|Message-ID: <2421...|
|username0|   allen-p|Message-ID: <1350...|
|username1|_sent_mail|Message-ID: <1350...|
|username2|     1000.|Message-ID: <1350...|
|username0|   allen-p|Message-ID: <3092...|
|username1|_sent_mail|Message-ID: <3092...|
|username2|     1001.|Message-ID: <3092...|
|username0|   allen-p|Message-ID: <3096...|
|username1|_sent_mail|Message-ID: <3096...|
|username2|     1002.|Message-ID: <3096...|
|username0|   allen-p|Message-ID: <1625...|
|username1|_sent_mail|Message-ID

In [10]:
# Register df1 as the temporary view "tmpView_emailDf" so it can be queried
# through spark.sql; an existing view with the same name is replaced.
df1.createOrReplaceTempView("tmpView_emailDf")

In [53]:
# Message count per value of path component 0 (users='username0'), largest first.
# Adjacent literals are concatenated into the same single-line query string.
spark.sql(
    "select val,count(message) as messages from tmpView_emailDf"
    "\t\t\twhere users='username0'"
    "\t\t\tgroup by val"
    "\t\t\torder by messages desc"
).show()


+------------+--------+
|         val|messages|
+------------+--------+
|  kaminski-v|   28465|
|  dasovich-j|   28234|
|      kean-s|   25351|
|      mann-k|   23381|
|     jones-t|   19950|
|shackleton-s|   18687|
|    taylor-m|   13875|
|    farmer-d|   13032|
|   germany-c|   12436|
|      beck-s|   11830|
|     symes-k|   10827|
|     nemec-g|   10655|
|     scott-s|    8022|
|    rogers-b|    8009|
|      bass-e|    7823|
|   sanders-r|    7329|
|  campbell-l|    6490|
|   shapiro-r|    6071|
|    guzman-m|    6054|
|       lay-k|    5937|
+------------+--------+
only showing top 20 rows



In [67]:
# Run the per-mailbox message count and collect it into a pandas dataframe
# so it can be plotted.
df2 = spark.sql(
    "select val,count(message) as messages from tmpView_emailDf"
    "\t\t\twhere users='username0'"
    "\t\t\tgroup by val"
    "\t\t\torder by messages desc"
).toPandas()

In [68]:
# Keep the 20 rows with the highest message counts
top20df2 = df2.nlargest(n=20, columns='messages')
top20df2

Unnamed: 0,val,messages
0,kaminski-v,28465
1,dasovich-j,28234
2,kean-s,25351
3,mann-k,23381
4,jones-t,19950
5,shackleton-s,18687
6,taylor-m,13875
7,farmer-d,13032
8,germany-c,12436
9,beck-s,11830


In [81]:
# Plotting: bar chart of messages per user, layered with a text mark that
# prints the message count.

bars = (
    alt.Chart(top20df2)
    .mark_bar()
    .encode(
        x=alt.X('val', title='Users'),
        y=alt.Y('messages', title='# of messages'),
    )
)

text = bars.mark_text(align='center', baseline='middle', dx=1).encode(text='messages')

(bars + text).properties(width=800)

In [50]:
# Total number of rows in the temp view (df1 holds one row per email per
# path component, so this is larger than the raw corpus count).
spark.sql("select count(1) from tmpView_emailDf").show()
# 1,583,415


+--------+
|count(1)|
+--------+
| 1583415|
+--------+



In [108]:
# message sample
# NOTE(review): LIMIT without ORDER BY does not pin down which rows are
# returned, so the message selected here may differ between runs.
plain_sample_msg_df = spark.sql("select message from tmpView_emailDf LIMIT 10").toPandas()
# Take the row at position 8 and trim surrounding whitespace. The name is bound
# once, to the final string, rather than first to the Series and then to a str;
# the former bare len(...) expression was never displayed and has been removed.
plain_sample_msg = plain_sample_msg_df.message.iloc[8].strip()
plain_sample_msg



"Message-ID: <24216240.1075855687451.JavaMail.evans@thyme>\nDate: Wed, 18 Oct 2000 03:00:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: leah.arsdall@enron.com\nSubject: Re: test\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: Leah Van Arsdall\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Dec2000\\Notes Folders\\'sent mail\nX-Origin: Allen-P\nX-FileName: pallen.nsf\n\ntest successful.  way to go!!!"

In [13]:
# Fields of one parsed-email record, in output order.
output_columns = [
    'username', 'payload',
    'Message-ID', 'Date', 'From', 'To', 'Subject',
    'Mime-Version', 'Content-Type', 'Content-Transfer-Encoding',
    'X-From', 'X-To', 'X-cc', 'X-bcc', 'X-Folder', 'X-Origin', 'X-FileName',
    'Cc', 'Bcc',
]

# namedtuple field names must be valid identifiers, so every "-" becomes "_"
columns = ['_'.join(name.split('-')) for name in output_columns]
ParsedEmail = namedtuple('ParsedEmail', columns)
ParsedEmail._fields

('username',
 'payload',
 'Message_ID',
 'Date',
 'From',
 'To',
 'Subject',
 'Mime_Version',
 'Content_Type',
 'Content_Transfer_Encoding',
 'X_From',
 'X_To',
 'X_cc',
 'X_bcc',
 'X_Folder',
 'X_Origin',
 'X_FileName',
 'Cc',
 'Bcc')

In [11]:
def parse_email(original_msg):
    """Parse one raw email string into a ``ParsedEmail`` record.

    The message is parsed with ``email.parser.Parser`` using the ``default``
    policy. The payload and every header are collected (hyphens in header
    names replaced by underscores), the ``Date`` header is rewritten to an
    ISO-8601 string when it can be parsed, and the values are returned in
    the order given by the module-level ``columns`` list.

    :param original_msg: raw message text, or ``None``
    :returns: a ``ParsedEmail`` whose fields are ``str`` when present and
        ``None`` when the message has no such field (including every field
        when ``original_msg`` is ``None``)
    """
    # A null message would make parsestr() raise; return an all-null record.
    if original_msg is None:
        return ParsedEmail(*([None] * len(columns)))

    result = {}
    msg = Parser(policy=default).parsestr(original_msg)
    result['payload'] = msg.get_payload()
    # result['text'] = parse_html_payload(result['payload'])
    try:
        for key, value in msg.items():
            result[key.replace('-', '_')] = value
    except Exception as e:
        print('Problem parsing email: {} {}'.format("email", e))

    # Headers are stored under their original capitalisation, so the key is
    # 'Date'; the raw value is kept so a failure can be reported accurately.
    raw_date = result.get('Date')
    if raw_date is not None:
        try:
            result['Date'] = parse(raw_date, ignoretz=False).isoformat()
        except Exception as e:
            print('Problem converting date: {} {}'.format(raw_date, e))

    # Keep missing fields as None instead of the literal string "None".
    values = (result.get(column) for column in columns)
    return ParsedEmail(*(None if value is None else str(value) for value in values))


In [120]:
# Parse the sample message and print a few of its fields, one per line
parsed_msg = parse_email(plain_sample_msg)
print(
    parsed_msg.payload,
    parsed_msg.username,
    parsed_msg.Date,
    parsed_msg.Message_ID,
    parsed_msg.From,
    sep='\n',
)


test successful.  way to go!!!
None
2000-10-18T03:00:00-07:00
<24216240.1075855687451.JavaMail.evans@thyme>
phillip.allen@enron.com


In [14]:
## Schema for the parsed email data: one nullable string field per entry in `columns`
email_struct = StructType(
    [StructField(column, StringType(), True) for column in columns]
)

In [17]:
# Spark UDF around parse_email; its return type is the email_struct schema.
parse_email_func = udf(parse_email, email_struct)


def parse_emails(input_df):
    """Return ``input_df`` with one extra column per parsed email field.

    The ``message`` column is passed through ``parse_email_func`` and each
    field of the resulting struct becomes a top-level column named after the
    corresponding entry in ``columns``.

    :param input_df: Spark DataFrame with ``users``, ``val`` and ``message`` columns
    :returns: Spark DataFrame with ``users``, ``val``, ``message`` followed by
        the columns listed in ``columns``
    """
    parsed = input_df.select(
        'users', 'val', 'message',
        parse_email_func('message').alias('parsed_email'),
    )

    # Flatten the struct in a single projection rather than calling
    # withColumn once per field, which grows the query plan on every call.
    fields = [parsed.parsed_email[column].alias(column) for column in columns]
    return parsed.select('users', 'val', 'message', *fields)


In [2]:
# Select the rows of the temp view whose `users` value is 'username0'
# (path component 0) and preview them.
df3 = spark.sql("select * from tmpView_emailDf	where users='username0'")
df3.show()

NameError: name 'spark' is not defined

In [22]:
# Parse every message in df3 and collect the result into pandas.
# NOTE(review): toPandas() brings the whole result to the driver; consider
# limiting n_df first if memory is a concern.
n_df = parse_emails(df3)
p_df = n_df.toPandas()
# p_df is a pandas DataFrame, which has no .show() method; display the first rows.
p_df.head()

In [1]:
# p_df is a pandas DataFrame, which has no .show() method; display the first rows.
p_df.head()

NameError: name 'p_df' is not defined

In [None]:
# ## This creates a user-defined function which can be used in Spark
# parse_email_func = udf(lambda z: parse_email(z), email_struct)


# def parse_emails(input_df):
#     new_df = input_df.select(
#         'username', 'id', 'original_msg', parse_email_func(
#             'original_msg').alias('parsed_email')
#     )
#     for column in columns:
#         new_df = new_df.withColumn(column, new_df.parsed_email[column])

#     new_df = new_df.drop('parsed_email')
#     return new_df


# class ParseEmailsTransformer(Transformer):
#     def _transform(self, dataset):
#         """
#         Transforms the input dataset.

#         :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`
#         :returns: transformed dataset
#         """
#         return dataset.transform(parse_emails)


# ## Use the custom ParseEmailsTransformer, Tokenizer, and CountVectorizer
# ## to create a spark pipeline
# tokenizer = Tokenizer(inputCol="original_msg", outputCol="words")
# count_vectorizer = CountVectorizer(
#     inputCol=tokenizer.getOutputCol(), outputCol="features")

# email_pipeline = Pipeline(
#     ## TODO: Complete code
#     stages=[
#         # //*** Stage 1: parsing the email message
#         ParseEmailsTransformer(),
#         # //*** Tokenizer
#         tokenizer,
#         # //*** Implement Count vectorizer
#         count_vectorizer
#     ]
# )
# model = email_pipeline.fit(df)
# result = model.transform(df)
