<img src="http://spark.apache.org/images/spark-logo.png">

# Email leakage prevention using Spark and Machine Learning

## General Overview
* What is the problem and why is it important?
* What approach have we used?
* What was our data?
* Results

## Let's see some algorithms in action!

## Spark Introduction
It's brand new and it's sparkling!

<img style="float: left;" src = "http://www.scala-lang.org/resources/img/smooth-spiral.png" > <img style="float: left;" src="http://insights.dice.com/wp-content/uploads/2012/04/java_logo.jpg">   <img style="float: left;" src = "http://jeroenooms.github.io/r-dependency-versioning/slides/logo.png" > <img style="float: left;" src = "http://www.element14.com/community/servlet/JiveServlet/downloadImage/38-13581-176681/140-140/python-logo.png" >




<img align="left" src="https://spark.apache.org/images/logistic-regression.png">

Logistic regression in Hadoop and Spark

### RDDs - Resilient Distributed Datasets
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.
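To make "partitioned" and "immutable" concrete, here is a plain-Python sketch (no Spark needed) of the idea: a collection split into partitions, a transformation that builds a new collection instead of modifying the old one, and a `collect`-style step that concatenates the partitions. The helpers `partition`, `map_rdd` and `collect` are hypothetical names for illustration only, and the slicing rule that `sc.parallelize` actually uses may differ.

```python
def partition(elements, num_partitions):
    """Split a list into num_partitions contiguous, immutable slices (tuples)."""
    n = len(elements)
    return tuple(
        tuple(elements[i * n // num_partitions:(i + 1) * n // num_partitions])
        for i in range(num_partitions)
    )


def map_rdd(partitions, func):
    """Apply func to every element; returns a NEW partitioned collection."""
    return tuple(tuple(func(x) for x in part) for part in partitions)


def collect(partitions):
    """Concatenate all partitions, in order, into one list (like RDD.collect)."""
    return [x for part in partitions for x in part]


names = ['Joe', 'Tomek', 'Kate']
parts = partition(names, 2)
upper = map_rdd(parts, str.upper)
print(parts)           # (('Joe',), ('Tomek', 'Kate'))
print(collect(upper))  # ['JOE', 'TOMEK', 'KATE']
```

In Spark itself, `namesRDD.getNumPartitions()` reports the number of partitions and `namesRDD.glom().collect()` returns the elements grouped by partition.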

#### Setting up Spark

In [None]:
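from pyspark import SparkContext
from pyspark.sql import SQLContext
# "local" runs Spark on this machine with a single worker thread ("local[*]" would use all cores)
master = "local"
sc = SparkContext(master, "LEAKAGE")  # the second argument is the application name
# SQLContext is the DataFrame entry point used here; newer Spark versions recommend SparkSession instead
sql_ctx = SQLContext(sc)
print('DONE! Using Spark version', sc.version)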

#### Creating RDDs

In [None]:
names = ['Joe', 'Tomek', 'Kate']
# Distribute a local Python list as an RDD; this is lazy, so no job runs yet
namesRDD = sc.parallelize(names)
print(type(namesRDD), namesRDD)

In [None]:
# collect() is an action: it runs a job and returns all elements to the driver as a Python list
RDDcollected = namesRDD.collect()
print(type(RDDcollected), RDDcollected)

#### Our data

In [None]:
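def term_frequency(some_text):
    """Return the hashed term-frequency vector (a SparseVector) of some_text."""
    from pyspark.ml.feature import HashingTF, Tokenizer
    # pyspark.ml transformers operate on DataFrames, so wrap the text in a one-row DataFrame
    df = sql_ctx.createDataFrame([(some_text,)], ["text"])
    words = Tokenizer(inputCol="text", outputCol="words").transform(df)
    featurizedData = HashingTF(inputCol="words", outputCol="features").transform(words)
    # Row columns are (text, words, features); index 2 is the feature vector
    return featurizedData.collect()[0][2]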
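`HashingTF` relies on the hashing trick: each term is hashed to an index in a fixed-size vector, and the counts at those indices form the term-frequency vector, so no vocabulary has to be stored. The plain-Python sketch below reproduces the idea. It uses `zlib.crc32` as a stand-in hash, whereas Spark uses MurmurHash3, so the indices will not match Spark's output; `hashing_tf` is a hypothetical name, and `num_features=262144` (2^18) mirrors the default `numFeatures` of `pyspark.ml.feature.HashingTF`.

```python
import zlib
from collections import Counter


def hashing_tf(terms, num_features=262144):
    """Map terms to a sparse {index: count} term-frequency vector.

    zlib.crc32 is a stand-in for Spark's MurmurHash3, so indices differ from Spark's.
    """
    counts = Counter()
    for term in terms:
        index = zlib.crc32(term.encode('utf-8')) % num_features
        counts[index] += 1
    return dict(counts)


tf = hashing_tf('hello world hello'.split())
print(sum(tf.values()))  # 3 terms in total
```

Different terms can collide on the same index (with `num_features=1` every term does); a larger vector makes collisions rarer at the cost of memory.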

In [None]:
from data_abstraction import MessageEntry

In [None]:
a = MessageEntry(body='         Hello    World!')
print(a)

In [None]:
print(a == a)
from copy import deepcopy
b = deepcopy(a)
b.mid = 10
print(a == b)

In [None]:
df = sql_ctx.read.json('messages.json')
# row[i] relies on column order: Spark sorts the inferred JSON fields alphabetically
data = df.rdd.map(lambda row: MessageEntry(mid=row[6], body=row[1], from_field=row[5], subject=row[8],
                                           date=row[3], owner=row[7], to=row[9], cc=row[2], bcc=row[0], folder=row[4]))
print("Number of entries", data.count())
print(data.collect())
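The positional indexes above (`row[6]` for `mid`, `row[1]` for `body`, and so on) work because Spark's JSON schema inference orders the inferred fields alphabetically. The check below assumes the JSON keys are named like the `MessageEntry` keyword arguments, with `from_field` for the sender; this is an assumption, but a key named `from` would sort to the same position.

```python
# Assumed JSON keys, named after the MessageEntry keyword arguments (hypothetical).
keys = ['mid', 'body', 'from_field', 'subject', 'date',
        'owner', 'to', 'cc', 'bcc', 'folder']

# Inferred JSON schemas list fields in alphabetical order, so the position of
# each key in the sorted list is its index in a Row.
position = {key: index for index, key in enumerate(sorted(keys))}
print(position['mid'], position['body'], position['to'])  # 6 1 9
```

Reading fields by name instead, e.g. `row['mid']` or `row.mid`, removes the dependency on this ordering altogether.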

### RDD Operations
#### Filter

In [None]:
# Drop messages whose folder path contains any of these names (Python's built-in all())
folders = ['Sent', 'All documents']
data = data.filter(lambda message: all(folder not in message.folder for folder in folders))

print("Number of entries", data.count())
print(data.collect())
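The filter keeps a message only if none of the listed names occurs *as a substring* of its folder path. Pulling the lambda out into a named predicate makes that behaviour easy to check outside Spark; `keep_folder` and `EXCLUDED_FOLDERS` are hypothetical names, and the folder paths are made-up examples, not taken from the dataset.

```python
EXCLUDED_FOLDERS = ['Sent', 'All documents']


def keep_folder(folder, excluded=EXCLUDED_FOLDERS):
    """True if no excluded name occurs anywhere in the folder path."""
    return all(name not in folder for name in excluded)


# Hypothetical folder paths, for illustration only.
for path in ['Notes Folders/Inbox', 'Notes Folders/Sent', 'Sent Items', 'All documents']:
    print(path, keep_folder(path))
```

Because this is a substring test, a folder such as `Sent Items` is dropped as well; comparing the last path component exactly would be needed if only the `Sent` folder itself should go.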

In [None]:
data = data.filter(lambda message: message.date != "")
print("Number of entries", data.count())
print(data.collect())

#### Distinct

In [None]:
data = data.distinct()
print("Number of entries", data.count())
print(data.collect())
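`distinct()` finds duplicates by hashing and comparing the elements, so it behaves as expected only if `MessageEntry` defines `__hash__` and `__eq__` consistently (equal entries must have equal hashes). PySpark implements `distinct` as `map(lambda x: (x, None)).reduceByKey(lambda x, _: x).map(lambda x: x[0])`. The sketch below emulates that on in-memory partitions with a hypothetical `Entry` class; it is not the real `MessageEntry`, whose equality rules live in `data_abstraction`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for MessageEntry; frozen dataclasses get __eq__ and __hash__."""
    mid: int
    body: str


def distinct(partitions, num_partitions=2):
    """Emulate map((x, None)) -> reduceByKey(lambda x, _: x) -> keys on in-memory partitions."""
    # Shuffle: every element goes to the bucket chosen by its hash.
    shuffled = [dict() for _ in range(num_partitions)]
    for part in partitions:
        for x in part:
            # Equal elements collapse onto one dict key; the first one seen is kept.
            shuffled[hash(x) % num_partitions].setdefault(x, None)
    return [x for bucket in shuffled for x in bucket]


parts = [[Entry(1, 'hi'), Entry(2, 'yo')], [Entry(1, 'hi')]]
print(len(distinct(parts)))  # 2
```

If `__eq__` ignored a field that `__hash__` used (or the other way round), equal entries could land in different buckets and survive as duplicates.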

#### Map

In [None]:
# Map every known alias of an address to one canonical address
unification_dict = {'vkaminski@aol.com': 'vince.kaminski@enron.com',
                    'vkamins@enron.com': 'vince.kaminski@enron.com',
                    'vince.j.kaminski@enron.com': 'vince.kaminski@enron.com',
                    'j.kaminski@enron.com': 'vince.kaminski@enron.com'}

def unificate(message):
    """Replace alias addresses in the sender and recipient fields with canonical ones."""
    def canonical(address):
        # Unknown addresses are returned unchanged
        return unification_dict.get(address, address)
    message.from_field = canonical(message.from_field)
    if message.cc is not None:
        message.cc = tuple(canonical(address) for address in message.cc)
    if message.bcc is not None:
        message.bcc = tuple(canonical(address) for address in message.bcc)
    if message.to is not None:
        message.to = tuple(canonical(address) for address in message.to)
    return message

data = data.map(unificate)
print("Number of entries", data.count())
print(data.collect())
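Writing `unification_dict` by hand repeats the canonical address for every alias. One alternative, sketched below, is to list the aliases per person and invert that mapping; `aliases_by_person` and `canonical` are hypothetical names, the addresses are the ones already used above, and the address that is not an alias is a placeholder.

```python
aliases_by_person = {
    'vince.kaminski@enron.com': ['vkaminski@aol.com', 'vkamins@enron.com',
                                 'vince.j.kaminski@enron.com', 'j.kaminski@enron.com'],
}

# Invert canonical -> aliases into alias -> canonical.
unification_dict = {alias: canonical_address
                    for canonical_address, aliases in aliases_by_person.items()
                    for alias in aliases}


def canonical(address):
    """Return the canonical address, or the address itself if it is not a known alias."""
    return unification_dict.get(address, address)


print(canonical('vkamins@enron.com'))  # vince.kaminski@enron.com
print(canonical('kate@example.com'))   # kate@example.com
```

The inverted dictionary has the same alias-to-canonical shape as the hand-written one, so it could replace it without changing `unificate`.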

#### AggregateByKey

In [None]:
data = data.map(lambda message: (message.owner, message))
print("Number of entries", data.count())
print(data.collect())
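Turning each message into an `(owner, message)` pair makes `owner` the key that per-key operations such as `aggregateByKey` shuffle on. By default, PySpark assigns a key to a partition with `portable_hash(key) % numPartitions`, so all pairs sharing a key end up in the same partition. The sketch below mimics that with Python's built-in `hash`; `partition_by_key` is a hypothetical helper and the owners are made-up labels.

```python
from collections import defaultdict


def partition_by_key(pairs, num_partitions):
    """Group (key, value) pairs into partitions by hash(key) % num_partitions."""
    partitions = defaultdict(list)
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)


# Hypothetical (owner, message_id) pairs.
pairs = [('owner_a', 1), ('owner_b', 2), ('owner_a', 3)]
for index, part in sorted(partition_by_key(pairs, 3).items()):
    print(index, part)
```

Python randomizes string hashes per process, which is why PySpark needs the same `PYTHONHASHSEED` on every worker for this assignment to agree across machines; the printed partition numbers can therefore change between runs.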

In [None]:
# aggregateByKey(zeroValue, seqFunc, combFunc):
#   zeroValue - initial accumulator value for each key in each partition (here 0)
#   seqFunc   - folds one value (a message) into the accumulator within a partition
#   combFunc  - merges two accumulators for the same key coming from different partitions
# The result is the number of messages per owner
data = data.aggregateByKey(0, lambda count, message: count + 1, lambda count1, count2: count1 + count2)
print("Number of owners", data.count())
print(data.collect())
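`aggregateByKey` works in two stages: within each partition, `seqFunc` folds every value of a key into an accumulator that starts at `zeroValue`; afterwards, `combFunc` merges the per-partition accumulators of the same key. The plain-Python sketch below emulates both stages for the per-owner message count; `aggregate_by_key` is a hypothetical helper and the partitions hold made-up data.

```python
def aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    """Emulate RDD.aggregateByKey on a list of partitions of (key, value) pairs."""
    # Stage 1: fold values into one accumulator per key, separately in each partition.
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero_value), value)
        per_partition.append(acc)
    # Stage 2: merge the accumulators of the same key across partitions.
    result = {}
    for acc in per_partition:
        for key, partial in acc.items():
            result[key] = comb_func(result[key], partial) if key in result else partial
    return result


partitions = [[('owner_a', 'msg1'), ('owner_b', 'msg2')],
              [('owner_a', 'msg3')]]
counts = aggregate_by_key(partitions, 0,
                          lambda count, message: count + 1,
                          lambda count1, count2: count1 + count2)
print(counts)  # {'owner_a': 2, 'owner_b': 1}
```

Because `zeroValue` can be used once per key in every partition, it should be neutral for `combFunc` (0 for a sum). For a plain count, calling `countByKey()` on the `(owner, message)` pair RDD would return the same numbers as a dictionary on the driver.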

## Explore on your own!
https://spark.apache.org/docs/latest/programming-guide.html

http://spark.apache.org/docs/latest/ml-features.html

In [None]:
# insert code below and have fun
