## Data Pre-processing

In [2]:
import os
import pandas as pd
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

spark = SparkSession.builder.master("local[8]").appName("DataFrame").getOrCreate()

In [None]:
review_df = pd.read_csv("./home/steam/Game_Reviews.csv")
cf_df = review_df[['id', 'steamid', 'voted_up']].drop_duplicates()
cf_df.to_csv("./home/steam/cf.csv")

In [2]:
cf_df = pd.read_csv("./home/steam/cf.csv")
cf_df.head()

Unnamed: 0.1,Unnamed: 0,id,steamid,voted_up
0,0,920210,76561198021087473,True
1,1,920210,76561198065754924,True
2,2,920210,76561199122198186,True
3,3,920210,76561198007644548,True
4,4,920210,76561198290950126,True


In [3]:
# remap user ids
Id = 0
id_dict = {}
for i in cf_df['steamid'].unique():
    id_dict[i]=Id
    Id += 1
id_dict

def change_id(row):
    steam_id = row['steamid']
    newid = id_dict[steam_id]
    return newid

cf_df['userid'] = cf_df.apply(change_id, axis = 1)
cf_df.head()

Unnamed: 0.1,Unnamed: 0,id,steamid,voted_up,userid
0,0,920210,76561198021087473,True,0
1,1,920210,76561198065754924,True,1
2,2,920210,76561199122198186,True,2
3,3,920210,76561198007644548,True,3
4,4,920210,76561198290950126,True,4


In [5]:
cf_df = cf_df.rename(columns={'id':"gameid"})[['gameid', 'userid', 'voted_up']]
cf_df.to_csv("./home/steam/updated_cf.csv", index=False)

## Read Data

In [3]:
cf_rdd = spark.read.csv('hdfs://cluster-steam-m/user/dataproc/updated_cf.csv',header=True)

In [4]:
from pyspark.sql.functions import when

cf_rdd = cf_rdd.withColumn("rating", when(cf_rdd.voted_up == "True",5) \
      .when(cf_rdd.voted_up == "False",1)).select("gameid", "userid", "rating")
cf_rdd.show()

+------+------+------+
|gameid|userid|rating|
+------+------+------+
|920210|     0|     5|
|920210|     1|     5|
|920210|     2|     5|
|920210|     3|     5|
|920210|     4|     5|
|920210|     5|     5|
|920210|     6|     5|
|920210|     7|     5|
|920210|     8|     5|
|920210|     9|     5|
|920210|    10|     5|
|920210|    11|     5|
|920210|    12|     5|
|920210|    13|     5|
|920210|    14|     5|
|920210|    15|     5|
|920210|    16|     5|
|920210|    17|     5|
|920210|    18|     5|
|920210|    19|     5|
+------+------+------+
only showing top 20 rows



In [5]:
from pyspark.sql.types import IntegerType

cf_rdd = cf_rdd.\
        withColumn("gameid", cf_rdd["gameid"].cast(IntegerType())).\
        withColumn("userid", cf_rdd["userid"].cast(IntegerType()))
cf_rdd.show()

+------+------+------+
|gameid|userid|rating|
+------+------+------+
|920210|     0|     5|
|920210|     1|     5|
|920210|     2|     5|
|920210|     3|     5|
|920210|     4|     5|
|920210|     5|     5|
|920210|     6|     5|
|920210|     7|     5|
|920210|     8|     5|
|920210|     9|     5|
|920210|    10|     5|
|920210|    11|     5|
|920210|    12|     5|
|920210|    13|     5|
|920210|    14|     5|
|920210|    15|     5|
|920210|    16|     5|
|920210|    17|     5|
|920210|    18|     5|
|920210|    19|     5|
+------+------+------+
only showing top 20 rows



## CF

In [6]:
(training, test) = cf_rdd.randomSplit([0.8, 0.2])

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

als = ALS(maxIter=5, regParam=0.01, userCol="userid", itemCol="gameid", ratingCol="rating",
          rank=5,nonnegative = True, implicitPrefs = False,coldStartStrategy="drop")
model = als.fit(training)

In [30]:
predictions = model.transform(test)
predictions.show(50)

+------+-------+------+----------+
|gameid| userid|rating|prediction|
+------+-------+------+----------+
|  9900|1105600|     5| 6.3294764|
|  9900| 269947|     1|  7.871271|
|  9900|1734476|     1| 5.9641733|
|  9900|1734482|     5|  9.966679|
|  9900|1734471|     5|  6.019036|
|  9900|1734468|     1|  9.279769|
|  9900| 466961|     1|  8.418024|
|  9900|1224431|     5| 6.2005367|
|  9900| 479448|     5| 7.0406437|
|  9900| 233239|     5|  9.469792|
|  9900|1734457|     5|  8.079119|
|  9900|1734488|     1|    8.1271|
|  9900|1583440|     5| 6.8447466|
|  9900|1734465|     5| 7.7213335|
|  9900|1734498|     1| 4.4306645|
|  9900|1734490|     1|  9.446645|
|  9900|1734473|     1| 3.3659592|
|  9900| 782409|     1|  7.102413|
|  9900| 553913|     1| 1.3634845|
| 32460|1962811|     5|  4.393107|
| 32460|3518618|     5|  5.450842|
| 32460|1027236|     1|  5.308881|
| 32460| 116355|     5|  5.341689|
| 32460| 318240|     5| 4.5154886|
| 32460|3518811|     5|  5.223428|
| 32460|1010095|    

