In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

spark = (
    SparkSession.builder.config("spark.sql.debug.maxToStringFields", 100)
    .appName("reviews")
    .getOrCreate()
)

# data_filepath = "../data/cleaned_steam_reviews/game_id={578080,271590,359550,105600,4000,252490,252950,218620,945360,292030}"
data_filepath = "../data/cleaned_steam_reviews/game_id={294100,304390,812140,306130,391220,221380,262060,1289310,646570,552520}"
data_filepath = "../data/cleaned_steam_reviews/game_id={294100,304390,812140}"
# data_filepath = "../data/cleaned_steam_reviews"
steam_games_filepath = "../data/cleaned_steam_games"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/28 15:44:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
steam_reviews = spark.read.parquet(data_filepath)
steam_games = spark.read.parquet(steam_games_filepath)

                                                                                

In [3]:
playtime = steam_reviews.select('app_id', 'author_steamid', 'author_playtime_forever') \
                            .where((col("author_steamid") != F.lit("null")) & (F.length(col("author_steamid")) == 17) & (col("author_steamid").rlike("^[0-9]+$"))) \
                            .where(steam_reviews["author_playtime_forever"].isNotNull() \
                                   & steam_reviews["app_id"].isNotNull())


In [4]:
d = {}
# Fill in the entries one by one

d["author_playtime_forever"] = playtime.approxQuantile("author_playtime_forever",[0.01,0.99],0.25)

# # looping through the columns, doing log(x+1) transformations
# for col in df.columns:
playtime_quantile = playtime.withColumn("author_playtime_forever", \
                F.log(F.when(playtime["author_playtime_forever"] < d["author_playtime_forever"][0],d["author_playtime_forever"][0])\
                .when(playtime["author_playtime_forever"] > d["author_playtime_forever"][1], d["author_playtime_forever"][1])\
                .otherwise(playtime["author_playtime_forever"] ) +1).alias("author_playtime_forever"))

mean = playtime_quantile.select(F.mean(playtime_quantile.author_playtime_forever)).collect()

playtime_capped = playtime_quantile.withColumn("author_playtime_forever", F.when(playtime_quantile.author_playtime_forever > mean[0][0]*2, mean[0][0]*2).otherwise(playtime_quantile.author_playtime_forever))
playtime_scaled = playtime_capped.withColumn("author_playtime_forever", playtime_capped.author_playtime_forever / (mean[0][0]*2))


                                                                                

In [5]:
playtime_scaled.show()

+------+-----------------+-----------------------+
|app_id|   author_steamid|author_playtime_forever|
+------+-----------------+-----------------------+
|294100|76561198892712976|    0.32939984568544967|
|294100|76561198053458049|     0.5159773746231011|
|294100|76561198113534698|    0.38116279094434025|
|294100|76561198085554674|     0.5713611140256404|
|294100|76561198096815495|    0.40600692664341137|
|294100|76561198067122940|     0.6012115490074816|
|294100|76561198394862196|     0.4244380936196737|
|294100|76561198013936546|     0.5485325509397957|
|294100|76561198027357642|    0.29842714632737116|
|294100|76561198102039006|     0.5494378742649005|
|294100|76561198029160051|     0.5061984091095405|
|294100|76561198066472711|     0.4431199678578947|
|294100|76561199117988906|     0.4339861542610619|
|294100|76561198098384447|     0.5516572504132496|
|294100|76561198019368906|     0.5821367917397706|
|294100|76561198317032242|     0.5126770016774996|
|294100|76561198079640725|     

In [6]:
playtime_data = (playtime_scaled
    .select(
        'app_id',
        'author_steamid',
        'author_playtime_forever',
    )
).cache()

In [7]:
(training, test) = playtime_data.randomSplit([0.8, 0.2])

In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

author_indexer = StringIndexer(inputCol="author_steamid", outputCol="author_index", handleInvalid="keep")
app_indexer = StringIndexer(inputCol="app_id", outputCol="app_index", handleInvalid="keep")
als = ALS(maxIter=2, regParam=0.01, userCol="author_index", itemCol="app_index", ratingCol="author_playtime_forever", coldStartStrategy="drop", implicitPrefs=True)
pipeline = Pipeline(stages=[author_indexer, app_indexer, als])

model = pipeline.fit(training)

predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="author_playtime_forever", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


23/11/28 15:47:13 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:13 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:14 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:15 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:16 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:17 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:18 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:18 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:19 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:20 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:20 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 15:47:21 WARN DAGScheduler: Broadcasting larg

Root-mean-square error = 0.4941011670521637


In [13]:
# Let's say the new author_steamid is 'new_user' and they have played app_id 1, 2, and 3.
new_user_data = spark.createDataFrame(
    [('new_user', '294100'), ('new_user', '812140')],
    ['author_steamid', 'app_id']
)

# Since 'new_user' is not in the training set, we will assign a new ID manually.
# For example, if the highest ID in your training set was 100, you could assign 101.
# This is a workaround and predictions for this user may not be reliable.
new_user_data = new_user_data.withColumn("author_index", F.lit(101))

app_indexer_model = model.stages[1]
als_model = model.stages[2]

# Now we need to transform the app_id using the StringIndexer model used in training.
# This will add a new column 'app_index' to our DataFrame.
new_user_data_transformed = app_indexer_model.transform(new_user_data)

# Now we can use the trained ALS model to make predictions for the new user.
# Note that the 'app_indexer_model' should be the StringIndexerModel obtained from the pipeline for 'app_id', 
# which has already been fit to the training data.
new_user_predictions = als_model.transform(new_user_data_transformed)

# Show the predictions
new_user_predictions.select("author_index", "app_index", "prediction").show()


23/11/28 16:00:17 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/11/28 16:00:18 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
                                                                                

+------------+---------+----------+
|author_index|app_index|prediction|
+------------+---------+----------+
|         101|      1.0| 0.9418529|
|         101|      0.0| 0.8790058|
+------------+---------+----------+



In [63]:
# # Build the recommendation model using ALS on the training data
# # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
# als = ALS(maxIter=2, regParam=0.01, 
#           userCol="author_steamid", itemCol="app_id", ratingCol="author_playtime_forever",
#           coldStartStrategy="drop",
#           implicitPrefs=True)
# model = als.fit(training)

# # Evaluate the model by computing the RMSE on the test data
# predictions = model.transform(test)
# evaluator = RegressionEvaluator(metricName="rmse", labelCol="watching_percentage",
#                                 predictionCol="prediction")

# rmse = evaluator.evaluate(predictions)
# print("Root-mean-square error = " + str(rmse))

23/11/28 15:23:26 ERROR Instrumentation: java.lang.IllegalArgumentException: requirement failed: Column author_steamid must be of type numeric but was actually of type string.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.util.SchemaUtils$.checkNumericType(SchemaUtils.scala:78)
	at org.apache.spark.ml.recommendation.ALSParams.validateAndTransformSchema(ALS.scala:259)
	at org.apache.spark.ml.recommendation.ALSParams.validateAndTransformSchema$(ALS.scala:257)
	at org.apache.spark.ml.recommendation.ALS.validateAndTransformSchema(ALS.scala:616)
	at org.apache.spark.ml.recommendation.ALS.transformSchema(ALS.scala:753)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:715)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:714

IllegalArgumentException: requirement failed: Column author_steamid must be of type numeric but was actually of type string.