# Spark


In [1]:
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType

In [2]:
# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Book Recommender")
    # Driver JVM heap size.
    .config("spark.driver.memory", "4g")
    # Render DataFrames directly when they are the last expression of a cell,
    # showing at most 10 rows.
    .config("spark.sql.repl.eagerEval.maxNumRows", 10)
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/17 12:04:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the Parquet dataset stored under outputs/work_df into a Spark DataFrame.
spark_df = spark.read.parquet("outputs/work_df")

                                                                                

In [5]:
# Show column names, types and nullability of the data as loaded.
spark_df.printSchema()

root
 |-- index: long (nullable = true)
 |-- userId: short (nullable = true)
 |-- bookId: short (nullable = true)
 |-- isbn13: long (nullable = true)
 |-- title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- rating: long (nullable = true)



In [6]:
# Cast the user id, book id and rating columns to 32-bit integers in one pass.
# Re-running this cell is harmless: casting an integer column to integer is a no-op.
spark_df = spark_df.withColumns(
    {
        column: F.col(column).cast(IntegerType())
        for column in ("userId", "bookId", "rating")
    }
)


In [7]:
# Print the schema again to confirm userId, bookId and rating are now integers.
spark_df.printSchema()

root
 |-- index: long (nullable = true)
 |-- userId: integer (nullable = true)
 |-- bookId: integer (nullable = true)
 |-- isbn13: long (nullable = true)
 |-- title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- rating: integer (nullable = true)



In [8]:
# Keep only the (user, item, rating) triple used by the recommender below.
book_ratings = spark_df.select(["userId", "bookId", "rating"])
book_ratings

                                                                                

userId,bookId,rating
179,2872,10
277,2872,9
358,2872,7
382,2872,10
585,2872,6
611,2872,8
638,2872,8
648,2872,5
1009,2872,8
1024,2872,8


### Spark ML


In [9]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator

#### Train-Test Split


In [10]:
# 80 % of the ratings for training, 20 % held out for testing; seeded for repeatability.
train, test = book_ratings.randomSplit(weights=[0.8, 0.2], seed=42)

#### Simple ALS Model


In [11]:
# ALS estimator shared by the baseline fit and the cross-validated search.
als = ALS(
    # Column mapping for the (user, item, rating) triples.
    ratingCol="rating",
    userCol="userId",
    itemCol="bookId",
    # Constrain the learned factors to be non-negative.
    nonnegative=True,
    # Drop rows that get NaN predictions so the RMSE evaluator is not
    # poisoned by users/books unseen during training.
    coldStartStrategy="drop",
    # Fixed seed for repeatable factor initialisation.
    seed=42,
)


In [12]:
# Root-mean-square error between the true rating and the model's prediction.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)


In [13]:
# Baseline: fit ALS on the training split without overriding rank, maxIter or regParam.
simple_model = als.fit(train)

23/04/17 12:07:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [14]:
# Score the held-out test split with the baseline model (adds a `prediction` column).
predictions = simple_model.transform(test)

In [15]:
# Display the baseline predictions (eager evaluation is enabled on the session, capped at 10 rows).
predictions

                                                                                

userId,bookId,rating,prediction
1395,6654,10,8.268562
2563,463,7,8.162191
2563,3749,8,6.869559
1884,8389,10,7.128739
5140,7982,6,5.1622477
847,4519,7,8.53499
5055,2366,8,5.5020676
2463,2366,10,9.577781
4443,5300,7,6.251077
4230,8638,10,9.2928505


In [16]:
# RMSE of the baseline model on the test split.
rmse = evaluator.evaluate(predictions)
rmse

                                                                                

1.8919501331030912

In [17]:
# Hyperparameter grid for cross-validation: 2 x 3 x 2 = 12 combinations of
# iteration count, rank and regularisation strength.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.rank, [3, 5, 10])
    .addGrid(als.regParam, [0.1, 0.5])
    .build()
)


In [18]:
# Sanity check: number of parameter combinations the search will evaluate per fold.
print(f"Num. Models: {len(param_grid)}")

Num. Models: 12


In [19]:
# 3-fold cross-validation over the hyperparameter grid, scored by RMSE,
# fitting up to 4 candidate models in parallel.
# The explicit seed makes the fold assignment repeatable across runs,
# consistent with the seeded train/test split and the seeded ALS estimator.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    parallelism=4,
    numFolds=3,
    seed=42,
)


In [20]:
# Run the cross-validated grid search on the training split only; the test split stays untouched.
tuned_model = cv.fit(train)

23/04/17 12:08:24 WARN BlockManager: Block rdd_293_0 already exists on this machine; not re-adding it
                                                                                

In [21]:
# Model selected by the cross-validator as the best-scoring candidate from the grid.
best_model = tuned_model.bestModel

In [22]:
# Score the same held-out test split with the tuned model for a like-for-like comparison.
predictions_2 = best_model.transform(test)

In [None]:
# Display the tuned model's predictions.
predictions_2

userId,bookId,rating,prediction
1395,6654,10,5.2757545
2563,463,7,6.8804245
2563,3749,8,6.3984694
1884,8389,10,7.8649216
5140,7982,6,6.5331054
847,4519,7,8.411961
5055,2366,8,6.6758804
2463,2366,10,8.322912
4443,5300,7,6.313528
4230,8638,10,7.664136


In [23]:
# RMSE of the tuned model on the test split (compare with `rmse` from the baseline).
rmse_2 = evaluator.evaluate(predictions_2)
rmse_2

                                                                                

1.7229040610635111

In [24]:
# Show the selected model's summary (uid and rank).
best_model

ALSModel: uid=ALS_eab784e17b3c, rank=3

## Save the model


In [25]:
# Persist the tuned model, replacing any previous save at the same path.
# The overwrite is delegated to Spark's MLWriter instead of os.path.exists /
# shutil.rmtree: those only look at the local filesystem, whereas Spark
# resolves the path against its configured filesystem (which may not be local).
best_model.write().overwrite().save("alsrecommend.model")

                                                                                

In [26]:
# Shut down the Spark session and release its resources.
spark.stop()