# Spark MLlib Exercises


http://spark.apache.org/docs/latest/ml-statistics.html

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 60.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=5871b092e81c2e0b805cee905f6e2fa0e70a223e2704ef9fd1c8f5fd9c38a755
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, QuantileDiscretizer, StringIndexer, Tokenizer, VectorAssembler
from pyspark.ml.stat import Correlation, KolmogorovSmirnovTest

spark = SparkSession.builder.getOrCreate()

In [3]:
spark

## 1. Statistics (1p.)

Download the following dataset: https://www.kaggle.com/c/titanic/data?select=train.csv

In [7]:
file = "titanic_train.csv"
titanic_df = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
titanic_df = titanic_df.dropna(how='any')
titanic_df.show(10)
print(titanic_df.dtypes)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1| PP 9549|   16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0|  248698|   13.0|     

### Exercise 1.A.
**TODO:** Calculate descriptive statistics for 'Age' and 'Fare' (see https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html#describe(scala.collection.Seq))

In [9]:
titanic_df.describe("age", "Fare").show()


+-------+------------------+-----------------+
|summary|               age|             Fare|
+-------+------------------+-----------------+
|  count|               183|              183|
|   mean|  35.6744262295082|78.68246885245901|
| stddev|15.643865966849717|76.34784270040569|
|    min|              0.92|              0.0|
|    max|              80.0|         512.3292|
+-------+------------------+-----------------+



### Exercise 1.B.

**TODO:** Check if 'Age' and 'Fare' have normal distribution (see http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/stat/KolmogorovSmirnovTest.html)

In [17]:
ksResult = KolmogorovSmirnovTest.test(titanic_df, 'Age', 'norm', 0.0, 1.0).first()
pValue = ksResult.pValue
if pValue < 0.05:
  print(f'reject the null hypothesis that the sample comes from a normal distribution as pValue equals {pValue}')
else:
  print('fail to reject the null hypothesis')

reject the null hypothesis that the sample comes from a normal distribution as pValue equals 1.943689653671754e-11


In [18]:
ksResult = KolmogorovSmirnovTest.test(titanic_df, 'Fare', 'norm', 0.0, 1.0).first()
pValue = ksResult.pValue
if pValue < 0.05:
  print(f'reject the null hypothesis that the sample comes from a normal distribution as pValue equals {pValue}')
else:
  print('fail to reject the null hypothesis')

reject the null hypothesis that the sample comes from a normal distribution as pValue equals 8.816725127758218e-12


### Exercise 1.C.

**TODO:** Calculate Pearson correlation between the following pairs of features:  
* 'Age' and 'Survived'
* 'Sex' and 'Survived' *(remember about encoding 'Sex' attributes as 0s and 1s)*

Which correlation is stronger?

In [21]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|  

In [28]:
feature1 = titanic_df.select("Age").rdd.map(lambda row: row[0])
feature2 = titanic_df.select("Survived").rdd.map(lambda row: row[0])

pearsonCorrelation = Statistics.corr(feature1, feature2, method="pearson")

print(f'We can assume that the correlation between Age and survived is weak as it is equal to {pearsonCorrelation}')

We can assume that the correlation between Age and survived is weak as it is equal to -0.25408475420305326


In [29]:
from pyspark.sql.functions import udf

def mapSexToInt(x):
  if x == "female":
    return 1
  elif x == "male":
    return 0
  else:
    return None

mapSexToIntUDF = udf(mapSexToInt)

mappedDF = titanic_df.withColumn("Sex_int", mapSexToIntUDF(titanic_df.Sex))

In [34]:
mappedDF.show()

+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|Sex_int|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+-------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|      1|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|      1|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|      0|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|      1|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|      1|


In [37]:
feature1 = mappedDF.select("Sex_int").rdd.map(lambda row: row[0])
feature2 = mappedDF.select("Survived").rdd.map(lambda row: row[0])

pearsonCorrelation = Statistics.corr(feature1, feature2, method="pearson")

print(f'We can assume that the correlation between Sex and survived is present. The correlation indicates on positive linear relationship between the two variables as it is equal to {pearsonCorrelation}')

We can assume that the correlation between Sex and survived is present. The correlation indicates on positive linear relationship between the two variables as it is equal to 0.5324179744538422


## 2. Loading data

Doc: http://spark.apache.org/docs/latest/ml-datasource.html 

Download data from https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt and load as DataFrame. 

In [38]:
file = "sample_libsvm_data.txt"

df = spark.read.format("libsvm").option("numFeatures", "780").load(file)
df.show(10)
df.take(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
+-----+--------------------+
only showing top 10 rows



[Row(label=0.0, features=SparseVector(780, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### Exercise 2.A
**TODO:** Load wine data from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale
Dataset description: http://archive.ics.uci.edu/ml/datasets/Wine

In [13]:
file = "wine.data"


df = spark.read.format("csv").load(file)
df.show()

+---+-----+----+----+----+---+----+----+---+----+----+----+----+----+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7|_c8| _c9|_c10|_c11|_c12|_c13|
+---+-----+----+----+----+---+----+----+---+----+----+----+----+----+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|.28|2.29|5.64|1.04|3.92|1065|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|.26|1.28|4.38|1.05| 3.4|1050|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| .3|2.81|5.68|1.03|3.17|1185|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|.24|2.18| 7.8| .86|3.45|1480|
|  1|13.24|2.59|2.87|  21|118| 2.8|2.69|.39|1.82|4.32|1.04|2.93| 735|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|.34|1.97|6.75|1.05|2.85|1450|
|  1|14.39|1.87|2.45|14.6| 96| 2.5|2.52| .3|1.98|5.25|1.02|3.58|1290|
|  1|14.06|2.15|2.61|17.6|121| 2.6|2.51|.31|1.25|5.05|1.06|3.58|1295|
|  1|14.83|1.64|2.17|  14| 97| 2.8|2.98|.29|1.98| 5.2|1.08|2.85|1045|
|  1|13.86|1.35|2.27|  16| 98|2.98|3.15|.22|1.85|7.22|1.01|3.55|1045|
|  1| 14.1|2.16| 2.3|  18|105|2.95|3.32|.22|2.38|5.75|1.25|3.17|1510|
|  1|14.12|1.48|2.32

## 3. Classification (2p.)

In [24]:
file = "wine.csv" # https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv

# Remember about deleting dots from the headers of this csv file!
winedf2 = spark.read.format("csv").options(inferSchema="true", header="true").load(file)
winedf2.show(10)
print(winedf2.dtypes)

+----+-------+---------+----+----+---+-------+----------+-------------------+-------+--------+----+----+-------+
|Wine|Alcohol|Malicacid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoidphenols|Proanth|Colorint| Hue|  OD|Proline|
+----+-------+---------+----+----+---+-------+----------+-------------------+-------+--------+----+----+-------+
|   1|  14.23|     1.71|2.43|15.6|127|    2.8|      3.06|               0.28|   2.29|    5.64|1.04|3.92|   1065|
|   1|   13.2|     1.78|2.14|11.2|100|   2.65|      2.76|               0.26|   1.28|    4.38|1.05| 3.4|   1050|
|   1|  13.16|     2.36|2.67|18.6|101|    2.8|      3.24|                0.3|   2.81|    5.68|1.03|3.17|   1185|
|   1|  14.37|     1.95| 2.5|16.8|113|   3.85|      3.49|               0.24|   2.18|     7.8|0.86|3.45|   1480|
|   1|  13.24|     2.59|2.87|21.0|118|    2.8|      2.69|               0.39|   1.82|    4.32|1.04|2.93|    735|
|   1|   14.2|     1.76|2.45|15.2|112|   3.27|      3.39|               0.34|   1.97|    6.75|1.

### Exercise 3.A
**TODO:** 

Remember about deleting dots from the headers of this csv file and splitting data into train and test set


1) Create pipeline with VectorAssembler and DecisionTreeClassifier.

2) Use the pipeline to make predictions.

3) Evaluate predictions using MulticlassClassificationEvaluator.

4) Calculate accuracy and test error

5) Print the structure of the trained decision tree (hint: use toDebugString attribute)

