# Jeb Bush Spark

## References

https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error 

https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35

In [1]:
import pandas as pd
from io import StringIO
import matplotlib.pyplot as plt
import os

In [2]:
in_directory = r'E:\Corpora\PII_Jeb_20190507'

In [3]:
# Read DataFrame with text from a pickle
df2 = pd.read_pickle(os.path.join(in_directory,'df_pickle_001.pkl'))

In [5]:
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8763 entries, 0 to 8762
Data columns (total 7 columns):
GUID             8763 non-null object
Name             8763 non-null object
MIME Type        8763 non-null object
Kind             8763 non-null object
Tags             8763 non-null object
content          8763 non-null object
minimum_label    8763 non-null object
dtypes: object(7)
memory usage: 479.3+ KB


In [9]:
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

In [None]:
sc = SparkContext()

In [14]:
sqlContext = SQLContext(sc)

In [15]:
sdf = sqlContext.createDataFrame(df2)

In [18]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [19]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="content", outputCol="words", pattern="\\W")

In [20]:
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 

In [27]:
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [28]:
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)


In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "minimum_label", outputCol = "label")

In [30]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [32]:
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(sdf)
dataset = pipelineFit.transform(sdf)
dataset.show(5)

+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|                GUID|                Name|           MIME Type|    Kind|                Tags|             content|       minimum_label|               words|            filtered|            features|label|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|fba3c4e5-719f-462...|RE: Agency for Wo...|application/vnd.m...|   Email|PII|ID|social_sec...|Subject: RE: Agen...|PII|ID|social_sec...|[subject, re, age...|[subject, re, age...|(10000,[0,1,2,3,4...|  1.0|
|8af93e08-5904-467...|RE: Ed Curtiss-Ju...|application/vnd.m...|   Email|   PII|ID|bar_number|Subject: RE: Ed C...|   PII|ID|bar_number|[subject, re, ed,...|[subject, re, ed,..

In [34]:
dataset[0]

Column<b'GUID'>