# MLlib

## Setup

In [1]:
import findspark

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, QuantileDiscretizer, StringIndexer, Tokenizer, VectorAssembler
from pyspark.ml.stat import Correlation, KolmogorovSmirnovTest

In [2]:
findspark.init()

spark = (
    SparkSession
    .builder
    .getOrCreate()
)

sc = spark.sparkContext
spark

## Dataset statistics

[Titanic dataset](https://www.kaggle.com/c/titanic/data?select=train.csv) will be used.

In [3]:
filename = "titanic_train.csv"
titanic_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load(filename)
)
titanic_df = titanic_df.dropna(how="any")

titanic_df.show(10)
print(titanic_df.dtypes)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|   16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0|  248698|   13.0|     

### Descriptive statistics

Calculate descriptive statistics for "Age" and "Fare" features.

In [4]:
titanic_df.describe("Age").show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               183|
|   mean|  35.6744262295082|
| stddev|15.643865966849717|
|    min|              0.92|
|    max|              80.0|
+-------+------------------+



In [5]:
titanic_df.describe("Fare").show()

+-------+-----------------+
|summary|             Fare|
+-------+-----------------+
|  count|              183|
|   mean|78.68246885245901|
| stddev|76.34784270040569|
|    min|              0.0|
|    max|         512.3292|
+-------+-----------------+



### Normality test

Check if "Age" and "Fare" have normal distribution, using Kolmogorov-Smirnov test.

In [6]:
KolmogorovSmirnovTest.test(titanic_df, "Age", "norm", 0.0, 1.0).first()

Row(pValue=1.943689653671754e-11, statistic=0.9713276975967852)

In [7]:
KolmogorovSmirnovTest.test(titanic_df, "Fare", "norm", 0.0, 1.0).first()

Row(pValue=8.816725127758218e-12, statistic=0.9890707515997943)

p-values are very small, essentialy near zero, so for both variables we reject the null hypothesis, i.e. they have non-normal distribution.

### Correlations

Calculate Pearson correlation between pairs of features:
- "Age" and "Survived"
- "Sex" and "Survived" (remember to encode "Sex" attribute as 0/1 values)

Which correlation is stronger?

In [23]:
@F.udf(returnType=T.IntegerType())
def sex_to_integers(sex: str) -> int:
    return int(sex == "male")  # male - 1, female - 0


In [24]:
titanic_df_encoded = (
    titanic_df
    .select("Age", "Sex", "Survived")
    .withColumn("Sex", sex_to_integers(F.col("Sex")))
)

titanic_df_encoded.show(5)

+----+---+--------+
| Age|Sex|Survived|
+----+---+--------+
|38.0|  0|       1|
|35.0|  0|       1|
|54.0|  1|       0|
| 4.0|  0|       1|
|58.0|  0|       1|
+----+---+--------+
only showing top 5 rows



In [25]:
titanic_df_encoded.corr("Age", "Survived", method="pearson")

-0.2540847542030532

In [26]:
titanic_df_encoded.corr("Sex", "Survived", method="pearson")

-0.5324179744538412

Correlation between "Sex" and "Survived" is stronger (larger absolute value), which is expected, as more women survived the Titanic sinking.

## Wine classification

## Data loading from nonstandard formats

