In [1]:
## Case Study: Email AnalyticsDomain: IT Security Firm

In [2]:
import pyspark.sql.functions as F

In [4]:
spark = SparkSession.builder.appName("SpamDetection Notebook").getOrCreate()

In [5]:
raw = spark.read.option("delimiter","#").csv("use_cases/maildir/output.csv").toDF("messageid","date","from_","to_","subject")

In [6]:
df = raw.withColumn("date",F.trim(F.split(raw.date,":")[1])).withColumn("from_",F.trim(F.split(raw.from_,":")[1])).withColumn("to_",F.trim(F.split(raw.to_,":")[1])).withColumn("subject",F.trim(F.split(raw.subject,"Subject:")[1]))

In [7]:
df.show(3)

+--------------------+-------------------+--------------------+--------------------+--------------------+
|           messageid|               date|               from_|                 to_|             subject|
+--------------------+-------------------+--------------------+--------------------+--------------------+
|Message-ID: <2586...|Tue, 16 Oct 2001 10|greg.whalley@enro...|mark.frevert@enro...|FW: Project South...|
|Message-ID: <1534...|Fri, 28 Sep 2001 06|greg.whalley@enro...|john.sherriff@enr...|    RE: Confidential|
|Message-ID: <2997...| Mon, 5 Nov 2001 15|diana.scholtes@en...|josie.jarnagin@en...|RE: Reminder-Flu ...|
+--------------------+-------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [9]:
# Display the top 10 high frequency users based on weekly numbers of mails send
df1 = df.withColumn("week", F.weekofyear(F.unix_timestamp(df.date,"EEE, dd MMM yyyy HH").cast("timestamp")))
maxweek = df1.agg(F.max(df1.week)).first()[0]
df1.groupBy("from_").count().withColumn("avgcount",F.col("count")/maxweek).sort(F.col("avgcount").desc()).show()
                                                         



+--------------------+-----+--------------------+
|               from_|count|            avgcount|
+--------------------+-----+--------------------+
|shelley.corman@en...|  481|                9.25|
|lynn.blair@enron.com|  300|   5.769230769230769|
|greg.whalley@enro...|   68|  1.3076923076923077|
|diana.scholtes@en...|   61|  1.1730769230769231|
|martin.cuilla@enr...|   20| 0.38461538461538464|
|michelle.cash@enr...|    2|0.038461538461538464|
|brad.mckay@enron.com|    1|0.019230769230769232|
+--------------------+-----+--------------------+



In [10]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer().setInputCol("subject").setOutputCol("words")
transformed = tokenizer.transform(df1)

In [13]:
# Extract top 20 keywords from the subject text for both 1) for the top 10 high frequency users and 2) for the non-high frequency users
top_users = [v.asDict()["from_"] for v in df1.groupBy("from_").count().sort(F.col("count").desc()).take(10)]
topuserdata = transformed.filter(transformed.subject != "").filter(transformed.from_.isin(top_users))
topuserdata.withColumn("keyword",F.explode("words")).groupBy("keyword").count().sort(F.col("count").desc()).show(20)
otheruserdata = transformed.filter(transformed.subject != "").filter(transformed.from_.isin(top_users) == False)
otheruserdata.withColumn("keyword",F.explode("words")).groupBy("keyword").count().sort(F.col("count").desc()).show(20)
                                 




+--------+-----+
| keyword|count|
+--------+-----+
|     re:|  407|
|     fw:|  245|
|     for|   59|
|       -|   57|
| meeting|   52|
|      of|   39|
|      to|   34|
|     nng|   29|
|      on|   27|
|     the|   26|
|      tw|   22|
|     gas|   20|
|     and|   19|
|  update|   18|
|    2001|   18|
| request|   17|
|northern|   16|
|   lunch|   15|
|      in|   15|
|     pep|   15|
+--------+-----+
only showing top 20 rows

+-------+-----+
|keyword|count|
+-------+-----+
+-------+-----+



In [None]:
# Extract top 10 keywords from subject by identifying removing the common stop words

In [15]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered")
cleaned = remover.transform(transformed)
cleaned.filter(cleaned.subject != "").withColumn("keyword",F.explode(cleaned.words)).groupBy("keyword").count().sort(F.col("count").desc()).show()


+--------+-----+
| keyword|count|
+--------+-----+
|     re:|  407|
|     fw:|  245|
|     for|   59|
|       -|   57|
| meeting|   52|
|      of|   39|
|      to|   34|
|     nng|   29|
|      on|   27|
|     the|   26|
|      tw|   22|
|     gas|   20|
|     and|   19|
|  update|   18|
|    2001|   18|
| request|   17|
|northern|   16|
|   lunch|   15|
|      in|   15|
|     pep|   15|
+--------+-----+
only showing top 20 rows



In [18]:
# Introduce a new column label to identify new, replied, and forwarded messages
df2 = cleaned.withColumn("msgtype",F.when(cleaned.subject.startswith("Re:"),1).otherwise(F.when(cleaned.subject.startswith("Fw:"),2).otherwise(0)))

In [19]:
# Get the trend of the overall mail activity using the pivot table from spark itself
df2.groupBy("week").pivot("msgtype").count().show()

+----+---+----+----+
|week|  0|   1|   2|
+----+---+----+----+
|  31|  3|null|null|
|  34| 34|null|null|
|  26| 10|null|null|
|  27|  2|null|null|
|  44| 40|   2|null|
|  12| 34|   1|null|
|  47| 24|null|null|
|   1| 10|null|null|
|  52| 24|null|null|
|  13|  9|   1|null|
|   6| 14|   6|   1|
|   3| 20|   4|null|
|  40| 20|null|null|
|  48|100|null|null|
|   5| 11|   3|null|
|  41| 14|null|null|
|  43| 12|null|   1|
|  37|  2|null|null|
|   9| 35|   5|null|
|  35| 28|null|null|
+----+---+----+----+
only showing top 20 rows



In [21]:
# Convert keywords to feature vector
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
df4 = df2.filter(df2.subject != "")
cvmodel = CountVectorizer().setInputCol("filtered").setOutputCol("features").fit(df4)
featured = cvmodel.transform(df4)

In [None]:
# Use kmeans clustering to create 4 clusters from the extracted keywords
from org.apache.spark.ml.clustering import KMeans
kmeans = KMeans().setK(4).setSeed(1L)
model = kmeans.fit(featured)
predictions = model.transform(featured)

In [None]:
# Use LDA to generate 4 topics from the extracted keywords
from pyspark.ml.clustering import LDA
lda = LDA().setK(4).setMaxIter(10)
model = lda.fit(featured)
topics = model.describeTopics(4)
topic_indices = topics.select("termIndices").rdd.map(lambda x:x[0][0]).collect()
[cvmodel.vocabulary[v] for v in topic_indices]