# Spark MLlib Exercises


http://spark.apache.org/docs/latest/ml-statistics.html

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
spark = SparkSession.builder.getOrCreate()

In [2]:
spark

In [167]:
from pyspark.ml.stat import KolmogorovSmirnovTest, Correlation
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer, Tokenizer, HashingTF, IDF, StringIndexer

## 1. Statistics (1p.)

Download the following dataset: https://www.kaggle.com/c/titanic/data?select=train.csv

In [3]:
file = "titanic_train.csv"
titanic_df = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
titanic_df = titanic_df.dropna(how='any')
titanic_df.show(10)
print(titanic_df.dtypes)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|   16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0|  248698|   13.0|     

### Exercise 1.A.
**TODO:** Calculate descriptive statistics for 'Age' and 'Fare' (see https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html#describe(scala.collection.Seq))

In [5]:
titanic_df.describe('Age', 'Fare').show()

+-------+------------------+-----------------+
|summary|               Age|             Fare|
+-------+------------------+-----------------+
|  count|               183|              183|
|   mean|  35.6744262295082|78.68246885245901|
| stddev|15.643865966849717|76.34784270040569|
|    min|              0.92|              0.0|
|    max|              80.0|         512.3292|
+-------+------------------+-----------------+



### Exercise 1.B.

**TODO:** Check if 'Age' and 'Fare' have normal distribution (see http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/stat/KolmogorovSmirnovTest.html)

In [33]:
KolmogorovSmirnovTest.test(titanic_df, 'Age', 'norm').first()

Row(pValue=1.943689653671754e-11, statistic=0.9713276975967852)

In [34]:
KolmogorovSmirnovTest.test(titanic_df, 'Fare', 'norm').first()

Row(pValue=8.816725127758218e-12, statistic=0.9890707515997943)

P-values are really small (<0.05) so there are from normal distribution

### Exercise 1.C.

**TODO:** Calculate Pearson correlation between the following pairs of features:  
* 'Age' and 'Survived'
* 'Sex' and 'Survived' *(remember about encoding 'Sex' attributes as 0s and 1s)*

Which correlation is stronger?

In [42]:
titanic_df.corr('Age', 'Survived')

-0.2540847542030532

In [49]:
def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, IntegerType())

mapping = {
    'male': 0,
    'female': 1
}

titanic_df = titanic_df.withColumn("Sex", translate(mapping)("Sex"))

In [50]:
titanic_df.corr('Sex', 'Survived')

0.5324179744538412

Stronger Correlation for Sex

## 2. Loading data

Doc: http://spark.apache.org/docs/latest/ml-datasource.html 

Download data from https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt and load as DataFrame. 

In [51]:
file = "sample_libsvm_data.txt"

df = spark.read.format("libsvm").option("numFeatures", "780").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
+-----+--------------------+
only showing top 10 rows



[Row(label=0.0, features=SparseVector(780, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### Exercise 2.A
**TODO:** Load wine data from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale
Dataset description: http://archive.ics.uci.edu/ml/datasets/Wine

In [58]:
file = "wine.txt"

df = spark.read.format("libsvm").option("numFeatures", "13").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 10 rows



[Row(label=1.0, features=SparseVector(13, {0: 0.6842, 1: -0.6166, 2: 0.1444, 3: -0.4845, 4: 0.2391, 5: 0.2552, 6: 0.1477, 7: -0.434, 8: 0.1861, 9: -0.256, 10: -0.0894, 11: 0.9414, 12: 0.1227}))]

## 3. Classification (2p.)

In [76]:
file = "wine.csv" # https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv

# Remember about deleting dots from the headers of this csv file!
winedf2 = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
winedf2.show(10)
print(winedf2.dtypes)

+----+-------+---------+----+----+---+-------+----------+--------------------+-------+--------+----+----+-------+
|Wine|Alcohol|Malicacid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoid phenols|Proanth|Colorint| Hue|  OD|Proline|
+----+-------+---------+----+----+---+-------+----------+--------------------+-------+--------+----+----+-------+
|   1|  14.23|     1.71|2.43|15.6|127|    2.8|      3.06|                0.28|   2.29|    5.64|1.04|3.92|   1065|
|   1|   13.2|     1.78|2.14|11.2|100|   2.65|      2.76|                0.26|   1.28|    4.38|1.05| 3.4|   1050|
|   1|  13.16|     2.36|2.67|18.6|101|    2.8|      3.24|                 0.3|   2.81|    5.68|1.03|3.17|   1185|
|   1|  14.37|     1.95| 2.5|16.8|113|   3.85|      3.49|                0.24|   2.18|     7.8|0.86|3.45|   1480|
|   1|  13.24|     2.59|2.87|21.0|118|    2.8|      2.69|                0.39|   1.82|    4.32|1.04|2.93|    735|
|   1|   14.2|     1.76|2.45|15.2|112|   3.27|      3.39|                0.34|   1.97|  

### Exercise 3.A
**TODO:** 

Remember about deleting dots from the headers of this csv file and splitting data into train and test set


1) Create pipeline with VectorAssembler and DecisionTreeClassifier.

2) Use the pipeline to make predictions.

3) Evaluate predictions using MulticlassClassificationEvaluator.

