In [1]:
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
from elasticsearch import Elasticsearch, helpers
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col
from functools import reduce
import uuid

In [2]:
# Create (or reuse) the notebook's SparkSession, requesting client deploy mode.
spark = (
    SparkSession.builder
    .config("spark.submit.deployMode", "client")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/09 20:29:20 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/09 20:29:20 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/09 20:29:20 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/09 20:29:20 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [None]:
spark.sparkContext.getConf().getAll()

In [None]:
raw_reviews = spark.read.json('/user/amm9801_nyu_edu/project/Books.json')
raw_reviews.count()

In [None]:
raw_clothing = spark.read.json('/user/sg6482_nyu_edu/project/Clothing_Shoes_and_Jewelry.json')

In [None]:
# Read the Electronics dump with case-sensitive column resolution, then switch
# it back off even if the read raises, so later cells never silently run in
# case-sensitive mode.
# NOTE(review): presumably needed because this JSON has field names that differ
# only by case — confirm.
spark.conf.set("spark.sql.caseSensitive", "true")
try:
    raw_electronics = spark.read.json('/user/sa6142_nyu_edu/project/electronics/Electronics.json')
finally:
    spark.conf.set("spark.sql.caseSensitive", "false")

In [None]:
# Row count of the raw Clothing, Shoes & Jewelry reviews.
raw_clothing.count()

In [None]:
# Row count of the raw Electronics reviews.
raw_electronics.count()

In [None]:
# Display the DataFrame repr (column names and types) of the raw Books reviews.
raw_reviews

In [None]:
raw_metadata = spark.read.json('project/meta_Books.json')
raw_metadata.count()

In [None]:
raw_metadata

In [None]:
metadata = raw_metadata.select('asin', 'title', 'description', 'brand')

In [None]:
raw_reviews.createOrReplaceTempView("view_raw_reviews")

In [None]:
raw_clothing.createOrReplaceTempView("view_raw_clothing")

In [None]:
raw_electronics.createOrReplaceTempView("view_raw_electronics")

In [None]:
reviews_filtered = spark.sql("select asin, reviewerId, overall from\
                            (select *, count(*) over (partition by reviewerId) as c\
                            from view_raw_reviews) where c >= 5")

In [None]:
clothing_filtered = spark.sql("select asin, reviewerId, overall from\
                            (select *, count(*) over (partition by reviewerId) as c\
                            from view_raw_clothing) where c >= 5")

In [None]:
electronics_filtered = spark.sql("select asin, reviewerId, overall from\
                            (select *, count(*) over (partition by reviewerId) as c\
                            from view_raw_electronics) where c >= 5")

In [None]:
electronics_filtered.count()

In [None]:
clothing_filtered.count()

In [None]:
reviews_filtered.write.option("header",True).csv("/user/amm9801_nyu_edu/project/reviews_filtered")

In [None]:
clothing_filtered.write.option("header",True).csv("/user/sg6482_nyu_edu/project/clothing_filtered")

In [None]:
electronics_filtered.write.option("header",True).csv("/user/sa6142_nyu_edu/project/electronics_filtered")

In [None]:
reviews_filtered = spark.read.option("header", True).csv("/user/amm9801_nyu_edu/project/reviews_filtered")

In [None]:
clothing_filtered = spark.read.option("header", True).csv("/user/sg6482_nyu_edu/project/clothing_filtered")

In [None]:
electronics_filtered = spark.read.option("header", True).csv("/user/sa6142_nyu_edu/project/electronics_filtered")

In [None]:
def unionAll(*dfs):
    """Stack DataFrames vertically, like SQL ``UNION ALL`` (duplicates are kept).

    Columns are matched by *name* (``unionByName``) rather than by position,
    so inputs whose columns happen to be ordered differently are not silently
    mis-aligned.

    Args:
        *dfs: one or more DataFrames with the same column names.

    Returns:
        The unioned DataFrame. With a single input, that input itself.

    Raises:
        ValueError: if no DataFrames are given.
    """
    if not dfs:
        raise ValueError("unionAll() requires at least one DataFrame")
    return reduce(lambda left, right: left.unionByName(right), dfs)

In [None]:
merged_reviews = unionAll(reviews_filtered, clothing_filtered, electronics_filtered)

In [None]:
merged_reviews.write.option("header",True).csv("project/merged_reviews")

In [None]:
merged_reviews = spark.read.option("header", True).csv("/user/amm9801_nyu_edu/project/merged_reviews")

In [None]:
merged_reviews.count()

In [None]:
merged_reviews.printSchema()

In [None]:
stringindexer = StringIndexer().setInputCol("reviewerId").setOutputCol("reviewerIdIdx")

In [None]:
model_reviewerId = stringindexer.fit(merged_reviews)

In [None]:
merged_reviews_transformed = model_reviewerId.transform(merged_reviews)

In [None]:
stringindexer = StringIndexer().setInputCol("asin").setOutputCol("asinIdx")

In [None]:
model_asin = stringindexer.fit(merged_reviews_transformed)

In [None]:
merged_reviews_transformed = model_asin.transform(merged_reviews_transformed)

In [None]:
merged_reviews_transformed = merged_reviews_transformed\
                                .withColumn("reviewerIdIdx", col("reviewerIdIdx").cast('int'))\
                                .withColumn("asinIdx", col("asinIdx").cast('int'))\
                                .withColumn("overall", col("overall").cast('float'))

In [None]:
merged_reviews_transformed.printSchema()

In [None]:
merged_reviews_transformed.write.option("header", True)\
    .csv("/user/amm9801_nyu_edu/project/merged_reviews_transformed")

In [None]:
merged_reviews_transformed = spark.read.csv("/user/amm9801_nyu_edu/project/merged_reviews_transformed",\
                                            inferSchema=True, header=True)

In [None]:
merged_reviews_transformed.count()

In [None]:
merged_reviews_transformed.printSchema()

In [None]:
(training, validation) = merged_reviews_transformed.randomSplit([0.8, 0.2])

In [None]:
training.write.option("header", True).csv("/user/amm9801_nyu_edu/project/training")

In [None]:
validation.write.option("header", True).csv("/user/amm9801_nyu_edu/project/validation")

In [None]:
training = spark.read.csv("/user/amm9801_nyu_edu/project/training", inferSchema=True, header=True)

In [None]:
training.printSchema()

In [None]:
training.count()

In [None]:
validation = spark.read.csv("/user/amm9801_nyu_edu/project/validation", inferSchema=True, header=True)

In [None]:
als = ALS(maxIter=10, regParam=0.05, rank=48, userCol="reviewerIdIdx", itemCol="asinIdx", ratingCol="overall",
          coldStartStrategy="drop")

In [None]:
model = als.fit(training)

In [None]:
model.save("/user/amm9801_nyu_edu/project/als_model_merged")

In [3]:
model = ALSModel.load("/user/amm9801_nyu_edu/project/als_model_merged")

                                                                                

In [4]:
# Use the model's item-factor DataFrame directly instead of round-tripping
# every row through Python via `.rdd` + `createDataFrame`. The casts reproduce
# the schema the round-trip produced (id: bigint, features: array<double>).
itemfactors = model.itemFactors.select(
    col("id").cast("bigint").alias("id"),
    col("features").cast("array<double>").alias("features"),
)

                                                                                

In [7]:
itemfactors

DataFrame[id: bigint, features: array<double>]

In [9]:
itemfactors.write.option("header", True).parquet("/user/amm9801_nyu_edu/project/itemfactors")

                                                                                

In [10]:
itemfactors = spark.read.parquet("/user/amm9801_nyu_edu/project/itemfactors", inferSchema=True, header=True)

In [11]:
itemfactors.count()

4351089

In [12]:
itemfactors.printSchema()

root
 |-- id: long (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)

