# IST 718: Big Data Analytics

- Professor: Willard Williamson <wewillia@syr.edu>
- Faculty Assistant: Palaniappan Muthukkaruppan

## General instructions:

- You are welcome to discuss the problems with your classmates, but __you are not allowed to copy any part of your answers from your classmates. Short code snippets from the internet are allowed. Any code from the class textbooks or from class-provided code is allowed.__
- Please do not change the file names. The FAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and FAs will use __additional__ tests for your answer. Make sure your code handles cases beyond the visible tests, even if it passes all the tests you see.**
- Before submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).

In [2]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import functions as fn, Row
from pyspark.sql.functions import when, regexp_extract, col
from pyspark import sql

import matplotlib.pyplot as plt
import pandas as pd

In this section, you are going to develop an SMS spam detector based on logistic regression. This is the same idea behind sentiment analysis, but instead of predicting positive sentiment vs negative sentiment, you are going to predict whether an SMS text is spam or not.

The dataset will be in `sms_spam_df`.

In [4]:
sms_spam_df = spark.read.csv('/FileStore/tables/sms_spam.csv', header=True, inferSchema=True)

# Question 1.1 (10 pts)

Encode the `type` column to be 1 for `spam` and 0 for `ham` and store the result in `sms_spam2_df`

In [6]:
# create sms_spam2_df below
sms_spam2_df = sms_spam_df.withColumn('type', fn.when(fn.col('type') == 'spam', 1).otherwise(0))
sms_spam2_df.show()
#raise NotImplementedError()

In [7]:
# (10 pts)
np.testing.assert_array_equal(
    sms_spam2_df.groupBy('type').count().orderBy('type').rdd.map(lambda x: x['count']).collect(),
    [4827, 747]
)

# Question 1.2: tfidf feature engineering (15 pts)
Create a pipeline that combines a `Tokenizer`, a `CountVectorizer`, and an `IDF` estimator to compute the tfidf vectors of each SMS. Fit this pipeline and assign the pipeline transformer to a variable `tfidf_pipeline`. The `Tokenizer` step should create a column `words`, the `CountVectorizer` step should create a column `tf`, and the `IDF` step should create a column `tfidf`.
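
As a rough illustration of what the three stages compute, the plain-Python sketch below (not the Spark API) lowercases and splits a few made-up messages on whitespace, counts terms, and weights them with the smoothed IDF formula given in the Spark documentation, $\log\frac{m+1}{\mathrm{df}(t)+1}$, where $m$ is the number of messages. The toy messages and helper names are assumptions for illustration only.

```python
import math
from collections import Counter

# Hypothetical toy corpus; the real data lives in sms_spam2_df.
messages = ["Free entry to win", "ok see you", "win a free prize"]

def tokenize(text):
    # Mimics a lowercase + whitespace split tokenizer.
    return text.lower().split()

docs = [tokenize(msg) for msg in messages]
m = len(docs)

# Document frequency: in how many messages does each term occur?
doc_freq = Counter(term for doc in docs for term in set(doc))

# Smoothed inverse document frequency: log((m + 1) / (df + 1)).
idf = {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}

def tfidf(doc):
    # Term frequency is the raw count of the term in the message.
    counts = Counter(doc)
    return {term: count * idf[term] for term, count in counts.items()}

vectors = [tfidf(doc) for doc in docs]
```

Terms that occur in many messages receive a small IDF, so very common words contribute little to the tfidf vector.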

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

In [10]:
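# create a Pipeline transformer and name it tfidf_pipeline
tokenizer = Tokenizer().setInputCol('text').setOutputCol('words')
# NOTE: the names CounterVectorizer and IDF are reused in later cells, so they are kept.
# Building the stages through the `feature` module keeps this cell re-runnable
# even though `IDF` is rebound from the class to an instance here.
CounterVectorizer = feature.CountVectorizer().setInputCol('words').setOutputCol('tf')
IDF = feature.IDF().setInputCol('tf').setOutputCol('tfidf')
tfidf_pipeline = Pipeline(stages=[tokenizer, CounterVectorizer, IDF]).fit(sms_spam2_df)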
#raise NotImplementedError()

In [11]:
tfidf_pipeline.transform(sms_spam2_df).show(5)

In [12]:
# (15 pts)
np.testing.assert_array_equal([type(s) for s in tfidf_pipeline.stages],
                              [feature.Tokenizer, feature.CountVectorizerModel, feature.IDFModel])

Investigate the fitted pipeline above and create a variable `lowest_idf` that contains the set of words with the 5 lowest IDF values. **Hint: you must extract the vocabulary from the fitted `CountVectorizer` and the IDF values from the fitted `IDF`, both in the stages of `tfidf_pipeline`. You can put both lists into Pandas dataframe columns and sort by IDF, picking the first 5 after sorting.**

In [14]:
tfidf_pd_df = tfidf_pipeline.transform(sms_spam2_df).toPandas()
tfidf_pd_df.head()

Unnamed: 0,type,text,words,tf,tfidf
0,0,"Go until jurong point, crazy.. Available only ...","[go, until, jurong, point,, crazy.., available...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.975..."
1,0,Ok lar... Joking wif u oni...,"[ok, lar..., joking, wif, u, oni...]","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 2.02617709712, 0.0, ..."
2,1,Free entry in 2 a wkly comp to win FA Cup fina...,"[free, entry, in, 2, a, wkly, comp, to, win, f...","(3.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(3.60564694586, 0.0, 0.0, 1.56987231223, 0.0, ..."
3,0,U dun say so early hor... U c already then say...,"[u, dun, say, so, early, hor..., u, c, already...","(0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 4.05235419423, 0.0, ..."
4,0,"Nah I don't think he goes to usf, he lives aro...","[nah, i, don't, think, he, goes, to, usf,, he,...","(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.20188231529, 1.25016544811, 0.0, 0.0, 0.0, ..."


In [15]:
# create lowest_idf with the set of words with the 5 lowest IDF values.
vocab = tfidf_pipeline.stages[1].vocabulary
idf = tfidf_pipeline.stages[-1].idf.toArray()
# pair each vocabulary word with its IDF value and sort ascending by IDF
idf_df = pd.DataFrame({'Vocab': vocab, 'IDF': idf}).sort_values('IDF')
lowest_idf = set(idf_df.Vocab.head(5))
#raise NotImplementedError()

In [16]:
# (5 pts)
# it is a set
np.testing.assert_equal(type(lowest_idf), set)
# it has 5 elements
np.testing.assert_equal(len(lowest_idf), 5)
# each element is a string
np.testing.assert_equal({type(w) for w in lowest_idf}, {str})

# Question 1.3: uppercase feature (15 pts)

Typical spam messages contain words that are upper case. Create a dataframe `sms_spam3_df` where you add a new column `has_uppercase` which contains an integer `1` if the text contains a sequence of at least 3 consecutive uppercase letters and an integer `0` otherwise.
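
One way to read this requirement, consistent with the sample output shown in this question, is "the message contains a run of at least three consecutive uppercase letters". The plain-Python sketch below illustrates that check with the `re` module, assuming ASCII letters only; the example strings and the helper name are made up for illustration. In Spark, the same pattern could be passed to `regexp_extract`.

```python
import re

# A run of three or more consecutive uppercase ASCII letters.
UPPER_RUN = re.compile(r"[A-Z]{3,}")

def has_uppercase(text):
    # re.search returns the first match, or None when no run is long enough.
    return 1 if UPPER_RUN.search(text) else 0

examples = [
    "SIX chances to win CASH!",       # "SIX" qualifies
    "U R entitled to a FREE update",  # "U" and "R" are too short, "FREE" qualifies
    "Ok lar... Joking wif u oni...",  # no uppercase run of length 3
]
flags = [has_uppercase(text) for text in examples]
```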

In [18]:
pd.options.display.max_colwidth = 500
pd.set_option('display.max_rows', None)

In [19]:
# create sms_spam3_df below
# regexp_extract returns the first run of 3+ uppercase letters, or '' when there is none
sms_spam3_df = sms_spam2_df.select(
    fn.col('type'),
    fn.col('text'),
    fn.when(fn.length(fn.regexp_extract(fn.col('text'), '[A-Z]{3,}', 0)) >= 3, 1)
      .otherwise(0)
      .alias('has_uppercase'))
#raise NotImplementedError()

In [20]:
sms_spam3_df.show()

The first three messages with `has_uppercase == 1` are as follows:

```python
sms_spam3_df.where('has_uppercase == 1').take(3)
```

```console
[Row(type=1, text='WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.', has_uppercase=1),
 Row(type=1, text='Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030', has_uppercase=1),
 Row(type=1, text='SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info', has_uppercase=1)]
```

In [22]:
# try it here
sms_spam3_df.where('has_uppercase == 1').take(3)

In [23]:
# (15 pts)
np.testing.assert_equal(set(sms_spam3_df.columns), {'has_uppercase', 'text', 'type'})
np.testing.assert_equal(type(sms_spam3_df.schema['has_uppercase'].dataType), sql.types.IntegerType)
np.testing.assert_equal(sms_spam3_df.rdd.map(lambda x : x['has_uppercase']).sum(), 891)

# Question 1.4: Compare models (15 pts)

Using the following splits:

In [25]:
training_df, validation_df, testing_df = sms_spam2_df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [26]:
[training_df.count(), validation_df.count(), testing_df.count()]

Create pipelines where the first stage is the `tfidf_pipeline` created above and the second stage is a `LogisticRegression` model with different regularization parameters ($\lambda$) and elastic net mixture ($\alpha$). Fit those pipelines to the appropriate data split.

1. Logistic regression with $\lambda=0$ and $\alpha=0$ (assign the fitted pipeline to `lr_pipeline1`)
2. Logistic regression with $\lambda=0.02$ and $\alpha=0.2$ (assign the fitted pipeline to `lr_pipeline2`)
3. Logistic regression with $\lambda=0.1$ and $\alpha=0.4$ (assign the fitted pipeline to `lr_pipeline3`)
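
In Spark's `LogisticRegression`, $\lambda$ is set with `setRegParam` and $\alpha$ with `setElasticNetParam`. Assuming the elastic net form given in the Spark documentation, the penalty added to the loss is $\lambda\left(\alpha\lVert w\rVert_1 + \frac{1-\alpha}{2}\lVert w\rVert_2^2\right)$. The plain-Python sketch below evaluates that penalty for the three settings on a hypothetical weight vector. The function name and the weights are illustrative assumptions, and the sketch ignores details such as feature standardization.

```python
def elastic_net_penalty(weights, lam, alpha):
    """Return lam * (alpha * L1 + (1 - alpha) / 2 * L2^2) for a list of weights."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return lam * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

# Hypothetical coefficient vector, used only to compare the three settings.
weights = [0.5, -1.0, 0.0, 2.0]
settings = [(0.0, 0.0), (0.02, 0.2), (0.1, 0.4)]  # (lambda, alpha) pairs
penalties = [elastic_net_penalty(weights, lam, alpha) for lam, alpha in settings]
```

With $\lambda=0$ the penalty vanishes, so the first model is unregularized. A larger $\alpha$ puts more weight on the L1 term, which tends to push coefficients to exactly zero.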

In [28]:
# create lr_pipeline1, lr_pipeline2, and lr_pipeline3
lr1 = LogisticRegression().setLabelCol('type').setFeaturesCol('tfidf').setRegParam(0.0).setMaxIter(100).setElasticNetParam(0.0)
lr2 = LogisticRegression().setLabelCol('type').setFeaturesCol('tfidf').setRegParam(0.02).setMaxIter(100).setElasticNetParam(0.2)
lr3 = LogisticRegression().setLabelCol('type').setFeaturesCol('tfidf').setRegParam(0.1).setMaxIter(100).setElasticNetParam(0.4)
lr_pipeline1 = Pipeline(stages=[tfidf_pipeline, lr1]).fit(training_df)
lr_pipeline2 = Pipeline(stages=[tfidf_pipeline, lr2]).fit(training_df)
lr_pipeline3 = Pipeline(stages=[tfidf_pipeline, lr3]).fit(training_df)
#raise NotImplementedError()

In [29]:
# (15 pts)
np.testing.assert_equal(type(lr_pipeline1), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline2), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline3), pipeline.PipelineModel)
np.testing.assert_array_equal([type(s) for s in lr_pipeline1.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline2.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline3.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])

Use the evaluator object defined below to compute the ROC AUC of your predictors. For example, to compute the AUC of pipeline 1 for a dataframe `df`, you would run

```python
evaluator.evaluate(lr_pipeline1.transform(df))
```

Assign the AUC of the three models to the variables `AUC1`, `AUC2`, and `AUC3`, and assign the pipeline with the best model to a variable `best_model`.
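
For intuition, ROC AUC can be read as the probability that a randomly chosen spam message receives a higher score than a randomly chosen ham message. The plain-Python sketch below computes it from that pairwise definition on made-up labels and scores. It is not how Spark's evaluator is implemented, and the values from `evaluator.evaluate` may differ slightly from a pairwise count.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))

# Hypothetical labels (1 = spam) and predicted spam scores.
labels = [1, 0, 1, 0, 0]
scores = [0.9, 0.2, 0.4, 0.5, 0.1]
auc = roc_auc(labels, scores)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which is why the tests below expect the three AUCs to fall strictly between those bounds.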

In [31]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='type')

For example, the AUC on training of the first model is perfect:

```python
evaluator.evaluate(lr_pipeline1.transform(training_df))
```

```console
1.0
```

In [33]:
# AUC code here
AUC1 = evaluator.evaluate(lr_pipeline1.transform(validation_df))
AUC2 = evaluator.evaluate(lr_pipeline2.transform(validation_df))
AUC3 = evaluator.evaluate(lr_pipeline3.transform(validation_df))
#raise NotImplementedError()

In [34]:
# (15 pts)
np.testing.assert_array_equal([type(AUC1), type(AUC2), type(AUC3)],
                             [float, float, float])
# AUC less than 1
np.testing.assert_array_less([AUC1, AUC2, AUC3], [1, 1, 1])
# AUC more than 0.5
np.testing.assert_array_less([.5, .5, .5],
                            [AUC1, AUC2, AUC3])

In [35]:
print(AUC1, AUC2, AUC3)

In [36]:
# Create best_model variable here
best_model = lr_pipeline2

# Question 1.5: Choose best model (15 pts)

Using the right split and the best model selected before, compute the generalization performance and assign it to a variable `AUC_best`.

In [38]:
# Compute the AUC of the best model and assign to the variable AUC_best
AUC_best = evaluator.evaluate(best_model.transform(testing_df))
#raise NotImplementedError()

In [39]:
# (15 pts)
np.testing.assert_approx_equal(AUC_best, 
                               0.976126746201693, significant=2)

# Question 1.6: Inference (15 pts)

Use pipeline 2 fitted above (`lr_pipeline2`) to create Pandas dataframes that contain the most negative words and the most positive words. In particular, create a dataframe `positive_words` with the columns `word` and `weight` containing the top 20 positive words, sorted by descending coefficient. Similarly, create a `negative_words` Pandas dataframe with the top 20 negative words, where the coefficients are sorted in ascending order. **Hint: Use the `sentiment_analysis.ipynb` notebook in the repo for inspiration.**

In [41]:
# create positive_words and negative_words pandas dataframe below    
vocabulary = tfidf_pipeline.stages[1].vocabulary
weights = lr_pipeline2.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})
positive_words = coeffs_df.sort_values('weight', ascending=False).head(20)
negative_words = coeffs_df.sort_values('weight', ascending=True).head(20)
#raise NotImplementedError()

In [42]:
positive_words.head()

Unnamed: 0,weight,word
3555,0.59087,widelive.com/index.
12237,0.533567,08714712388
15,0.5171,call
81,0.513278,txt
9064,0.468274,gbp/sms


In [43]:
negative_words.head()

Unnamed: 0,weight,word
1,-0.162493,i
2444,-0.060939,fighting
3221,-0.059061,dificult
3371,-0.059061,fightng
3332,-0.059061,lose.


The `positive_words` and `negative_words` dataframe should look like this:

```python
positive_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>3555</th>
      <td>widelive.com/index.</td>
      <td>0.590870</td>
    </tr>
    <tr>
      <th>12237</th>
      <td>08714712388</td>
      <td>0.533567</td>
    </tr>
    <tr>
      <th>15</th>
      <td>call</td>
      <td>0.517100</td>
    </tr>
    <tr>
      <th>81</th>
      <td>txt</td>
      <td>0.513278</td>
    </tr>
    <tr>
      <th>9064</th>
      <td>gbp/sms</td>
      <td>0.468274</td>
    </tr>
  </tbody>
</table>

and 

```python
negative_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>1</th>
      <td>i</td>
      <td>-0.162493</td>
    </tr>
    <tr>
      <th>2444</th>
      <td>fighting</td>
      <td>-0.060939</td>
    </tr>
    <tr>
      <th>3221</th>
      <td>dificult</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3371</th>
      <td>fightng</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3332</th>
      <td>lose.</td>
      <td>-0.059061</td>
    </tr>
  </tbody>
</table>

In [45]:
# (15 pts)
np.testing.assert_equal(set(positive_words.columns), {'weight', 'word'})
np.testing.assert_equal(set(negative_words.columns), {'weight', 'word'})
np.testing.assert_approx_equal(positive_words.weight.sum(), 8.3701485692317927, significant=2)
np.testing.assert_approx_equal(negative_words.weight.sum(), -0.6661952507442954, significant=2)
np.testing.assert_array_less(positive_words.weight.iloc[-1], positive_words.weight.iloc[0])
np.testing.assert_array_less(negative_words.weight.iloc[0], negative_words.weight.iloc[-1])

# Question 1.7: Inference (15 pts)

Use the dataframe `sms_spam3_df` to create a model where the first feature is `has_uppercase` and the next set of features are the tfidf of the text. Perform feature engineering on all features using a max absolute scaler ([`MaxAbsScaler`](https://spark.apache.org/docs/2.0.2/ml-features.html#maxabsscaler)). Do a logistic regression on the resulting scaled features with regularization parameter $\lambda = 0.2$ and elastic net mixture $\alpha=0.1$ for the entire data (all of `sms_spam3_df`). Since you have scaled all features to be within the same range, you can compare their coefficients.

**(5 pts)** With code and comments, answer the questions below:

1. Is `has_uppercase` a feature that is positively or negatively related to an SMS being spam?
2. What is the ratio of the coefficient of `has_uppercase` to the biggest positive tfidf coefficient?
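
Max-abs scaling, as used in this question, divides each feature by its largest absolute value, so every feature falls in $[-1, 1]$ and zeros stay zero. The plain-Python sketch below shows the idea on a few made-up rows. It is not the Spark `MaxAbsScaler` implementation, and the row values and function name are hypothetical.

```python
def max_abs_scale(rows):
    """Divide every column by its largest absolute value (all-zero columns stay zero)."""
    n_cols = len(rows[0])
    max_abs = [max(abs(row[j]) for row in rows) for j in range(n_cols)]
    return [
        [row[j] / max_abs[j] if max_abs[j] != 0 else 0.0 for j in range(n_cols)]
        for row in rows
    ]

# Hypothetical rows: [has_uppercase, tfidf of term A, tfidf of term B].
rows = [
    [1, 3.6, 0.0],
    [0, 1.2, 0.0],
    [1, 0.0, 0.0],
]
scaled = max_abs_scale(rows)
```

Because every scaled feature shares the same range, the magnitudes of the fitted coefficients can be compared across `has_uppercase` and the tfidf terms.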

In [47]:
# your code and comments below
# has_uppercase is listed first so that its coefficient sits at index 0 of the model
va = feature.VectorAssembler(inputCols=['has_uppercase', 'tfidf'], outputCol='features')
scale = feature.MaxAbsScaler(inputCol='features', outputCol='scaled_features')
scaled_lr = LogisticRegression().setLabelCol('type').setFeaturesCol('scaled_features').setRegParam(0.2).setMaxIter(100).setElasticNetParam(0.1)
# reuse the fitted tfidf_pipeline (fitted on the same SMS texts) as the first stage
scaled_pipeline = Pipeline(stages=[tfidf_pipeline, va, scale, scaled_lr]).fit(sms_spam3_df)
#raise NotImplementedError()

In [48]:
scaled_pipeline.transform(sms_spam3_df).show()

In [49]:
# has_uppercase is the first assembled feature, so its coefficient is at index 0
has_uppercase_coeff = scaled_pipeline.stages[-1].coefficients.toArray()[0]
has_uppercase_coeff
# 1. The coefficient of 'has_uppercase' is 0.92, which is positive, so the feature is positively related to an SMS being spam.

In [50]:
# Biggest positive tfidf coefficient (index 0 is has_uppercase, so it is skipped)
coeffs = scaled_pipeline.stages[-1].coefficients.toArray()
max_coeff = coeffs[1:].max()
ratio = coeffs[0] / max_coeff
ratio
# 2. The ratio of the coefficient of has_uppercase to the biggest positive tfidf coefficient is 0.46.