In [33]:
#splitting data to train and test
train_df, test_df = winedf2.randomSplit([0.7, 0.3], seed=420)


# Define the input columns for the VectorAssembler
input_cols = train_df.columns[1:]

# Create the VectorAssembler
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Create the DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Wine", featuresCol="features")

# Define the stages of the pipeline
stages = [assembler, dt]

# Create the pipeline
pipeline = Pipeline(stages=stages)


In [34]:
train_df.show()

+----+-------+---------+----+----+---+-------+----------+-------------------+-------+--------+----+----+-------+
|Wine|Alcohol|Malicacid| Ash| Acl| Mg|Phenols|Flavanoids|Nonflavanoidphenols|Proanth|Colorint| Hue|  OD|Proline|
+----+-------+---------+----+----+---+-------+----------+-------------------+-------+--------+----+----+-------+
|   1|  12.85|      1.6|2.52|17.8| 95|   2.48|      2.37|               0.26|   1.46|    3.93|1.09|3.63|   1015|
|   1|  12.93|      3.8|2.65|18.6|102|   2.41|      2.41|               0.25|   1.98|     4.5|1.03|3.52|    770|
|   1|  13.05|     1.73|2.04|12.4| 92|   2.72|      3.27|               0.17|   2.91|     7.2|1.12|2.91|   1150|
|   1|  13.05|     1.77| 2.1|17.0|107|    3.0|       3.0|               0.28|   2.03|    5.04|0.88|3.35|    885|
|   1|  13.05|     2.05|3.22|25.0|124|   2.63|      2.68|               0.47|   1.92|    3.58|1.13| 3.2|    830|
|   1|  13.16|     2.36|2.67|18.6|101|    2.8|      3.24|                0.3|   2.81|    5.68|1.