4) Calculate accuracy and test error

5) Print the structure of the trained decision tree (hint: use toDebugString attribute)

In [77]:
train,test=winedf2.randomSplit([0.7,0.3])

In [78]:
train.count()

123

In [79]:
test.count()

55

In [104]:
featureColumns = [c for c in winedf2.columns if c != 'Wine']

# create and configure the assembler
assembler = VectorAssembler(inputCols=featureColumns, 
                            outputCol="features")

In [81]:
dt = DecisionTreeClassifier(maxDepth=2, labelCol="Wine", leafCol="leafId")

In [87]:
evaluator = MulticlassClassificationEvaluator(
    labelCol='Wine', predictionCol="prediction")

In [91]:
pipeline = Pipeline(stages=[assembler, dt])

dtPipelineModel = pipeline.fit(train)

# make predictions
traningPredictionsDF = dtPipelineModel.transform(train)
testPredictionsDF = dtPipelineModel.transform(test)

# evaluate the model on test and traning data
print("Accuracy on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "accuracy"}))

print("Accuracy on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "accuracy"}))

print("F1 on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "f1"}))

print("F1 on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "f1"}))

Accuracy on traning data = 0.918699
Accuracy on test data = 0.818182
F1 on traning data = 0.921012
F1 on test data = 0.819537


In [98]:
print(dtPipelineModel.stages[1]._call_java('toDebugString'))

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=13
  If (feature 12 <= 787.5)
   If (feature 9 <= 3.82)
    Predict: 2.0
   Else (feature 9 > 3.82)
    Predict: 3.0
  Else (feature 12 > 787.5)
   If (feature 6 <= 2.1550000000000002)
    Predict: 3.0
   Else (feature 6 > 2.1550000000000002)
    Predict: 1.0



### Exercise 3.B
**TODO:** 

1) Extend the pipeline from the previos task with QuantileDiscretizer 

2) Try using a couple of different numbers of buckets, which cinfiguration gives the best results?

3) Can you see any difference in the structure of the decistion tree?

In [152]:
featureColumns = [c for c in winedf2.columns + ['Proline_o'] if ('Wine' != c and 'Proline' != c)]

# create and configure the assembler
assembler = VectorAssembler(inputCols=featureColumns, 
                            outputCol="features")

In [153]:
for i in range(10):
    print()
    print()
    print('Buckets:', i)
    qds2 = QuantileDiscretizer(relativeError=0.01, handleInvalid="keep", numBuckets=2,
    inputCols=["Proline"], outputCols=["Proline_o"])
    
    pipeline = Pipeline(stages=[qds2, assembler, dt])

    dtPipelineModel = pipeline.fit(train)

    # make predictions
    traningPredictionsDF = dtPipelineModel.transform(train)
    testPredictionsDF = dtPipelineModel.transform(test)

    # evaluate the model on test and traning data
    print("    Accuracy on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "accuracy"}))

    print("    Accuracy on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "accuracy"}))

    print("    F1 on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "f1"}))

    print("    F1 on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "f1"}))
    
    print("    \n" + dtPipelineModel.stages[2]._call_java('toDebugString'))



Buckets: 0
    Accuracy on traning data = 0.918699
    Accuracy on test data = 0.909091
    F1 on traning data = 0.918941
    F1 on test data = 0.907706
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=5, numClasses=4, numFeatures=13
  If (feature 9 <= 3.82)
   Predict: 2.0
  Else (feature 9 > 3.82)
   If (feature 6 <= 1.58)
    Predict: 3.0
   Else (feature 6 > 1.58)
    Predict: 1.0



Buckets: 1
    Accuracy on traning data = 0.918699
    Accuracy on test data = 0.909091
    F1 on traning data = 0.918941
    F1 on test data = 0.907706
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=5, numClasses=4, numFeatures=13
  If (feature 9 <= 3.82)
   Predict: 2.0
  Else (feature 9 > 3.82)
   If (feature 6 <= 1.58)
    Predict: 3.0
   Else (feature 6 > 1.58)
    Predict: 1.0



Buckets: 2
    Accuracy on traning data = 0.918699
    Accuracy on test data = 0.909091
    F1 on traning data = 0.91894

In [155]:
featureColumns = [c for c in winedf2.columns + ['Proline_o'] if ('Wine' != c and 'Proline' != c and 'Colorint' != c)]

# create and configure the assembler
assembler = VectorAssembler(inputCols=featureColumns, 
                            outputCol="features")

In [156]:
for i in range(32):
    print()
    print()
    print('Buckets:', i)
    qds2 = QuantileDiscretizer(relativeError=0.01, handleInvalid="keep", numBuckets=2,
    inputCols=["Proline", "Colorint"], outputCols=["Proline_o", "Colorint_o"])
    
    pipeline = Pipeline(stages=[qds2, assembler, dt])

    dtPipelineModel = pipeline.fit(train)

    # make predictions
    traningPredictionsDF = dtPipelineModel.transform(train)
    testPredictionsDF = dtPipelineModel.transform(test)

    # evaluate the model on test and traning data
    print("    Accuracy on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "accuracy"}))

    print("    Accuracy on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "accuracy"}))

    print("    F1 on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "f1"}))

    print("    F1 on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "f1"}))
    
    print("    \n" + dtPipelineModel.stages[2]._call_java('toDebugString'))



Buckets: 0
    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 0 > 12.745000000000001)
   If (feature 6 <= 1.8450000000000002)
    Predict: 3.0
   Else (feature 6 > 1.8450000000000002)
    Predict: 1.0



Buckets: 1
    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 

    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 0 > 12.745000000000001)
   If (feature 6 <= 1.8450000000000002)
    Predict: 3.0
   Else (feature 6 > 1.8450000000000002)
    Predict: 1.0



Buckets: 16
    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 0 > 12.74500

    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 0 > 12.745000000000001)
   If (feature 6 <= 1.8450000000000002)
    Predict: 3.0
   Else (feature 6 > 1.8450000000000002)
    Predict: 1.0



Buckets: 31
    Accuracy on traning data = 0.943089
    Accuracy on test data = 0.872727
    F1 on traning data = 0.942577
    F1 on test data = 0.866274
    
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a09292cff8e, depth=2, numNodes=7, numClasses=4, numFeatures=12
  If (feature 0 <= 12.745000000000001)
   If (feature 10 <= 2.035)
    Predict: 3.0
   Else (feature 10 > 2.035)
    Predict: 2.0
  Else (feature 0 > 12.74500

## 4. Text classification (2p.)

### Exercise 4
**TODO:** 
Build a pipeline consisting of Tokenizer, HashingTF, IDF and StringIndexer and LogisticRegression, fit it to training data: 
http://help.sentiment140.com/for-students/

What is the accuracy of this classifier?

In [216]:
file = "text_train.csv" 
text_train = spark.read.format("csv").options(inferSchema="true", header="false").load(file)
text_train.show(10)
print(text_train.dtypes)

file = "text_test.csv"
text_test = spark.read.format("csv").options(inferSchema="true", header="false").load(file)

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, IntegerType())

mapping = {
    0: 0,
    2: 4,
    4: 4
}

text_test = text_test.withColumn("_c0", translate(mapping)("_c0"))
text_test.show(10)
print(text_test.dtypes)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [217]:
tokenizer = Tokenizer(inputCol = "_c5", outputCol="words")
htf = HashingTF(inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="tfidf")
si = StringIndexer(inputCol="_c0", outputCol="indexed")
blor = LogisticRegression(featuresCol = "tfidf", labelCol="indexed", predictionCol="prediction")
evaluator = MulticlassClassificationEvaluator(
    labelCol='indexed', predictionCol="prediction")

In [218]:
pipeline = Pipeline(stages=[tokenizer, htf, idf, si, blor])

dtPipelineModel = pipeline.fit(text_train)

# make predictions
traningPredictionsDF = dtPipelineModel.transform(text_train)
testPredictionsDF = dtPipelineModel.transform(text_test)

# evaluate the model on test and traning data
print("Accuracy on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "accuracy"}))

print("Accuracy on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "accuracy"}))

print("F1 on traning data = %g" % evaluator.evaluate(traningPredictionsDF, {evaluator.metricName: "f1"}))

print("F1 on test data = %g" % evaluator.evaluate(testPredictionsDF, {evaluator.metricName: "f1"}))

Accuracy on traning data = 0.858104
Accuracy on test data = 0.706827
F1 on traning data = 0.858103
F1 on test data = 0.709953


In [215]:
traningPredictionsDF.select('_c0').distinct().show()

+---+
|_c0|
+---+
|  4|
|  0|
+---+



In [214]:
testPredictionsDF.select('_c0').distinct().show()

+---+
|_c0|
+---+
|  4|
|  2|
|  0|
+---+

