# Apache Spark Streaming Project

The purpose of this project is to use the ETL, modeling, and streaming capabilities of Apache Spark together in one script. It is meant as a showcase of what Spark can do and as a reference. It does not cover in-depth model building or feature reduction.

## 1. Cassandra Setup
Run a Python script that connects to the Cassandra cluster (a single node in this example) and creates the keyspace. The script then creates and loads the training table. It also creates an empty table for the test data, which will be inserted as it arrives through the CSV stream.

```python
%run csv2cassandra.py
```

```
Establishing connection to default cluster 127.0.0.1:9024
Dropping Keyspace sparkdb if exists.
Creating sparkdb Keyspace
Creating table ny_times_train in sparkdb Keyspace
Create table to store stream test data
Load CSV NYTimesBlogTrain and Insert into Table...

Data successfully Inserted.
```

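## 2. Create SparkContext and SQLContext
Create the SparkContext (`sc`) and the `SQLContext`, and pull in the DataStax Spark Cassandra Connector package through `PYSPARK_SUBMIT_ARGS`. Set the variable before the SparkContext is created, because PySpark reads it when it launches the JVM.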

```python
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Must be set before the SparkContext (and its JVM) is created
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages datastax:spark-cassandra-connector:1.6.0-s_2.10 pyspark-shell"
```

```python
# local[2]: the socket receiver used later occupies one thread,
# so streaming needs at least two to also process the batches
conf = SparkConf()\
    .setMaster("local[2]")\
    .setAppName('Spark Streaming Project')\
    .set('spark.cassandra.connection.host', '127.0.0.1')
    # .set("spark.rpc.netty.dispatcher.numThreads", "2")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
```

### 2a. Load Cassandra NY Times training data
Load the NY Times training data that we stored in Cassandra into a Spark DataFrame.

```python
nydata_raw = sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="ny_times_train", keyspace="sparkdb")\
    .load()
```

## 3. UDF Feature Creation
Next, import two functions from the `features` module:

- `registerFunctions` registers the feature UDFs with the `SQLContext`.
- `featurecreation` applies the registered UDFs to create the feature columns.

```python
from features import registerFunctions, featurecreation

registerFunctions(sqlContext)
nydata = featurecreation(nydata_raw, 'nydata_raw', sqlContext)
```
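`features.py` is not included here, so the real UDF definitions are unknown. As a hedged illustration only, the functions below are hypothetical bodies for a few of the feature columns used later (`question`, `hourpub`, `dowpub`). They assume `pubdate` strings look like `2014-09-01 22:00:09`. The commented lines show how such a function could be registered with `SQLContext.registerFunction` in Spark 1.x.

```python
from datetime import datetime

# Hypothetical UDF bodies; the real definitions live in features.py (not shown).
# Assumed pubdate format, e.g. "2014-09-01 22:00:09".
PUBDATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def is_question(headline):
    """1 if the headline ends with a question mark, else 0 (assumed definition)."""
    return int(bool(headline) and headline.strip().endswith("?"))


def hour_published(pubdate):
    """Hour of day (0-23) the article was published."""
    return datetime.strptime(pubdate, PUBDATE_FORMAT).hour


def day_of_week_published(pubdate):
    """Day of week with Monday = 0 (Python's convention, chosen for this sketch)."""
    return datetime.strptime(pubdate, PUBDATE_FORMAT).weekday()


# Registering one of them with Spark 1.x would look like (not executed here):
# from pyspark.sql.types import IntegerType
# sqlContext.registerFunction("is_question", is_question, IntegerType())
```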

## 4. TF-IDF 
Create TF-IDF features from the `process_headline` column. This column is the original `headline` column with punctuation and stop words removed and stemming applied. Note that we use `CountVectorizer` rather than `HashingTF` for the term-frequency step.

Differences between `CountVectorizer` and `HashingTF`:

- `CountVectorizer` is fitted on the data and keeps the vocabulary behind the sparse vector. You can therefore map each vector index back to its word and see which word has which TF-IDF score.
- `HashingTF` is one-way: each term is hashed to an index, so once it is applied you cannot recover the words. Different terms can also collide on the same index, although this is rare when the feature space is large enough.

On large datasets `HashingTF` may be the better choice, because `CountVectorizer` needs extra memory to store the vocabulary.
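To make the difference concrete, here is a plain-Python sketch of both approaches. It is not Spark's implementation. The vocabulary follows `CountVectorizer`'s idea of ordering terms by total count, with ties broken alphabetically as an assumption of this sketch. Spark's `minDF` and `vocabSize` filtering is left out. For hashing, `zlib.crc32` stands in for the hash function Spark uses, so the indices will not match Spark's.

```python
import zlib
from collections import Counter


def hashed_index(term, num_features):
    """Stand-in hash (crc32, not Spark's hash) reduced to a feature index."""
    return zlib.crc32(term.encode("utf-8")) % num_features


def hashing_tf(tokens, num_features):
    """Term counts keyed by hashed index; the original words are not kept."""
    return Counter(hashed_index(t, num_features) for t in tokens)


def build_vocabulary(docs):
    """CountVectorizer-style vocabulary: terms ordered by total count, descending.
    Ties are broken alphabetically in this sketch."""
    counts = Counter(t for doc in docs for t in doc)
    return sorted(counts, key=lambda t: (-counts[t], t))


def count_vector(tokens, vocabulary):
    """Term counts keyed by vocabulary index; vocabulary[i] recovers the word."""
    index = {term: i for i, term in enumerate(vocabulary)}
    return Counter(index[t] for t in tokens if t in index)
```

With the vocabulary you can look up `vocabulary[i]` for any non-zero index. With hashing, two different words can land on the same index, and nothing maps an index back to a word.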


```python
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

tokenizer = Tokenizer(inputCol="process_headline", outputCol="headline_token")
vectorizer = CountVectorizer(inputCol="headline_token", outputCol="headline_vec")
# Terms that appear in fewer than 2 documents get an IDF weight of 0
idf = IDF(minDocFreq=2, inputCol='headline_vec', outputCol='tfidf_headline')
```
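The `IDF` stage weights each term count by how rare the term is across documents. The sketch below is a plain-Python version of Spark ML's smoothed formula, idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t. It follows Spark in giving terms with df(t) < `minDocFreq` a weight of 0. The TF-IDF value is then the raw term count times the idf. The function names are illustrative, not Spark APIs.

```python
import math


def fit_idf(docs, min_doc_freq=0):
    """Return {term: idf} using idf = ln((m + 1) / (df + 1));
    terms with df < min_doc_freq get 0."""
    m = len(docs)
    doc_freq = {}
    for doc in docs:
        for term in set(doc):  # count each term once per document
            doc_freq[term] = doc_freq.get(term, 0) + 1
    return {
        term: math.log((m + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }


def tf_idf(doc, idf):
    """Raw term count multiplied by the term's idf."""
    return {term: doc.count(term) * idf.get(term, 0.0) for term in set(doc)}
```

With `minDocFreq=2`, a stemmed word that occurs in only one headline contributes nothing to the feature vector.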

## 5. StringIndexer and OneHotEncoding
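The three categorical columns `sectionname`, `subsectionname`, and `newsdesk` must be converted to binary indicator vectors. Logistic regression only accepts numeric features and would treat plain category indices as ordered quantities. `StringIndexer` first maps each string to a numeric index, and `OneHotEncoder` then turns that index into a binary vector. The indexers and encoders are stored in a dictionary.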

```python
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def oHe(Ohecols):
    ind = 'ind'
    vec = 'vec'

    dictOhe = {}

    for col in Ohecols:
        stringIndKey = col + ind   # e.g. 'newsdeskind'
        OheVecKey = col + vec      # e.g. 'newsdeskvec'

        # String -> numeric index, then index -> binary indicator vector
        stringIndexer = StringIndexer(inputCol=col, outputCol=stringIndKey)
        encoder = OneHotEncoder(dropLast=False, inputCol=stringIndKey, outputCol=OheVecKey)

        dictOhe[stringIndKey] = stringIndexer
        dictOhe[OheVecKey] = encoder

    return dictOhe

OHE_Cols = ['sectionname', 'subsectionname', 'newsdesk']
Ohe = oHe(OHE_Cols)
```
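A plain-Python sketch of what the two stages compute, not Spark's code. `StringIndexer` orders labels by frequency so that the most frequent label gets index 0. Breaking ties alphabetically is a choice made for this sketch. `OneHotEncoder` with `dropLast=False` then emits a vector with one slot per category. An unseen label raises `KeyError` here. By default, Spark's `StringIndexer` also fails on labels it did not see during fit, which matters when new section names show up in streamed data.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first.
    Ties are broken alphabetically in this sketch."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=False):
    """Binary indicator vector; drop_last=True omits the final category's slot."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


section_index = fit_string_indexer(["U.S.", "Business Day", "U.S.", "Opinion", "U.S."])
# section_index["Sports"] would raise KeyError: the label was never seen
```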

## 6. VectorIndexer and VectorAssembler
Combine the numeric feature columns into one vector column with `VectorAssembler` so that `VectorIndexer` can be applied to all of them at once. The function `getMaxCategories` computes the number of distinct values in each column and returns the largest. That value is passed to `VectorIndexer` as `maxCategories`.

```python
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.sql.functions import countDistinct

# 'monthpub' is not included in the numeric features
vectorizeCols = ['question', 'religion', 'political',
                 'socialmedia', 'dowpub', 'hourpub']
interm_assembler = VectorAssembler(inputCols=vectorizeCols, outputCol="raw_features")

def getMaxCategories(df, vecCols):
    # A single aggregation computes the distinct count of every column
    counts = df.agg(*[countDistinct(c) for c in vecCols]).collect()[0]
    return max(counts)

maxCat = getMaxCategories(nydata, vectorizeCols)

interm_indexer = VectorIndexer(inputCol="raw_features", outputCol="features_numerical",
                               maxCategories=maxCat)
```
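`VectorIndexer` decides per slot of the assembled vector whether a feature is categorical. A slot with at most `maxCategories` distinct values is declared categorical and its values are re-indexed to 0-based category indices. Other slots stay continuous. Because `maxCat` above is the largest distinct count among these same columns, every slot, including `hourpub`, qualifies as categorical. The plain-Python sketch below shows that decision. It maps sorted distinct values to 0..k-1 and omits Spark's special handling of 0.0.

```python
def categorical_slots(rows, max_categories):
    """Return {slot: {value: index}} for slots with <= max_categories distinct
    values; other slots are treated as continuous and left out."""
    mappings = {}
    for slot in range(len(rows[0])):
        distinct = sorted({row[slot] for row in rows})
        if len(distinct) <= max_categories:
            mappings[slot] = {value: i for i, value in enumerate(distinct)}
    return mappings


def max_categories(rows):
    """Same idea as getMaxCategories: the largest distinct count over all slots."""
    return max(len({row[slot] for row in rows}) for slot in range(len(rows[0])))
```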

## 7. Final Assembler
Combine all the columns used by the model into a single vector column called `features`.

```python
from pyspark.ml.feature import VectorAssembler

# One-hot vector columns (keys ending in 'vec') plus the numeric and TF-IDF features
features = [col for col in Ohe.keys() if col.endswith('vec')]\
        + ['wordcount_norm', 'features_numerical', 'tfidf_headline']
final_assembler = VectorAssembler(inputCols=features, outputCol="features")
```
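`VectorAssembler` concatenates its input columns in `inputCols` order. A numeric column adds one slot and a vector column adds all of its slots. The sketch below shows that layout with plain lists, whereas Spark often stores the result as a sparse vector. The column order only needs to be consistent, and it is, because the same assembler object is used for fitting and for transforming.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector (list) columns into one flat feature list,
    in input_cols order, the way VectorAssembler lays out its output."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features
```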

## 8. Logistic Regression Model

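```python
from pyspark.ml.classification import LogisticRegression

# Reads the 'features' and 'label' columns by default.
# elasticNetParam defaults to 0.0, so regParam=0.01 is an L2 penalty.
lr = LogisticRegression(maxIter=10, regParam=0.01)
```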
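A plain-Python sketch of what the fitted model computes for one row. The sigmoid of the margin w·x + b gives P(label=1), and Spark's `probability` column holds the pair [P(label=0), P(label=1)], so index 1 is the positive class. The prediction is 1 when P(label=1) exceeds the threshold, which defaults to 0.5. The objective shows the L2 case of Spark's elastic-net penalty, λ(α‖w‖₁ + (1 − α)/2 ‖w‖²) with α = 0. Spark also standardizes features before applying the penalty by default, which this sketch ignores. The weights here are made-up inputs.

```python
import math


def predict_proba(weights, intercept, x):
    """Return [P(label=0), P(label=1)] for one feature vector."""
    margin = intercept + sum(w * xi for w, xi in zip(weights, x))
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]


def predict(weights, intercept, x, threshold=0.5):
    """Label 1.0 when P(label=1) exceeds the threshold."""
    return 1.0 if predict_proba(weights, intercept, x)[1] > threshold else 0.0


def l2_objective(weights, intercept, rows, reg_param=0.01):
    """Mean log loss plus (reg_param / 2) * ||w||^2; the intercept is not penalized."""
    loss = 0.0
    for x, label in rows:
        p1 = predict_proba(weights, intercept, x)[1]
        loss -= label * math.log(p1) + (1 - label) * math.log(1 - p1)
    return loss / len(rows) + reg_param / 2.0 * sum(w * w for w in weights)
```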

## 9. Train and Test Split
Cache `nydata` and split it into training (70%) and test (30%) sets. The fixed seed makes the split reproducible.

```python
# Cache nydata: it is reused by the split, the fit, and the evaluation
nydata.cache()

train, test = nydata.randomSplit([0.7, 0.3], seed=0)
```
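`randomSplit` assigns each row to a split at random in proportion to the normalized weights. The 70/30 sizes are therefore approximate rather than exact, and the seed makes the assignment repeatable. The plain-Python sketch below illustrates that behaviour over a list. Spark itself samples partition by partition, so its exact assignment differs.

```python
import random


def random_split(rows, weights, seed=0):
    """Assign each row to one split with probability proportional to its weight.
    Split sizes are therefore approximate, as with DataFrame.randomSplit."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches u values lost to float rounding
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits
```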

## 10. Pipeline
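Create a `Pipeline` that contains every step applied to the data. These are the string indexers and one-hot encoders, the intermediate assembler and vector indexer, the tokenizer, CountVectorizer, and IDF, the final assembler, and the logistic regression model.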

```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[Ohe['newsdeskind'], Ohe['newsdeskvec'],
                            Ohe['sectionnameind'], Ohe['sectionnamevec'],
                            Ohe['subsectionnameind'], Ohe['subsectionnamevec'],
                            interm_assembler, interm_indexer, tokenizer,
                            vectorizer, idf, final_assembler, lr])
```
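`Pipeline.fit` runs the stages in order. A transformer (in Spark 1.6, for example `Tokenizer`, `OneHotEncoder`, or `VectorAssembler`) is applied as is. An estimator (`StringIndexer`, `CountVectorizer`, `IDF`, `VectorIndexer`, `LogisticRegression`) is first fitted on the output of the earlier stages, and the fitted model is then applied. The resulting `PipelineModel` replays the fitted models on new data. Test and streamed rows are therefore encoded with indices, vocabulary, and IDF weights learned from the training data only. The toy sketch below uses hypothetical stage classes and rows as dicts. Unlike Spark, it also transforms after the last stage during fit.

```python
class MeanCenterer:
    """Toy estimator: learns the mean of x during fit."""
    def fit(self, rows):
        mean = sum(r["x"] for r in rows) / float(len(rows))
        return CenteringModel(mean)


class CenteringModel:
    """Toy fitted model: subtracts the mean learned from the training rows."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [dict(r, x_centered=r["x"] - self.mean) for r in rows]


class Doubler:
    """Toy transformer: needs no fitting."""
    def transform(self, rows):
        return [dict(r, x_doubled=2 * r["x_centered"]) for r in rows]


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the earlier stages
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows
```

Fitting on `[{"x": 1.0}, {"x": 3.0}]` learns a mean of 2.0. Transforming `{"x": 5.0}` afterwards reuses that mean instead of recomputing it from the new data.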

### 10a. Fit training data and transform test data

```python
model = pipeline.fit(train)

prediction_test = model.transform(test)
# The warning this cell prints under Spark 1.6 was removed in Spark 2.0 (pull request 12732)
```
### 10b. Confusion Matrix and Model Accuracy
Now let's collect the predictions and labels on the driver and build a confusion matrix to measure the model's accuracy.

```python
# Collect (prediction, label) pairs to the driver; the test set is small enough
predictionsAndLabels_test = prediction_test.rdd.map(lambda row: (row.prediction, row.label)).collect()
```

```python
import pandas as pd

def confusion_matrix(predAndLabel):
    # Each pair is (prediction, label), matching the map above
    y_pred = pd.Series([pred for pred, label in predAndLabel], name='Predicted')
    y_actual = pd.Series([label for pred, label in predAndLabel], name='Actual')

    # Rows are actual labels, columns are predicted labels
    matrix = pd.crosstab(y_actual, y_pred)
    # Accuracy = correct predictions (the diagonal) / all predictions
    accuracy = (matrix.loc[0.0, 0.0] + matrix.loc[1.0, 1.0]) / float(matrix.values.sum())

    return matrix, accuracy
```

```python
df_confusion_Logit, accuracy_Logit = confusion_matrix(predictionsAndLabels_test)

print('Confusion Matrix:')
print(df_confusion_Logit)
print('Logistic Model Accuracy: {0}'.format(accuracy_Logit))
```

```
Confusion Matrix:
Predicted   0.0  1.0
Actual              
0.0        1475   96
1.0         158  192
Logistic Model Accuracy: 0.8677771993753254
```
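As a dependency-free cross-check, the sketch below counts (actual, predicted) pairs with `collections.Counter` and computes accuracy as the share of rows on the diagonal. The function names are illustrative. The diagonal cells reported above, 1475 and 192 out of 1921 test rows, give 1667 / 1921 ≈ 0.8678, which matches the printed accuracy.

```python
from collections import Counter


def confusion_counts(pred_and_label):
    """Count (actual, predicted) pairs from (prediction, label) tuples."""
    return Counter((label, pred) for pred, label in pred_and_label)


def accuracy(counts):
    """Share of rows on the diagonal, i.e. where actual == predicted."""
    correct = sum(n for (actual, pred), n in counts.items() if actual == pred)
    return correct / float(sum(counts.values()))
```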


## 11. Spark Streaming
To simulate streaming data, we stream NYTimesBlogTest.csv to port 9999. Run the csv2stream.py script first so that it is listening on that port. `socketTextStream` then connects to it as a client and reads one record per line.
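csv2stream.py is not shown, so here is a minimal hypothetical stand-in. It assumes the script reads the CSV, joins each record's fields with `\001` (the delimiter the Spark side splits on), and writes one record per line to the first client that connects on localhost:9999. A non-comma delimiter avoids breaking headlines that contain commas. Newlines inside fields are collapsed to spaces, because `socketTextStream` splits its input on newlines. `to_stream_lines` and `serve_once` are illustrative names.

```python
import csv
import io
import socket


def to_stream_lines(csv_text):
    """Turn CSV text (with a header row) into \001-delimited lines, one per record."""
    reader = csv.reader(io.StringIO(csv_text, newline=""))
    next(reader)  # skip the header row
    return ["\001".join(" ".join(field.splitlines()) for field in row) + "\n"
            for row in reader]


def serve_once(lines, host="localhost", port=9999):
    """Accept one client (e.g. Spark's socketTextStream) and send it every line."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall(line.encode("utf-8"))
```

Usage, assuming the file sits in the working directory: open `NYTimesBlogTest.csv` with `newline=""`, pass its text to `to_stream_lines`, and hand the result to `serve_once` before starting the Spark stream.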

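```python
from pyspark.streaming import StreamingContext
import sys

# Batch interval of 2 seconds
ssc = StreamingContext(sc, 2)

# Each line received on localhost:9999 is one \001-delimited record
lines = ssc.socketTextStream("localhost", 9999)

lineRDD = lines.map(lambda line: line.split("\001"))
```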
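`StreamingContext(sc, 2)` sets a 2-second batch interval. The lines that arrive during each interval form one RDD of the DStream, and Spark produces an RDD for every interval even when nothing arrived. The plain-Python sketch below shows that grouping. The arrival times are made-up inputs, and `micro_batches` is an illustrative name.

```python
from collections import defaultdict


def micro_batches(events, batch_seconds=2.0):
    """Group (arrival_time_seconds, line) events into consecutive batches of
    batch_seconds each, keeping empty batches as a DStream does."""
    if not events:
        return []
    batches = defaultdict(list)
    for arrival, line in events:
        batches[int(arrival // batch_seconds)].append(line)
    last = max(batches)
    return [batches.get(i, []) for i in range(last + 1)]
```

The empty middle batch in `micro_batches([(0.1, "a"), (1.9, "b"), (4.5, "c")])` is why the per-batch function must cope with empty RDDs.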

### 11a. Function to transform stream and insert into Cassandra

```python
from pyspark.sql import Row

def streamrdd_to_df(rdd):
    # Spark still calls this function for batches that received no lines
    if rdd.isEmpty():
        return

    try:
        # Reuse the existing SQLContext instead of creating a new one per batch
        sqlContext = SQLContext.getOrCreate(rdd.context)

        # Create a DataFrame from the raw \001-delimited fields
        rawCols = Row('newsdesk', 'sectionname', 'subsectionname',
                      'headline', 'snippet', 'abstract', 'wordcount',
                      'pubdate', 'uniqueid')
        raw_rdd = rdd.map(lambda r: rawCols(*r[:9]))
        raw_df = sqlContext.createDataFrame(raw_rdd)

        # Create the same features as for the training data
        registerFunctions(sqlContext)
        feature_df = featurecreation(raw_df, 'df', sqlContext)

        # Score the batch with the fitted pipeline model
        prediction = model.transform(feature_df)

        # Rebuild the rows with the original columns + probability of label 1
        cassCols = Row('newsdesk', 'sectionname', 'subsectionname',
                       'headline', 'snippet', 'abstract', 'wordcount',
                       'pubdate', 'uniqueid', 'popular_probability')
        pred_rdd = prediction.rdd.map(lambda r: cassCols(
            r.newsdesk, r.sectionname, r.subsectionname, r.headline,
            r.snippet, r.abstract, r.wordcount, r.pubdate, r.uniqueid,
            float(r.probability[1])))  # index 1 = P(label = 1), i.e. popular
        pred_df = sqlContext.createDataFrame(pred_rdd)

        # Insert the batch into the ny_times_test_stream table
        sys.stdout.write('\rInserting {0} rows...'.format(pred_df.count()))
        pred_df.write\
            .format("org.apache.spark.sql.cassandra")\
            .options(table="ny_times_test_stream", keyspace="sparkdb")\
            .mode('append')\
            .save()

    except Exception as e:
        # e.g. a category label not seen during training makes StringIndexer fail
        print(e)
```

```python
lineRDD.foreachRDD(streamrdd_to_df)  # Passes each batch's RDD into the function

sys.stdout.flush()
```

### 11b. Start Stream

```python
ssc.start()
ssc.awaitTermination()
```

```
Inserting 20 rows...
```