### 1. Задача регрессии: LinearRegression
### 2. Задача бинарной классификации: RandomForest

In [1]:
# The Jupyter Notebook and dataset were downloaded from https://www.kaggle.com/code/tylerx/machine-learning-with-spark

# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

import os

# Any results you write to the current directory are saved as output.

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler,  StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



# Create (or reuse) a Spark session bound to the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Load the full tweet dataset from parquet and preview the first rows.
df_full = spark.read.parquet("data/df.parquet")
df_full.show(n=10)

+------------------+----------+--------------+---------------+--------------+----------+-----------+-----------+----------+-------------+-------+--------------+----------+
|           tweetid|    userid|follower_count|following_count|tweet_language|is_retweet|quote_count|reply_count|like_count|retweet_count|is_like|is_en_language|is_popular|
+------------------+----------+--------------+---------------+--------------+----------+-----------+-----------+----------+-------------+-------+--------------+----------+
|685133352202993664| 992417071|         11555|          10348|            en|     false|          0|          0|         0|            0|  false|          true|     false|
|611013594306605057|2553888938|         17709|          16095|            in|     false|          0|          1|         0|            0|  false|         false|      true|
|718813673154719744|1261250516|         19393|          14704|            in|     false|          0|          1|         0|            0|  f

### Задача регрессии

In [2]:
# Regression dataset: keep only the numeric counters used as features/label.
df = df_full.select("following_count", "follower_count", "like_count", "retweet_count")
df.show(10)

# Split into training (80%) and test (20%) sets.
# The fixed seed makes the split -- and therefore every metric computed
# from it -- reproducible under "Restart Kernel -> Run All".
splits = df.randomSplit([0.8, 0.2], seed=42)
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

+---------------+--------------+----------+-------------+
|following_count|follower_count|like_count|retweet_count|
+---------------+--------------+----------+-------------+
|          10348|         11555|         0|            0|
|          16095|         17709|         0|            0|
|          14704|         19393|         0|            0|
|          12638|         12756|         0|            0|
|           5848|          8651|         0|            0|
|          23684|         24655|         0|            0|
|           7494|          9873|         0|            0|
|          13520|         23420|         0|            1|
|          10031|         12326|         0|            0|
|          22187|         24438|         0|            0|
+---------------+--------------+----------+-------------+
only showing top 10 rows

Training Rows: 2673613  Testing Rows: 668308


In [4]:
# Stage 1: pack the three numeric predictors into a single vector column.
numVect = VectorAssembler(
    inputCols=["like_count", "following_count", "retweet_count"],
    outputCol="unscaled_features",
)

# Stage 2: rescale the assembled vector with MinMaxScaler.
scaler = MinMaxScaler(
    inputCol="unscaled_features",
    outputCol="features",
)

# Stage 3: linear regression predicting `follower_count` from the scaled features.
lr = LinearRegression(
    featuresCol="features",
    labelCol="follower_count",
)

# Fit all stages on the training split, then score the held-out test split.
pipeline = Pipeline(stages=[numVect, scaler, lr])
model = pipeline.fit(train)
predictions = model.transform(test)

In [5]:
# One evaluator per regression metric; all compare `prediction` against
# the `follower_count` label and differ only in `metricName`.
rmse_evaluator, mae_evaluator, mse_evaluator, r2_evaluator = (
    RegressionEvaluator(
        labelCol="follower_count",
        predictionCol="prediction",
        metricName=metric_name,
    )
    for metric_name in ("rmse", "mae", "mse", "r2")
)

# Score the test-set predictions produced by the fitted pipeline.
rmse = rmse_evaluator.evaluate(predictions)
mse = mse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)

# Report every metric with three decimals.
print(f"RMSE: {rmse:0.3f}")
print(f"MSE: {mse:0.3f}")
print(f"MAE: {mae:0.3f}")
print(f"r2: {r2:0.3f}")

RMSE: 3554.286
MSE: 12632950.178
MAE: 2754.649
r2: 0.544


#### MSE: Средний квадрат разности предсказанного и фактического значения  
#### RMSE: Квадратный корень из MSE  
#### MAE: Среднее абсолютное значение разностей между предсказанным и фактическим значением
#### R2:  Коэффициент детерминации 

In [6]:
# Show the scaled feature vector next to the predicted and actual follower counts.
preview_columns = ["features", "prediction", "follower_count"]
predicted = predictions.select(*preview_columns)
predicted.show(n=20)

+--------------------+------------------+--------------+
|            features|        prediction|follower_count|
+--------------------+------------------+--------------+
|[0.0,0.0013686498...|6954.0462045600325|          5841|
|[0.0,0.0016504307...| 6959.181327872822|         21472|
|[0.0,0.0037034055...| 6996.594369151717|          5758|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327958196402|         15404|
|[0.0,0.0037436599...| 6997.327

In [7]:
# Hyper-parameter grid for the linear-regression stage:
# 3 values of maxIter x 2 values of regParam = 6 candidate settings.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr.maxIter, [100, 200, 300])
grid_builder = grid_builder.addGrid(lr.regParam, [0.1, 0.01])
paramGrid = grid_builder.build()


In [8]:
# Tune the whole regression pipeline over `paramGrid` with 2-fold
# cross-validation, selecting the candidate by RMSE.
# The explicit seed fixes the random fold assignment so that the selected
# model is reproducible between runs.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        predictionCol="prediction",
        labelCol="follower_count",
        metricName="rmse",
    ),
    numFolds=2,
    seed=42,
)

# Fit on the training split and score the best model on the held-out test split.
cv_model = cv.fit(train)
best_model = cv_model.bestModel
best_predictions = best_model.transform(test)

# Same four regression metrics as for the untuned pipeline, for comparison.
rmse_evaluator = RegressionEvaluator(labelCol="follower_count", predictionCol="prediction", metricName="rmse")
mae_evaluator = RegressionEvaluator(labelCol="follower_count", predictionCol="prediction", metricName="mae")
mse_evaluator = RegressionEvaluator(labelCol="follower_count", predictionCol="prediction", metricName="mse")
r2_evaluator = RegressionEvaluator(labelCol="follower_count", predictionCol="prediction", metricName="r2")
best_rmse = rmse_evaluator.evaluate(best_predictions)
best_mse = mse_evaluator.evaluate(best_predictions)
best_mae = mae_evaluator.evaluate(best_predictions)
best_r2 = r2_evaluator.evaluate(best_predictions)
print(f"RMSE: {best_rmse:0.3f}")
print(f"MSE: {best_mse:0.3f}")
print(f"MAE: {best_mae:0.3f}")
print(f"r2: {best_r2:0.3f}")

RMSE: 3554.286
MSE: 12632950.599
MAE: 2754.669
r2: 0.544


### Задача классификации

In [9]:
# Classification dataset: numeric counters plus an integer 0/1 label
# `popular` derived from the boolean `is_popular` column (anything that is
# not True becomes 0).
df = (
    df_full
    .select("following_count", "follower_count", "like_count", "retweet_count", "is_popular")
    .withColumn("popular", when(col("is_popular") == True, 1).otherwise(0))
    .drop("is_popular")
)

df.show(10)
df.printSchema()

# Class balance: absolute counts first, then the share of each class.
total_count = df.count()
true_count = df.where(col("popular") == 1).count()
false_count = df.where(col("popular") == 0).count()
print("True:", true_count)
print("False:", false_count)

true_percentage = 100 * (true_count / total_count)
false_percentage = 100 * (false_count / total_count)
print(f"True: {true_percentage:0.3f}%")
print(f"False: {false_percentage:0.3f}%")

+---------------+--------------+----------+-------------+-------+
|following_count|follower_count|like_count|retweet_count|popular|
+---------------+--------------+----------+-------------+-------+
|          10348|         11555|         0|            0|      0|
|          16095|         17709|         0|            0|      1|
|          14704|         19393|         0|            0|      1|
|          12638|         12756|         0|            0|      0|
|           5848|          8651|         0|            0|      0|
|          23684|         24655|         0|            0|      1|
|           7494|          9873|         0|            0|      0|
|          13520|         23420|         0|            1|      1|
|          10031|         12326|         0|            0|      0|
|          22187|         24438|         0|            0|      1|
+---------------+--------------+----------+-------------+-------+
only showing top 10 rows

root
 |-- following_count: integer (nullable = tru

In [10]:
# Split the classification dataset into training (80%) and test (20%) sets.
# The fixed seed keeps the split reproducible across kernel restarts.
splits = df.randomSplit([0.8, 0.2], seed=42)
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 2674395  Testing Rows: 667526


In [11]:
# Assemble the four numeric counters into the feature vector.
# NOTE(review): `follower_count` is used as a feature, and in the sample rows
# shown earlier `popular` appears to follow `follower_count` (higher counts
# are 1, lower counts are 0). If `is_popular` was derived from
# `follower_count`, this is label leakage and would explain near-perfect
# scores -- confirm how `is_popular` was produced.
numVect = VectorAssembler(
    inputCols=["like_count", "following_count", "retweet_count", "follower_count"],
    outputCol="features",
)

# Random forests are stochastic (bootstrap rows, random feature subsets);
# the seed makes training reproducible.
rf = RandomForestClassifier(labelCol="popular", featuresCol="features", seed=42)
pipeline = Pipeline(stages=[numVect, rf])
model = pipeline.fit(train)
predictions = model.transform(test)

# Preview predictions next to the true label.
predicted = predictions.select("features", "prediction", "popular")
predicted.show(20)

# Overall accuracy on the test split (the evaluator was previously created
# but never evaluated).
evaluator = MulticlassClassificationEvaluator(labelCol="popular", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:0.3f}")

+--------------------+----------+-------+
|            features|prediction|popular|
+--------------------+----------+-------+
|[1.0,38.0,1.0,784...|       0.0|      0|
|[0.0,42.0,0.0,214...|       1.0|      1|
|[0.0,42.0,0.0,214...|       1.0|      1|
|[0.0,77.0,0.0,149...|       0.0|      0|
|[0.0,77.0,0.0,149...|       0.0|      0|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[0.0,94.0,0.0,154...|       0.0|      1|
|[1.0,94.0,0.0,154...|       0.0|      1|
+--------------------+----------+-

In [12]:
# Build the confusion matrix in a single Spark job instead of four separate
# filter+count jobs (each of which re-runs the model transform).
# Keys are (prediction, label) pairs; cells that never occur default to 0.
confusion = {
    (row["prediction"], row["popular"]): row["count"]
    for row in predicted.groupBy("prediction", "popular").count().collect()
}
tp = float(confusion.get((1.0, 1), 0))
fp = float(confusion.get((1.0, 0), 0))
tn = float(confusion.get((0.0, 0), 0))
fn = float(confusion.get((0.0, 1), 0))

# Guard the ratios: when the model predicts no positives (tp + fp == 0) or
# the test set has no positives (tp + fn == 0) the metric is undefined; it is
# reported as 0.0 rather than raising ZeroDivisionError.
pr = tp / (tp + fp) if (tp + fp) > 0 else 0.0
re = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1 = 2 * pr * re / (re + pr) if (re + pr) > 0 else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", f1)],["metric", "value"])
metrics.show()


+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          304713.0|
|       FP|              19.0|
|       TN|          362445.0|
|       FN|             349.0|
|Precision|0.9999376501319192|
|   Recall|0.9988559702617829|
|       F1|0.9993965175124714|
+---------+------------------+



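#### Precision: Точность — доля истинно положительных среди всех предсказанных положительных, TP / (TP + FP)
#### Recall:  Полнота — доля найденных положительных среди всех фактически положительных, TP / (TP + FN)
#### F1:  Среднее гармоническое точности и полноты, 2 · Precision · Recall / (Precision + Recall)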

In [13]:
# Area under the ROC curve for the `popular` classifier, computed from the
# `rawPrediction` column of the scored test set.
evaluator = BinaryClassificationEvaluator(
    labelCol="popular",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
aur = evaluator.evaluate(predictions)
print("AUR = ", aur)


AUR =  0.9999984879627133


In [14]:
# --- Second classification task: predict whether a tweet is a retweet. ---

# Dataset: numeric counters plus an integer 0/1 label `retweet` derived from
# the boolean `is_retweet` column (anything that is not True becomes 0).
df = df_full.select("following_count", "follower_count", "like_count", "retweet_count", "is_retweet")
df = df.withColumn("retweet", when(col("is_retweet") == True, 1).otherwise(0))
df = df.drop('is_retweet')

df.show(10)
df.printSchema()

# Class balance: absolute counts and percentage share of each class.
total_count = df.count()
true_count = df.filter(df["retweet"] == 1).count()
false_count = df.filter(df["retweet"] == 0).count()
print("True:", true_count)
print("False:", false_count)
true_percentage = (true_count / total_count) * 100
false_percentage = (false_count / total_count) * 100
print(f"True: {true_percentage:0.3f}%")
print(f"False: {false_percentage:0.3f}%")

# 80/20 train/test split; the seed keeps it reproducible between runs.
splits = df.randomSplit([0.8, 0.2], seed=42)
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

# Pipeline: assemble the four counters into a vector, then a seeded random
# forest (seeded because forest training is stochastic).
numVect = VectorAssembler(inputCols = ["like_count", "following_count", "retweet_count", "follower_count"], outputCol="features")
rf = RandomForestClassifier(labelCol="retweet", featuresCol="features", seed=42)
pipeline = Pipeline(stages=[numVect, rf])
model = pipeline.fit(train)
predictions = model.transform(test)
predicted = predictions.select("features", "prediction", "retweet")
predicted.show(20)

# Overall accuracy on the test split (the evaluator was previously created
# but never evaluated).
evaluator = MulticlassClassificationEvaluator(labelCol="retweet", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:0.3f}")

# Confusion matrix in one Spark job; keys are (prediction, label) pairs and
# cells that never occur default to 0.
confusion = {
    (row["prediction"], row["retweet"]): row["count"]
    for row in predicted.groupBy("prediction", "retweet").count().collect()
}
tp = float(confusion.get((1.0, 1), 0))
fp = float(confusion.get((1.0, 0), 0))
tn = float(confusion.get((0.0, 0), 0))
fn = float(confusion.get((0.0, 1), 0))

# Guard the ratios: with an imbalanced label the model may predict no
# positives at all (tp + fp == 0); undefined metrics are reported as 0.0
# instead of raising ZeroDivisionError.
pr = tp / (tp + fp) if (tp + fp) > 0 else 0.0
re = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1 = 2 * pr * re / (re + pr) if (re + pr) > 0 else 0.0
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", f1)],["metric", "value"])
metrics.show()

# Area under the ROC curve from the raw prediction scores.
evaluator = BinaryClassificationEvaluator(labelCol="retweet", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(predictions)
print ("AUR = ", aur)



+---------------+--------------+----------+-------------+-------+
|following_count|follower_count|like_count|retweet_count|retweet|
+---------------+--------------+----------+-------------+-------+
|          10348|         11555|         0|            0|      0|
|          16095|         17709|         0|            0|      0|
|          14704|         19393|         0|            0|      0|
|          12638|         12756|         0|            0|      0|
|           5848|          8651|         0|            0|      0|
|          23684|         24655|         0|            0|      0|
|           7494|          9873|         0|            0|      0|
|          13520|         23420|         0|            1|      0|
|          10031|         12326|         0|            0|      0|
|          22187|         24438|         0|            0|      0|
+---------------+--------------+----------+-------------+-------+
only showing top 10 rows

root
 |-- following_count: integer (nullable = tru