In [35]:
# Fit the pipeline to the training data
model = pipeline.fit(train_df)
# Apply the pipeline to the test data
predictions = model.transform(test_df)

In [36]:

evaluator = MulticlassClassificationEvaluator(
        labelCol="Wine", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
accuracy = evaluator.evaluate(predictions)

In [37]:
model.stages[1]

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5ba420a42947, depth=4, numNodes=19, numClasses=4, numFeatures=13

In [40]:
print(model.stages[1].toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5ba420a42947, depth=4, numNodes=19, numClasses=4, numFeatures=13
  If (feature 0 <= 12.745000000000001)
   If (feature 6 <= 0.875)
    If (feature 1 <= 0.985)
     Predict: 2.0
    Else (feature 1 > 0.985)
     Predict: 3.0
   Else (feature 6 > 0.875)
    If (feature 11 <= 1.3450000000000002)
     Predict: 3.0
    Else (feature 11 > 1.3450000000000002)
     Predict: 2.0
  Else (feature 0 > 12.745000000000001)
   If (feature 5 <= 2.1500000000000004)
    If (feature 6 <= 1.585)
     If (feature 2 <= 1.99)
      Predict: 2.0
     Else (feature 2 > 1.99)
      Predict: 3.0
    Else (feature 6 > 1.585)
     Predict: 2.0
   Else (feature 5 > 2.1500000000000004)
    If (feature 1 <= 1.0950000000000002)
     Predict: 2.0
    Else (feature 1 > 1.0950000000000002)
     If (feature 3 <= 25.5)
      Predict: 1.0
     Else (feature 3 > 25.5)
      Predict: 2.0



In [41]:
print(accuracy)

0.8813559322033898


### Exercise 3.B
**TODO:** 

1) Extend the pipeline from the previos task with QuantileDiscretizer 

2) Try using a couple of different numbers of buckets, which cinfiguration gives the best results?

3) Can you see any difference in the structure of the decistion tree?

In [44]:
discretized_cols = [f"{col}_disc" for col in train_df.columns[1:]]
                    
for num in range(2,6):
  discretizer = QuantileDiscretizer(
          inputCols=input_cols,
          outputCols=discretized_cols,
          numBuckets=num)
  assembler = VectorAssembler(
        inputCols=discretized_cols, 
        outputCol="features"
    )
  decision_tree = DecisionTreeClassifier(
        labelCol="Wine", 
        featuresCol="features",
        
    )
  pipeline = Pipeline(stages=[discretizer, assembler, decision_tree]) 

  model = pipeline.fit(train_df)
  predictions = model.transform(test_df)

  evaluator = MulticlassClassificationEvaluator(
        labelCol="Wine", 
        predictionCol="prediction", 
        metricName="accuracy"
    )
  accuracy = evaluator.evaluate(predictions)

  print(f'number of bins = {num}')
  print(f"Accuracy: {accuracy:.2f}")

  tree_model = model.stages[2]
  print(tree_model.toDebugString)