In [None]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = predictions.select(
    _mean(col('prediction')).alias('mean')).collect()

mean = df_stats[0]['mean']

In [None]:
mean

In [21]:
predictions = predictions.withColumn("pred_rating", when(predictions.prediction > mean, float(5)) \
      .when(predictions.prediction <= mean,float(1)))
predictions.show()

+------+-------+------+----------+-----------+
|gameid| userid|rating|prediction|pred_rating|
+------+-------+------+----------+-----------+
|  9900|1105600|     5| 10.116807|        5.0|
|  9900| 269947|     1| -5.218788|        1.0|
|  9900|1734476|     1| -5.429006|        1.0|
|  9900|1734482|     5|  9.843704|        5.0|
|  9900|1734471|     5| 11.920536|        5.0|
|  9900|1734468|     1|  8.452037|        5.0|
|  9900| 466961|     1|  1.914526|        1.0|
|  9900|1224431|     5| 11.891155|        5.0|
|  9900| 479448|     5| 11.645497|        5.0|
|  9900| 233239|     5| 15.080063|        5.0|
|  9900|1734457|     5| 0.7760112|        1.0|
|  9900|1734488|     1|  2.587336|        1.0|
|  9900|1583440|     5| 19.921112|        5.0|
|  9900|1734465|     5| 7.7800303|        5.0|
|  9900|1734498|     1| 12.380391|        5.0|
|  9900|1734490|     1|-12.746338|        1.0|
|  9900|1734473|     1|-7.9787073|        1.0|
|  9900| 782409|     1|  7.576628|        5.0|
|  9900| 5539

In [12]:
rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="pred_rating")
rmse = rmse_evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 2.5735300275968322


In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="rating", predictionCol="pred_rating", metricName="f1")
accuracy = acc_evaluator.evaluate(predictions)
print("F1:", accuracy)

F1: 0.638907525981512


In [31]:
for threshold in [0,1,2,3,4,5]:
    adjust_predictions = predictions.withColumn("pred_rating", when(predictions.prediction > threshold, float(5)) \
      .when(predictions.prediction <= threshold,float(1)))
    rmse = rmse_evaluator.evaluate(adjust_predictions)
    f1 = f1_evaluator.evaluate(adjust_predictions)
    print("Threshold = %f, Root-mean-square error = %f, F1 = %f" %(threshold, rmse, f1))

Threshold = 0.000000, Root-mean-square error = 1.631726, F1 = 0.758992
Threshold = 1.000000, Root-mean-square error = 1.693102, F1 = 0.784772
Threshold = 2.000000, Root-mean-square error = 1.776153, F1 = 0.790665
Threshold = 3.000000, Root-mean-square error = 2.020344, F1 = 0.764271
Threshold = 4.000000, Root-mean-square error = 2.625721, F1 = 0.621995
Threshold = 5.000000, Root-mean-square error = 3.489669, F1 = 0.194589


KeyboardInterrupt: 

In [None]:
for threshold in [1.5, 1.6, 1.7, 1.8, 1.9, 2.1, 2.2, 2.3, 2.4, 2.5]:
    adjust_predictions = predictions.withColumn("pred_rating", when(predictions.prediction > threshold, float(5)) \
      .when(predictions.prediction <= threshold,float(1)))
    rmse = rmse_evaluator.evaluate(adjust_predictions)
    f1 = f1_evaluator.evaluate(adjust_predictions)
    print("Threshold = %f, Root-mean-square error = %f, F1 = %f" %(threshold, rmse, f1))

Threshold = 1.500000, Root-mean-square error = 1.724656, F1 = 0.789517
Threshold = 1.600000, Root-mean-square error = 1.732672, F1 = 0.790118
Threshold = 1.700000, Root-mean-square error = 1.741250, F1 = 0.790659
Threshold = 1.800000, Root-mean-square error = 1.751504, F1 = 0.790896
Threshold = 1.900000, Root-mean-square error = 1.763064, F1 = 0.790896
Threshold = 2.100000, Root-mean-square error = 1.791142, F1 = 0.790043
Threshold = 2.200000, Root-mean-square error = 1.807210, F1 = 0.789200
Threshold = 2.300000, Root-mean-square error = 1.825065, F1 = 0.788064
Threshold = 2.400000, Root-mean-square error = 1.845620, F1 = 0.786378
Threshold = 2.500000, Root-mean-square error = 1.867507, F1 = 0.784384


In [12]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="rating", predictionCol="pred_rating", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.5765061447332364
