<a href="https://colab.research.google.com/github/ndkhoa0704/Spark-MLlib-project/blob/main/Lab3_Requirement1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Install java 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#unzip
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
!pip install -q findspark 
!pip install pyspark



In [1]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.types as types
import pyspark.sql.functions as f
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor, FMRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import accuracy_score
import pandas as pd
import seaborn as sns

ModuleNotFoundError: ignored

In [None]:
# Start (or reuse) a Spark session that runs locally.
session_builder = SparkSession.builder.master("local").appName("MLClassification")
spark = session_builder.getOrCreate()

In [None]:
# Read both CSV splits, taking column names from the header row.
# No schema is supplied here; the numeric columns are cast in the preprocessing step.
train_set = spark.read.format('csv').option('header', True).load('./train.csv')
test_set = spark.read.format('csv').option('header', True).load('./test.csv')

In [None]:
# Preprocess
def _prepare_sales(sales_df):
    """Return `sales_df` with typed columns and invalid rows removed.

    Steps, applied identically to the train and test splits:
      1. parse `date` from the `dd.MM.yyyy` text format into a date column;
      2. cast `shop_id`, `item_id`, `item_price`, `item_cnt_day` and
         `date_block_num` to double;
      3. keep only rows with `item_cnt_day >= 0` and `item_price >= 0`
         (rows where either value is null do not satisfy the comparison and
         are dropped too).
    """
    typed = sales_df.withColumn('date', f.to_date(f.col('date'), r'dd.MM.yyyy'))
    for name in ('shop_id', 'item_id', 'item_price', 'item_cnt_day', 'date_block_num'):
        typed = typed.withColumn(name, f.col(name).cast(types.DoubleType()))
    return typed.filter(f.col('item_cnt_day') >= 0).filter(f.col('item_price') >= 0)


# NOTE: both names are rebound in place. Re-run the load cell before re-running
# this one, otherwise the date pattern is applied to an already parsed column.
train_set = _prepare_sales(train_set)
test_set = _prepare_sales(test_set)


In [None]:
# Vectorize
# Pack the three predictor columns into a single vector column named "features".
vecAssembler = VectorAssembler(inputCols=['item_id', 'shop_id', 'item_price'], outputCol="features")
# Drop any "features" column left by a previous run first (a no-op when the
# column is absent) so this cell can be re-executed without the assembler
# failing because its output column already exists.
train_set = vecAssembler.transform(train_set.drop('features'))
test_set = vecAssembler.transform(test_set.drop('features'))

In [None]:
# Prepare model
# Linear regression of the daily item count on the assembled feature vector.
linear_model = LinearRegression(featuresCol='features', labelCol='item_cnt_day')

# Candidate parameter sets are ranked by RMSE.
evaluator = RegressionEvaluator(labelCol="item_cnt_day",
                                predictionCol="prediction",
                                metricName="rmse")

# 4 x 4 grid over the regularisation strength and the elastic-net mixing parameter.
# NOTE(review): regParam lists 1 between 0.05 and 0.2 - possibly meant 0.1; confirm.
paramGrid = (
    ParamGridBuilder()
    .addGrid(linear_model.regParam, [0.05, 1, 0.2, 0.3])
    .addGrid(linear_model.elasticNetParam, [0.1, 0.2, 0.3, 0.5])
    .build()
)

# A fixed seed keeps the 5-fold split the same across kernel restarts.
cv = CrossValidator(estimator=linear_model, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=42)

In [None]:
# Train: cross-validate every parameter combination configured on `cv`
# and keep the fitted cross-validation result.
cvmodel = cv.fit(dataset=train_set)

In [None]:
# Evaluate (RMSE)
# Take the best model found by cross-validation and score it on the test split.
best_model_ln = cvmodel.bestModel
res = best_model_ln.transform(test_set)
# No metricName is given, so the evaluator uses its default metric.
evaluator = RegressionEvaluator(labelCol='item_cnt_day', predictionCol='prediction')
test_rmse = evaluator.evaluate(res)
print('RMSE: ', test_rmse)

RMSE:  2.113625883048452


In [None]:
# Random-forest regressor on the same features and label; the seed pins the
# forest's internal randomness.
rfr = RandomForestRegressor(featuresCol='features', labelCol='item_cnt_day', seed=42)

# Candidate parameter sets are ranked by RMSE.
evaluator = RegressionEvaluator(labelCol="item_cnt_day",
                                predictionCol="prediction",
                                metricName="rmse")

# Only the number of trees is searched.
paramGrid = ParamGridBuilder().addGrid(rfr.numTrees, [10, 20, 30]).build()

# A fixed seed keeps the 5-fold split the same across kernel restarts.
cv = CrossValidator(estimator=rfr, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=42)

In [None]:
# Train: fit the cross-validator currently bound to `cv` on the training split.
# This rebinds `cvmodel`, replacing the result of any earlier search.
cvmodel = cv.fit(dataset=train_set)

In [None]:
# Evaluate (RMSE)
# Score the best model of the most recent cross-validation run on the test split.
# NOTE(review): the `_ln` suffix is kept because a later cell reads this name,
# even though the model held here is whatever `cvmodel` was fitted last.
best_model_ln = cvmodel.bestModel
res = best_model_ln.transform(test_set)
# No metricName is given, so the evaluator uses its default metric.
evaluator = RegressionEvaluator(labelCol='item_cnt_day', predictionCol='prediction')
test_rmse = evaluator.evaluate(res)
print('RMSE: ', test_rmse)

RMSE:  2.042963174175608


In [None]:
# Gradient-boosted-trees regressor on the same features and label; the seed
# pins the estimator's internal randomness.
gbt = GBTRegressor(featuresCol='features', labelCol='item_cnt_day', seed=42)

# Candidate parameter sets are ranked by RMSE.
evaluator = RegressionEvaluator(labelCol="item_cnt_day",
                                predictionCol="prediction",
                                metricName="rmse")

# 3 x 4 grid: minimum weight fraction per node against step size.
# With 5 folds this is 60 GBT fits, so expect this search to be slow.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.minWeightFractionPerNode, [0, 0.1, 0.2])
    .addGrid(gbt.stepSize, [0.01, 0.05, 0.1, 0.3])
    .build()
)

# A fixed seed keeps the 5-fold split the same across kernel restarts.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=42)

In [None]:
# Train: run the cross-validated search bound to `cv` on the training split.
# If this cell is interrupted, `cvmodel` keeps its value from the previous search.
cvmodel = cv.fit(dataset=train_set)

KeyboardInterrupt: ignored

In [None]:
# Evaluate (RMSE)
# Score the model selected in this cell, not one left over from an earlier
# section: predictions must come from `best_model_gbt`.
best_model_gbt = cvmodel.bestModel
res = best_model_gbt.transform(test_set)
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='item_cnt_day', metricName='rmse')
print('RMSE: ', evaluator.evaluate(res))

In [None]:
# Factorization-machine regressor with default hyper-parameters; a fixed seed
# makes the fit repeatable.
fm = FMRegressor(featuresCol='features', labelCol='item_cnt_day', seed=42)
# Keep the fitted model so later cells can use it, then display its summary.
fm_model = fm.fit(train_set)
fm_model

FMRegressionModel: uid=FMRegressor_f57e53061515, numFeatures=3, factorSize=8, fitLinear=true, fitIntercept=true