Load [Wine dataset](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale) ([UCI description](http://archive.ics.uci.edu/ml/datasets/Wine)), which is in the LibSVM (.scala) format.

In [49]:
filename = "wine.scala"

wine_df = (
    spark.read
    .format("libsvm")
    .option("numFeatures", "13").load(filename)
)

wine_df.show(10)
wine_df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
|  1.0|(13,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 10 rows



[Row(label=1.0, features=SparseVector(13, {0: 0.6842, 1: -0.6166, 2: 0.1444, 3: -0.4845, 4: 0.2391, 5: 0.2552, 6: 0.1477, 7: -0.434, 8: 0.1861, 9: -0.256, 10: -0.0894, 11: 0.9414, 12: 0.1227}))]

## Classification

### Simple pipeline

Load [pre-formatted Wine dataset](https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv) ([UCI description](http://archive.ics.uci.edu/ml/datasets/Wine)). Remember about deleting dots from the headers of the CSV file and splitting data into train and test set.

Create a classification pipeline:
1. Create pipeline with `VectorAssembler` and `DecisionTreeClassifier`.
2. Use the pipeline to make predictions.
3. Evaluate predictions using `MulticlassClassificationEvaluator`.
4. Calculate accuracy and test error
5. Print the structure of the trained decision tree, using the `toDebugString`


In [50]:
filename = "wine.csv"

wine_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load("wine.csv")
    .withColumnRenamed("Malic.acid", "Malic_acid")
    .withColumnRenamed("Nonflavanoid.phenols", "Nonflavanoid_phenols")
    .withColumnRenamed("Color.int", "Color_int")
)

train_df, test_df = wine_df.randomSplit([0.8, 0.2], seed=0)

wine_df.show(5)
print(wine_df.dtypes)

+----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|Wine|Alcohol|Malic_acid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoid_phenols|Proanth|Color_int| Hue|  OD|Proline|
+----+-------+----------+----+----+---+-------+----------+--------------------+-------+---------+----+----+-------+
|   1|  14.23|      1.71|2.43|15.6|127|    2.8|      3.06|                0.28|   2.29|     5.64|1.04|3.92|   1065|
|   1|   13.2|      1.78|2.14|11.2|100|   2.65|      2.76|                0.26|   1.28|     4.38|1.05| 3.4|   1050|
|   1|  13.16|      2.36|2.67|18.6|101|    2.8|      3.24|                 0.3|   2.81|     5.68|1.03|3.17|   1185|
|   1|  14.37|      1.95| 2.5|16.8|113|   3.85|      3.49|                0.24|   2.18|      7.8|0.86|3.45|   1480|
|   1|  13.24|      2.59|2.87|21.0|118|    2.8|      2.69|                0.39|   1.82|     4.32|1.04|2.93|    735|
+----+-------+----------+----+----+---+-------+----------+--------------

In [51]:
def train_wine_decision_tree(train_df, test_df):
    num_classes = train_df.select("Wine").distinct().count()
    feature_cols = train_df.columns[1:]
    
    assembler = VectorAssembler(
        inputCols=feature_cols, 
        outputCol="features"
    )
    decision_tree = DecisionTreeClassifier(
        labelCol="Wine", 
        featuresCol="features"
    )
    pipeline = Pipeline(stages=[assembler, decision_tree]) 

    model = pipeline.fit(train_df)
    predictions = model.transform(test_df)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="Wine", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions) * 100
    
    return model, accuracy


In [52]:
model, accuracy = train_wine_decision_tree(train_df, test_df)

tree_model = model.stages[1]
print(tree_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_e8a5b53a7692, depth=5, numNodes=19, numClasses=4, numFeatures=13
  If (feature 12 <= 755.0)
   If (feature 6 <= 1.385)
    If (feature 9 <= 3.77)
     Predict: 2.0
    Else (feature 9 > 3.77)
     Predict: 3.0
   Else (feature 6 > 1.385)
    If (feature 0 <= 13.135)
     Predict: 2.0
    Else (feature 0 > 13.135)
     If (feature 1 <= 1.6749999999999998)
      Predict: 2.0
     Else (feature 1 > 1.6749999999999998)
      If (feature 0 <= 13.285)
       Predict: 1.0
      Else (feature 0 > 13.285)
       Predict: 3.0
  Else (feature 12 > 755.0)
   If (feature 5 <= 1.6150000000000002)
    If (feature 1 <= 1.62)
     Predict: 2.0
    Else (feature 1 > 1.62)
     Predict: 3.0
   Else (feature 5 > 1.6150000000000002)
    If (feature 0 <= 11.98)
     Predict: 2.0
    Else (feature 0 > 11.98)
     Predict: 1.0



### Additional experiments

1. Extend the pipeline from the previous task with `QuantileDiscretizer`
2. Try using a few different numbers of buckets, which configuration gives the best results?
3. Can you see any difference in the structure of the decision tree?

In [55]:
def train_binned_wine_decision_tree(train_df, test_df, num_buckets: int):
    num_classes = train_df.select("Wine").distinct().count()
    feature_cols = train_df.columns[1:]
    discretized_cols = [f"{col}_disc" for col in train_df.columns[1:]]
    
    discretizer = QuantileDiscretizer(
        inputCols=feature_cols,
        outputCols=discretized_cols,
        numBuckets=num_buckets
    )
    assembler = VectorAssembler(
        inputCols=discretized_cols, 
        outputCol="features"
    )
    decision_tree = DecisionTreeClassifier(
        labelCol="Wine", 
        featuresCol="features",
        
    )
    pipeline = Pipeline(stages=[discretizer, assembler, decision_tree]) 

    model = pipeline.fit(train_df)
    predictions = model.transform(test_df)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="Wine", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions) * 100
    
    return model, accuracy


In [56]:
for num_bins in range(2, 6):
    print("Bins:", num_bins)
    model, accuracy = train_binned_wine_decision_tree(train_df, test_df, num_bins)
    print(f"Accuracy: {accuracy:.2f}")

    tree_model = model.stages[2]
    print(tree_model.toDebugString)
    print()

Bins: 2
Accuracy: 88.89
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_7386f045b0d4, depth=5, numNodes=21, numClasses=4, numFeatures=13
  If (feature 6 in {0.0})
   If (feature 9 in {0.0})
    If (feature 10 in {0.0})
     If (feature 8 in {0.0})
      If (feature 1 in {0.0})
       Predict: 2.0
      Else (feature 1 not in {0.0})
       Predict: 3.0
     Else (feature 8 not in {0.0})
      Predict: 2.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
   Else (feature 9 not in {0.0})
    If (feature 10 in {0.0})
     Predict: 3.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
  Else (feature 6 not in {0.0})
   If (feature 12 in {0.0})
    Predict: 2.0
   Else (feature 12 not in {0.0})
    If (feature 0 in {0.0})
     If (feature 2 in {0.0})
      Predict: 2.0
     Else (feature 2 not in {0.0})
      If (feature 3 in {0.0})
       Predict: 1.0
      Else (feature 3 not in {0.0})
       Predict: 2.0
    Else (feature 0 not in {0.0})
     Predict: 1.0


Bins: 3
Accu

Accuracy changes quite a bit depending on number of bins, from 89% to 97%. Highest one is achieved with 3 bins.

Tree structure also changes, in terms of both number of bins used for splits and depth of the trees.

## Text classification

Build a pipeline consisting of `Tokenizer`, `HashingTF`, `IDF`, `StringIndexer` and `LogisticRegression`. Use [the Sentiment 140 dataset](http://help.sentiment140.com/for-students/).

What is the accuracy of this classifier?

In [57]:
columns = ["label", "id", "date", "query", "user", "text"]

train_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="false")
    .load("sentiment_train.csv")
)
for old, new in zip(train_df.columns, columns):
    train_df = train_df.withColumnRenamed(old, new)


test_df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="false")
    .load("sentiment_test.csv")
)
for old, new in zip(test_df.columns, columns):
    test_df = test_df.withColumnRenamed(old, new)

    
train_df = train_df.select("label", "text")
test_df = test_df.select("label", "text")

train_df.show(5)
print(train_df.dtypes)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|@switchfoot http:...|
|    0|is upset that he ...|
|    0|@Kenichan I dived...|
|    0|my whole body fee...|
|    0|@nationwideclass ...|
+-----+--------------------+
only showing top 5 rows

[('label', 'int'), ('text', 'string')]


Since there is no class 2 (neutral sentiment) in the training set, I remove it from the test set with `handleInvalid="skip"`.

In [62]:
tokenizer = Tokenizer(
    inputCol="text", 
    outputCol="tokens"
)
hashing_tf = HashingTF(
    inputCol="tokens", 
    outputCol="features", 
    numFeatures=50
)
idf = IDF(
    inputCol="features", 
    outputCol="final_features"
)
string_indexer = StringIndexer(
    inputCol="label", 
    outputCol="final_label",
    handleInvalid="skip"
)
classifier = LogisticRegression(
    featuresCol="final_features", 
    labelCol="final_label", 
    predictionCol="prediction"
)

pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf, string_indexer, classifier])

In [63]:
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="final_label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions) * 100

print(f"Accuracy: {accuracy:.2f}")

Accuracy: 53.20
