# 3. Using Apache Spark for Distributed Text Analysis with Python

### Sentiment analysis of text with Tf-Idf and Logistic Regression

Python is great for data science modelling, thanks to its many modules and packages. But what if the data you are dealing with does not fit on a single machine? Careful sampling may let you run the analysis on one machine, but with a distributed computing framework like PySpark you can run the same task efficiently on the full data set.

The Spark API is available in multiple programming languages (Scala, Java, Python and R). There are debates about how Spark performance varies with the language you use, but since we are comfortable with Python, we went ahead with PySpark.

---
**Why Tf-Idf + Logistic Regression?**

Through our research in this area, we learnt that TF-IDF with Logistic Regression is quite a strong combination that shows robust performance, reportedly on par with a Word2Vec + Convolutional Neural Network model. So in this project, we decided to use a TF-IDF + Logistic Regression model with PySpark.

---




## Introduction

We are going to use PySpark and the previously cleaned dataset to perform parallel and optimised Logistic Regression to predict the sentiment of a tweet. Logistic regression works with numerical data, but we have textual data on our hands. How do we move ahead?

We use Tf-Idf, which stands for Term Frequency - Inverse Document Frequency. It is covered in detail later in the report, at the point where we actually use it. For now, suffice it to say that it is a method for meaningfully converting the text in a corpus into multi-dimensional numerical data.

In [0]:
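from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext

sc = SparkContext()          # entry point for connecting to the cluster
spark = SparkSession(sc)     # DataFrame/SQL entry point built on the context
sqlContext = SQLContext(sc)  # legacy SQL entry point, used below to read the CSV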

The first step in any Spark program is to create a SparkContext. The SparkContext is needed whenever we want to execute operations on a cluster: it tells Spark how and where to access the cluster, and is the first step in connecting to it.

We have just created a Spark context, along with a SparkSession and an SQLContext built on top of it.

In [0]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('clean_tweet.csv')

Here we read the cleaned tweets dataset into a Spark DataFrame.

In [0]:
df.show(5)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|wants to compete ...|     0|
|  1|it seems we are s...|     0|
|  2|where the are my ...|     0|
|  3|ff the meetin hat...|     0|
|  4|        reply me pls|     4|
+---+--------------------+------+
only showing top 5 rows



A quick look at what the data looks like.

In [0]:
df = df.dropna()

We drop all records that contain null (missing) values.

In [0]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 0)

Here, we divide the entire dataset into three parts, used for training, validation, and testing respectively. We use a 98% : 1% : 1% split because, given the generality and variety of the data, we want to apportion the lion's share to the training phase.
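
The weights passed to `randomSplit` are proportions, not exact sizes, so the three parts only approximately follow the 98 : 1 : 1 ratio. The sketch below illustrates why, in plain Python and without Spark. It is not Spark's implementation; `random_split` is a hypothetical helper that assigns each row independently by drawing a random number against the cumulative weights.

```python
import random

def random_split(rows, weights, seed=0):
    """Assign each row to one split by drawing against cumulative weights.

    Illustrative only (not Spark's code): every row is assigned
    independently, so split sizes only approximate the weights.
    """
    total = sum(weights)
    cumulative = []
    running = 0.0
    for w in weights:
        running += w / total  # normalize so the bounds end at ~1.0
        cumulative.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    last = len(cumulative) - 1
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split also catches any floating-point leftover
            if draw < bound or i == last:
                splits[i].append(row)
                break
    return splits

train, val, test = random_split(range(100_000), [0.98, 0.01, 0.01])
print(len(train), len(val), len(test))
```

Running this with different seeds gives slightly different sizes each time, which is the same behaviour to expect from the split above when the seed changes.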

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|wants to compete ...|     0|[wants, to, compe...|(65536,[2437,6194...|(65536,[2437,6194...|  1.0|
|  1|it seems we are s...|     0|[it, seems, we, a...|(65536,[4488,5660...|(65536,[4488,5660...|  1.0|
|  2|where the are my ...|     0|[where, the, are,...|(65536,[2025,2458...|(65536,[2025,2458...|  1.0|
|  3|ff the meetin hat...|     0|[ff, the, meetin,...|(65536,[7173,1412...|(65536,[7173,1412...|  1.0|
|  4|        reply me pls|     4|    [reply, me, pls]|(65536,[2037,6933...|(65536,[2037,6933...|  0.0|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



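Here, we perform the main Tf-Idf operation on the dataset: the pipeline tokenizes each tweet, hashes the words into a term-frequency vector (`tf`), rescales it by inverse document frequency (`features`), and indexes `target` into a numeric `label`.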
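
The `tf` column in the output is a sparse vector of length 65536 (`2**16`): each word is hashed to a bucket index, and the vector stores how many words landed in each bucket. Below is a minimal plain-Python sketch of this hashing trick. It assumes `zlib.crc32` as a stand-in hash function (Spark's `HashingTF` uses a different hash, so the real indices will not match), and `hashed_term_counts` is a hypothetical helper name.

```python
import zlib
from collections import Counter

def hashed_term_counts(words, num_features=2**16):
    """Map each word to a bucket index and count occurrences per bucket.

    Assumption: zlib.crc32 stands in for the hash function. Spark's
    HashingTF uses a different hash, so real indices will differ.
    """
    counts = Counter()
    for word in words:
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    # Sparse representation: sorted (index, count) pairs
    return sorted(counts.items())

words = "reply me pls reply".split()
sparse_tf = hashed_term_counts(words)
print(sparse_tf)
```

Because the number of buckets is fixed, two different words can collide in the same bucket. A larger `num_features` makes collisions rarer at the cost of wider vectors.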

### So what is Tf-Idf?

> Tf-idf stands for term frequency-inverse document frequency, and the tf-idf weight is a weight often used in information retrieval and text mining. This weight is a statistical measure used to evaluate how important a word is to a document in a collection or corpus. The importance increases proportionally to the number of times a word appears in the document but is offset by the frequency of the word in the corpus. Variations of the tf-idf weighting scheme are often used by search engines as a central tool in scoring and ranking a document's relevance given a user query.

>One of the simplest ranking functions is computed by summing the tf-idf for each query term; many more sophisticated ranking functions are variants of this simple model.

>Tf-idf can be successfully used for stop-words filtering in various subject fields including text summarization and classification.

But how is it computed?

>Typically, the tf-idf weight is composed by two terms: the first computes the normalized Term Frequency (TF), aka. the number of times a word appears in a document, divided by the total number of words in that document; the second term is the Inverse Document Frequency (IDF), computed as the logarithm of the number of the documents in the corpus divided by the number of documents where the specific term appears.

>**TF**: Term Frequency, which measures how frequently a term occurs in a document. Since every document is different in length, it is possible that a term would appear much more times in long documents than shorter ones. Thus, the term frequency is often divided by the document length (aka. the total number of terms in the document) as a way of normalization: 

>**TF(t) = (Number of times term t appears in a document) / (Total number of terms in the document)**

>**IDF**: Inverse Document Frequency, which measures how important a term is. While computing TF, all terms are considered equally important. However it is known that certain terms, such as "is", "of", and "that", may appear a lot of times but have little importance. Thus we need to weigh down the frequent terms while scale up the rare ones, by computing the following: 

>**IDF(t) = log_e(Total number of documents / Number of documents with term t in it)**
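
The two formulas above can be worked through on a tiny example. The sketch below is plain Python and follows the quoted textbook definitions, not Spark's exact implementation; the toy corpus and the helper names `term_frequency` and `inverse_document_frequency` are made up for illustration.

```python
import math

def term_frequency(term, document):
    """Occurrences of term divided by the total number of terms in the document."""
    return document.count(term) / len(document)

def inverse_document_frequency(term, corpus):
    """Natural log of (number of documents / number of documents containing term).

    Assumes the term appears in at least one document; otherwise this
    would divide by zero.
    """
    containing = sum(1 for document in corpus if term in document)
    return math.log(len(corpus) / containing)

# Hypothetical toy corpus of tokenized "tweets"
corpus = [
    ["i", "love", "this", "phone"],
    ["i", "hate", "this", "weather"],
    ["i", "love", "love", "sunny", "weather"],
]

doc = corpus[2]
for term in ["i", "love", "sunny"]:
    tf = term_frequency(term, doc)
    idf = inverse_document_frequency(term, corpus)
    print(term, round(tf, 3), round(idf, 3), round(tf * idf, 3))
```

In this corpus, "i" appears in every document, so its IDF and therefore its tf-idf weight is 0. "sunny" appears in only one document and ends up with the highest weight of the three, even though "love" occurs more often in that document.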

---
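This gives us a simple yet elegant method of converting text into numerical data that can be used later in the analytics pipeline to extract useful information. Note that Spark's implementation differs slightly from the formulas quoted above: `HashingTF` produces raw (unnormalized) term counts, and `IDF` uses the smoothed form log((N + 1) / (df + 1)), so the values in the `features` column will not match a hand calculation with the textbook formulas.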

---

In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

Here, we fit a Logistic Regression model on the training features and use it to predict the sentiment of the tweets in the validation set. We evaluate how well the model and pipeline performed further below.

### What is Logistic Regression?

> Logistic regression is the appropriate regression analysis to conduct when the dependent variable is dichotomous (binary).  Like all regression analyses, the logistic regression is a predictive analysis. It is used to describe data and to explain the relationship between one dependent binary variable and one or more nominal, ordinal, interval or ratio-level independent variables.

---


![alt text](https://cdn-images-1.medium.com/max/1000/1*UgYbimgPXf6XXxMy2yqRLw.png)


---

The above image is a very apt description of what Logistic regression does, without getting too mathematical.
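
For readers who prefer code to pictures, here is a minimal plain-Python sketch of the prediction step only: a weighted sum of the features is passed through the logistic (sigmoid) function to get a probability, which is then thresholded at 0.5. The weights and inputs are hand-picked, hypothetical values, not the ones learned by the model in this notebook.

```python
import math

def sigmoid(z):
    """Logistic function: squashes any real number into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(features, weights, bias):
    """Probability of the positive class for one feature vector."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Hypothetical weights for a 3-feature model, chosen by hand for illustration
weights = [1.5, -2.0, 0.5]
bias = 0.0

for features in ([1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 0.0]):
    p = predict_proba(features, weights, bias)
    label = 1 if p >= 0.5 else 0
    print(features, round(p, 3), label)
```

Training is the process of finding the weights and bias that make these probabilities agree with the known labels as closely as possible. That is the part `lr.fit` does for us.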

---

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8413855253027225

## Result
On evaluating our model, we got a score of ~0.84, which surprised us. Looking further into what this value is, we found that it is not the prediction accuracy but the area under the ROC curve (AUC), a widely used metric for evaluating binary classifiers.

In [0]:
evaluator.getMetricName()

'areaUnderROC'

As seen here, the evaluator's default metric is `areaUnderROC`, not accuracy.

In [0]:
predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())

0.781054736315921

When we computed the actual prediction accuracy on the validation set, we got a value of ~78%.
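
The two numbers differ because they measure different things. Accuracy counts correct labels after thresholding, while the area under the ROC curve measures how well the model ranks positives above negatives, regardless of threshold. The plain-Python sketch below illustrates this on made-up labels and scores. `accuracy` and `area_under_roc` are hypothetical helpers, and the pairwise AUC computation is only meant for small inputs.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of examples whose thresholded score matches the label."""
    predictions = [1 if s >= threshold else 0 for s in scores]
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative.

    Ties count as half. This pairwise form is O(n^2), so it is only
    suitable for small illustrative inputs.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels and predicted probabilities
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.6, 0.4, 0.45, 0.3, 0.2]

print("accuracy:", accuracy(labels, scores))
print("AUC:", area_under_roc(labels, scores))
```

Here one positive example scores below the 0.5 threshold, so accuracy is 5/6, yet it still outranks two of the three negatives, so the AUC is 8/9. Both numbers are valid; they simply answer different questions.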

Thus, at the end of the project, our model, implemented in Apache Spark using Python, reached a prediction accuracy of ~78% on the validation set.

This project taught all of us the nuances of this platform, and we thoroughly enjoyed the process.