number of bins = 2
Accuracy: 0.88
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_483a20afef27, depth=5, numNodes=25, numClasses=4, numFeatures=13
  If (feature 0 in {0.0})
   If (feature 9 in {0.0})
    If (feature 12 in {0.0})
     Predict: 2.0
    Else (feature 12 not in {0.0})
     If (feature 2 in {0.0})
      Predict: 2.0
     Else (feature 2 not in {0.0})
      If (feature 7 in {0.0})
       Predict: 1.0
      Else (feature 7 not in {0.0})
       Predict: 2.0
   Else (feature 9 not in {0.0})
    If (feature 5 in {0.0})
     If (feature 10 in {0.0})
      Predict: 3.0
     Else (feature 10 not in {0.0})
      Predict: 2.0
    Else (feature 5 not in {0.0})
     Predict: 2.0
  Else (feature 0 not in {0.0})
   If (feature 5 in {0.0})
    If (feature 1 in {0.0})
     Predict: 2.0
    Else (feature 1 not in {0.0})
     If (feature 8 in {0.0})
      Predict: 3.0
     Else (feature 8 not in {0.0})
      If (feature 2 in {0.0})
       Predict: 2.0
      Else (feature 2 not in

The best results are achieved by the method with 4 bins where accuracy is equal to 92%.

The number of bins has an impact on the structure of the tree. The results vary in accuracy and the depth of the trees.

## 4. Text classification (2p.)

### Exercise 4
**TODO:** 
Build a pipeline consisting of Tokenizer, HashingTF, IDF and StringIndexer and LogisticRegression, fit it to training data: 
http://help.sentiment140.com/for-students/

What is the accuracy of this classifier?

In [None]:
train_data = "train.csv"
test_data = "test.csv"

traindf = spark.read.format("csv").options(inferSchema="true", header='false').load(train_data)


In [5]:
traindf.show()

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [12]:
testdf = spark.read.format("csv").options(inferSchema="true", header='false').load(test_data)

columns = ["label", "id", "date", "query", "user", "text"]

for old, new in zip(traindf.columns, columns):
    traindf = traindf.withColumnRenamed(old, new)
    
for old, new in zip(testdf.columns, columns):
    testdf = testdf.withColumnRenamed(old, new)


In [13]:
train_df = traindf.select("label", "text")
test_df = testdf.select("label", "text")

In [14]:
train_df.show()

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|@switchfoot http:...|
|    0|is upset that he ...|
|    0|@Kenichan I dived...|
|    0|my whole body fee...|
|    0|@nationwideclass ...|
|    0|@Kwesidei not the...|
|    0|         Need a hug |
|    0|@LOLTrish hey  lo...|
|    0|@Tatiana_K nope t...|
|    0|@twittera que me ...|
|    0|spring break in p...|
|    0|I just re-pierced...|
|    0|@caregiving I cou...|
|    0|@octolinz16 It it...|
|    0|@smarrison i woul...|
|    0|@iamjazzyfizzle I...|
|    0|Hollis' death sce...|
|    0|about to file taxes |
|    0|@LettyA ahh ive a...|
|    0|@FakerPattyPattz ...|
+-----+--------------------+
only showing top 20 rows



In [9]:
tokenizer = Tokenizer(
    inputCol="text", 
    outputCol="tokens"
)
hashing_tf = HashingTF(
    inputCol="tokens", 
    outputCol="features", 
    numFeatures=50
)
idf = IDF(
    inputCol="features", 
    outputCol="final_features"
)
string_indexer = StringIndexer(
    inputCol="label", 
    outputCol="final_label",
    handleInvalid="skip"
)
classifier = LogisticRegression(
    featuresCol="final_features", 
    labelCol="final_label", 
    predictionCol="prediction"
)

pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf, string_indexer, classifier])

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

model = pipeline.fit(traindf)
predictions = model.transform(testdf)

evaluator = MulticlassClassificationEvaluator(
    labelCol="final_label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions) * 100

print(f"Accuracy: {accuracy:.2f}")


Accuracy: 53.20
