In [20]:
import pandas

import pyspark
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.sql import SQLContext

from pyspark.sql.types import StringType, IntegerType

In [4]:
spark = (pyspark.sql.SparkSession.builder
    .master("local")
    .getOrCreate())

In [5]:
comic_reviews = spark.read.json('data/all_comic_reviews_in_mtv.json')

In [6]:
movie_reviews = spark.read.json('data/all_movietv_jsons/*.json')

In [46]:
def new_id_dictionary(df, column, suffix_val):
    """Take in column with unique indexes, return dictionary with new index values. This is done to
     remove the default ASIN and user ID from Amazon reviews and create better unique ids.
    Args:
        df: source dataframe
        column: name of column with ids to replace
        suffix_val: new suffix value for unique codes. Example: all new user_ids could end
        with '00000'
    Returns:
        new_id_dict: New Spark dataframe with column of new unique ids
    
    """
    unique_vals = list(set([old_id[0] for old_id in df.select(column).collect()]))
    new_ids = [(str(i) + suffix_val) for i in range(1,len(unique_vals)+1)]
    new_id_dict = {k:v for k,v in zip(unique_vals, new_ids)}
    return new_id_dict

In [8]:
comic_reviews.show(1)

+----------+-------+--------------------+--------------+
|      asin|overall|          reviewText|    reviewerID|
+----------+-------+--------------------+--------------+
|0316107255|    4.0|PENGUIN DREAMS AN...|A3NQU1649SH0Q4|
+----------+-------+--------------------+--------------+
only showing top 1 row



In [82]:
new_user_ids = new_id_dictionary(comic_reviews, 'reviewerID', '0000')
new_comic_asins = new_id_dictionary(comic_reviews, 'asin', '2222')
new_mtv_asins = new_id_dictionary(movie_reviews, 'asin', '4444')

In [36]:
def add_new_id(old_id, new_id_dict):
    new_id = id_dict[old_id]
    return new_id

In [96]:
udfUserId = F.udf(lambda x: new_user_ids[x], StringType())

udfComicId = F.udf(lambda x: new_comic_asins[x], StringType())
udfMovieId = F.udf(lambda x: new_mtv_asins[x], StringType())

In [97]:
comic_reviews_updated = comic_reviews.withColumn("item_id", udfComicId("asin"))
comic_reviews_updated = comic_reviews_updated.withColumn("user_id", udfUserId("reviewerID"))

In [98]:
comic_r_final = comic_reviews_updated.select(['user_id','item_id', 'overall'])

In [99]:
comic_r_final.show(1)

+--------+--------+-------+
| user_id| item_id|overall|
+--------+--------+-------+
|34510000|40102222|    4.0|
+--------+--------+-------+
only showing top 1 row



In [106]:
movie_reviews_updated = movie_reviews.withColumn("user_id", udfUserId("reviewerID"))
movie_reviews_updated_2 = movie_reviews_updated.withColumn("item_id", udfMovieId("asin"))

In [107]:
movies_r_final = movie_reviews_updated_2.select(['user_id', 'item_id', 'overall'])

In [108]:
movies_r_final.show(1)

+--------+---------+-------+
| user_id|  item_id|overall|
+--------+---------+-------+
|79890000|392924444|    4.0|
+--------+---------+-------+
only showing top 1 row



In [109]:
all_reviews = comic_r_final.union(movies_r_final)

In [117]:
all_reviews_als = all_reviews.select([col("user_id").cast(IntegerType()), col("item_id").cast(IntegerType()), col("overall")])

In [118]:
all_reviews_als.persist()

DataFrame[user_id: int, item_id: int, overall: double]

In [119]:
(train, test) = all_reviews_als.randomSplit([.8,.2])

In [120]:
# Build the recommendation model using ALS
als = ALS(userCol='user_id', itemCol='item_id', ratingCol='overall', nonnegative=True)

als_model = als.fit(train)

In [128]:
pred = als_model.transform(test)
pred_dropna = pred.dropna()

In [None]:
pred_dropna.show(5)

In [131]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall",
                                predictionCol="prediction")

evaluator_2 = RegressionEvaluator(metricName="mae", labelCol="overall",
                                predictionCol="prediction")

rmse = evaluator.evaluate(pred_dropna)
mae = evaluator_2.evaluate(pred_dropna)
print(f"RMSE: {rmse}")
print(f"MAE: {mae}")

RMSE: 1.2748690218854049
MAE: 0.9887515321072159


In [132]:
pred.show(10)

+---------+--------+-------+----------+
|  user_id| item_id|overall|prediction|
+---------+--------+-------+----------+
| 71860000| 2732222|    4.0| 1.4231931|
|  6190000| 2732222|    5.0|  3.699666|
| 98160000| 2732222|    5.0| 3.7960944|
|101490000| 2732222|    4.0|  4.645656|
| 21300000| 2922222|    5.0|  4.544185|
| 48820000| 2922222|    4.0|  3.974867|
| 88340000| 2922222|    5.0|  4.616192|
|100430000| 3202222|    5.0| 3.2581654|
| 67620000| 7324444|    4.0|       NaN|
| 36350000|10144444|    3.0|       NaN|
+---------+--------+-------+----------+
only showing top 10 rows



In [172]:
query = """
SELECT
    item_id,
    count
FROM
    (SELECT item_id, COUNT(*) as count FROM table GROUP BY item_id)
WHERE count < AVG(count)"""

In [173]:
all_reviews_als.createOrReplaceTempView('table')
temp_user_view = spark.sql(query).toPandas()
temp_user_view

Py4JJavaError: An error occurred while calling o1790.collectToPython.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: avg(input[1, bigint, false])
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
	at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:526)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:508)
	at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:563)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
	at org.apache.spark.sql.execution.FilterExec.org$apache$spark$sql$execution$FilterExec$$genPredicate$1(basicPhysicalOperators.scala:139)
	at org.apache.spark.sql.execution.FilterExec$$anonfun$13.apply(basicPhysicalOperators.scala:179)
	at org.apache.spark.sql.execution.FilterExec$$anonfun$13.apply(basicPhysicalOperators.scala:163)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:163)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.consume(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:483)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:662)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:166)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
