## Apache Spark
Apache Spark is a fast cluster computing framework which is used for processing, querying and analyzing Big data. It is based on In-memory computation, which is a big advantage of Apache Spark over several other big data Frameworks

But what if the data you are dealing with cannot be fit into a single machine? Maybe you can implement careful sampling to do your analysis on a single machine, but with distributed computing framework like PySpark, you can efficiently implement the task for large datasets.

Spark has three different data structures available through its APIs: RDD, Dataframe (this is different from Pandas data frame), Dataset. For this post, I will work with Dataframe, and the corresponding machine learning library SparkML

SparkMLLib is used with RDD, while SparkML supports Dataframe.

In [14]:
import findspark
#
import pandas as pd
import numpy as np
import pyspark as ps
import warnings
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


**First step in any Apache programming is to create a SparkContext.**
SparkContext is needed when we want to execute operations in a cluster. SparkContext tells Spark how and where to access a cluster. It is first step to connect with Apache Cluster.

In [6]:
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [10]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/Users/vinitachaudhary/Desktop/Machine Learning/MLCourse.AI/clean_tweet.csv')
type(df)

pyspark.sql.dataframe.DataFrame

In [16]:
df.show(5)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that bummer ...|     0|
|  1|is upset that he ...|     0|
|  2|dived many times ...|     0|
|  3|my whole body fee...|     0|
|  4|no it not behavin...|     0|
+---+--------------------+------+
only showing top 5 rows



In [17]:
df = df.dropna()
df.count()

1596041

After dropping NA, we have a bit less than 1.6 million Tweets. I will split this into three parts; training, validation, test. Since I have around 1.6 million entries, 1% each for validation and test set will be enough to test the models.

In [24]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)

Through my previous attempt at sentiment analysis with Pandas and Scikit-Learn, I learned that TF-IDF with Logistic Regression is quite a strong combination, and showed robust performance, as high as Word2Vec + Convolutional Neural Network model.

In [18]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

## TFIDF
**TFIDF** is another way to convert textual data to numeric form, and is short for Term Frequency-Inverse Document Frequency. The vector value it yields is the product of these two terms; TF and IDF.

We have already looked at term frequency with count vectorizer, but this time, we need one more step to calculate the relative frequency.
<img src = "TfIdf.png">

we need to get **Inverse Document Frequency**, which measures how important a word is to differentiate each document by following the calculation as below.
<img src="Idf.png">

Now TDFIDF:
<img src="FinalTfIdf.png">

**TFIDF score is 0**, which means the term is not really informative in differentiating documents. The rest is same as count vectorizer, TFIDF vectorizer will calculate these scores for terms in documents, and convert textual data into the numeric form.

In [26]:
# tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|awww that bummer ...|     0|[awww, that, bumm...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|  1|is upset that he ...|     0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|  2|dived many times ...|     0|[dived, many, tim...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
|  3|my whole body fee...|     0|[my, whole, body,...|(65536,[158,11650...|(65536,[158,11650...|  0.0|
|  4|no it not behavin...|     0|[no, it, not, beh...|(65536,[1968,4488...|(65536,[1968,4488...|  0.0|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



**BinaryClassificationEvaluator** evaluates is by default ***areaUnderROC.***

In [27]:

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.860452142896146

And for binary classification, Spark doesn’t support accuracy as a metric. But I can still calculate accuracy by counting the number of predictions matching the label and dividing it by the total entries.

In [28]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

0.7926482662792901

## CountVectorizer + IDF + Logistic Regression
There’s another way that to get term frequency for IDF (Inverse Document Frequency) calculation. It is ***CountVectorizer in SparkML***. Apart from the reversibility of the features (vocabularies), there is an important difference in how each of them filters top features. In case of ***HashingTF it is dimensionality reduction with possible collisions. CountVectorizer discards infrequent tokens.***

The Tfidf Transformer transforms a count matrix to a normalized tf or tf-idf representation. So although both the CountVectorizer and TfidfTransformer (with use_idf=False) produce term frequencies, TfidfTransformer is normalizing the count.

In [30]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print ("Accuracy Score: {0:.4f}".format(accuracy))
print ("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7956
ROC-AUC: 0.8657
CPU times: user 59 ms, sys: 17.3 ms, total: 76.4 ms
Wall time: 57.3 s


## N-gram
Define a range of n-grams to call TfIdf Vectorizer upon. But with Spark, it is a bit more complicated. It does not automatically combine features from different n-grams, so use VectorAssembler in the pipeline, to combine the features returned from each n-gram.

first tried to extract around 16,000 features from unigram, bigram, trigram. So, it will get aprrox 48,000 features in total. Then implemented Chi-Squared feature selection to reduce the number of features to 16,000 in total.

In [31]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector
def build_trigrams(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="rawFeatures"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+selector+lr)

In [34]:
%%time
trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(dev_set.count())
roc_auc = evaluator.evaluate(predictions)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy))
print ("ROC-AUC: {0:.4f}".format(roc_auc))

### Without ChiSquared feature selector

In [35]:
def build_ngrams_wocs(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)


In [36]:
%%time
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)
# print accuracy, roc_auc
print ("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print ("ROC-AUC: {0:.4f}".format(roc_auc_wocs))


Accuracy Score: 0.8109
ROC-AUC: 0.8836
CPU times: user 164 ms, sys: 46.7 ms, total: 211 ms
Wall time: 5min 2s


In [38]:
test_predictions = trigramwocs_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(test_predictions)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(test_accuracy))
print ("ROC-AUC: {0:.4f}".format(test_roc_auc))

Accuracy Score: 0.8120
ROC-AUC: 0.8838
