In [None]:
from datetime import datetime
import pyspark
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, struct, collect_list
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import json
import subprocess
import sys

In [None]:
# Identifiers stored with the evaluation metrics so each metrics row names the model that produced it.
model_name, model_version = "spark_store_recs", "v2"

## Read in table from BigQuery

In [None]:
# Get (or reuse) the SparkContext and the SparkSession built on it. Later cells use
# both `sc` and `spark`; creating the session here keeps the notebook runnable on a
# fresh kernel that does not predefine `spark`. getOrCreate() returns the existing
# session when one is already running.
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [None]:
# Cloud Dataproc propagates cluster settings through the Hadoop configuration;
# read this cluster's bucket and project ID from it.
hadoop_conf = sc._jsc.hadoopConfiguration()
bucket = hadoop_conf.get("fs.gs.system.bucket")
project = hadoop_conf.get("fs.gs.project.id")

In [None]:
# Timestamp this run and derive the GCS directory handed to the BigQuery connector
# as its temporary export path. The same timestamp names the model and prediction
# directories written later.
todays_date = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
input_directory = "gs://{}/store_recs/development/training/{}".format(bucket, todays_date)

In [None]:
# Connector configuration for the BigQuery import. The first three keys describe
# the Cloud Dataproc side (project, bucket, temporary GCS path); the last three
# identify the BigQuery project, dataset and table to read.
input_dataset = "data_science"
input_table = "store_recs_dev_train_90days"

conf = {
    # Cloud Dataproc project, bucket and staging path
    "mapred.bq.project.id": project,
    "mapred.bq.gcs.bucket": bucket,
    "mapred.bq.temp.gcs.path": input_directory,
    # BigQuery source table
    "mapred.bq.input.project.id": project,
    "mapred.bq.input.dataset.id": input_dataset,
    "mapred.bq.input.table.id": input_table,
}

In [None]:
# Pull the BigQuery table into Spark as an RDD of (key, JSON object) records,
# using the connector's JSON text input format and the settings in `conf`.
table_data = spark.sparkContext.newAPIHadoopRDD(
    inputFormatClass="com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="com.google.gson.JsonObject",
    conf=conf,
)

In [None]:
# Set the checkpoint directory
# The checkpoint() calls in later cells need this to be set first.
# NOTE(review): 'checkpoint/' is a relative path — presumably resolved against the
# cluster's default filesystem; confirm where these files end up.
sc.setCheckpointDir('checkpoint/')

In [None]:
# Each record is a (key, value) pair; keep only the value, which carries the JSON text.
table_json = table_data.values()

In [None]:
# Load the JSON strings as a Spark Dataframe.
# No schema is passed, so the column types are left to spark.read.json to work out.
transactions_data = spark.read.json(table_json)

In [None]:
# Size up the interaction matrix: distinct users, distinct stores, and the number
# of interaction rows compared with every possible user x store pairing.
n_users = transactions_data.select('user').distinct().count()
n_stores = transactions_data.select('store_id').distinct().count()
n_interactions = transactions_data.count()
n_possible_interactions = n_users * n_stores

# Percentage of the user x store matrix with no interaction. An empty table has no
# possible interactions, so treat it as fully sparse instead of dividing by zero
# (the sparsity check in the next cell then stops the run with its usual message).
# float() keeps the ratio fractional even where `/` is integer division.
if n_possible_interactions:
    sparsity = 100 - (float(n_interactions) / n_possible_interactions) * 100
else:
    sparsity = 100.0

In [None]:
# Stop the run unless enough of the user x store matrix is filled in to learn preferences from.
MAX_SPARSITY_PCT = 99.50
too_sparse = sparsity > MAX_SPARSITY_PCT
if too_sparse:
    sys.exit("Matrix is too sparse.")

In [None]:
# Cache the raw transactions and checkpoint them. DataFrame.checkpoint() returns a
# new, checkpointed DataFrame instead of modifying this one, so the result has to
# be kept; discarding it writes the checkpoint files without ever using them.
transactions_data = transactions_data.persist().checkpoint()

## Transform data

In [None]:
# Map the original user and store ids to numeric indices for ALS, and make the
# rating an integer.
# Each indexer is built explicitly for its column. Deriving them from a set of
# column names gives no guaranteed order, so `item_indexer` and `user_indexer`
# could come out swapped, and their labels are used later to translate the
# indices back to the original ids.
user_indexer = StringIndexer(inputCol="user", outputCol="user_index").fit(transactions_data)
item_indexer = StringIndexer(inputCol="store_id", outputCol="store_id_index").fit(transactions_data)

# Apply the fitted indexer models to the DataFrame
indexer_pipeline = Pipeline(stages=[item_indexer, user_indexer])
transactions_index = indexer_pipeline.fit(transactions_data).transform(transactions_data)

# Make rating an integer
transactions_index = transactions_index.withColumn("rating", transactions_index["rating"].cast(IntegerType()))

In [None]:
# Cache and checkpoint the indexed transactions. checkpoint() returns a new
# DataFrame, so keep it; otherwise the checkpoint is written and never read.
transactions_index = transactions_index.persist().checkpoint()

## Split data for testing

In [None]:
# 60/20/20 split. The seed makes the split repeatable across runs, in line with the
# fixed seed given to ALS below.
(train, test, validate) = transactions_index.randomSplit([0.6, 0.2, 0.2], seed=0)

In [None]:
# Report how many rows landed in each split.
split_sizes = (train.count(), test.count(), validate.count())
print("Train: %d, Test: %d, Validate: %d" % split_sizes)

## Train model

In [None]:
# Implicit-feedback ALS estimator, configured in one place so the settings are easy to scan.
als_params = dict(
    # Reproducibility
    seed=0,
    # Model form
    implicitPrefs=True,
    alpha=1,
    rank=20,
    nonnegative=True,
    # Optimisation
    maxIter=20,
    regParam=0.1,
    checkpointInterval=10,
    # Parallelism
    numUserBlocks=50,
    numItemBlocks=50,
    # Columns
    userCol="user_index",
    itemCol="store_id_index",
    ratingCol="rating",
    # Predictions for unseen users/items are left as NaN
    coldStartStrategy='nan',
)
als = ALS(**als_params)

In [None]:
# Fit on the full indexed dataset (not the `train` split), so the saved model and
# the recommendations produced later are built from every interaction.
model = als.fit(transactions_index)

In [None]:
from pyspark.ml.util import MLWriter
# Save the fitted model to GCS under this run's timestamp.
outpath = "gs://{}/store_recs/development/model/{}".format(bucket, todays_date)
model.save(outpath)

## Evaluate

In [None]:
evaluator = RegressionEvaluator(metricName = "mae", labelCol="rating", predictionCol="prediction")
mae = evaluator.evaluate(predictions)
print("MAE = " + str(mae))

## Make Predictions

In [None]:
predictions = model.transform(transactions_index)

In [None]:
predictions.persist().checkpoint()

## Recommend for all users

In [None]:
# Ask the model for the 200 highest-scoring stores for every user it was fit on.
top_200 = model.recommendForAllUsers(numItems=200)

In [None]:
# Cache and checkpoint the recommendations. checkpoint() returns a new DataFrame,
# so keep it; otherwise the checkpoint is written and never read.
top_200 = top_200.persist().checkpoint()

## Transform data back

In [None]:
# Turn each user's recommendation list into one row per recommendation.
e_top_200 = top_200.withColumn('recommendations', explode(top_200['recommendations']))

In [None]:
# Lift the store index and rating out of the `recommendations` struct into their own columns.
rec = e_top_200["recommendations"]
e_top_200 = (
    e_top_200
    .withColumn("store_index", rec.getField("store_id_index"))
    .withColumn("rating", rec.getField("rating"))
)

In [None]:
# Cache and checkpoint the exploded recommendations. checkpoint() returns a new
# DataFrame, so keep it; otherwise the checkpoint is written and never read.
e_top_200 = e_top_200.persist().checkpoint()

In [None]:
# Translate the numeric indices back to the original ids, using the labels learned
# by the fitted indexers: stores first, then users.
store_decoder = IndexToString(inputCol='store_index', outputCol='store_id_original',
                              labels=item_indexer.labels)
user_decoder = IndexToString(inputCol='user_index', outputCol='user_original',
                             labels=user_indexer.labels)

top_200_strings1 = store_decoder.transform(e_top_200)
top_200_strings = user_decoder.transform(top_200_strings1)

In [None]:
# Cache and checkpoint the decoded recommendations. checkpoint() returns a new
# DataFrame, so keep it; otherwise the checkpoint is written and never read.
top_200_strings = top_200_strings.persist().checkpoint()

## Collapse back to nested format

In [None]:
# Re-nest the rows: pair each store id with its rating, then gather the pairs into
# one `recommendations` list per user.
rec_struct = struct('store_id_original', 'rating').alias('struct')
per_user = top_200_strings.select('user_original', rec_struct).groupBy("user_original")
top_200_recs = per_user.agg(collect_list("struct").alias('recommendations'))

## Write recs to GCS 

In [None]:
# Write the per-user recommendations to GCS as JSON, in a directory named after this run's timestamp.
output_directory = "gs://{}/store_recs/development/predictions/{}".format(bucket, todays_date)
top_200_recs.write.json(output_directory)

## Write evaluation metrics to GCS

In [None]:
# Build a one-row DataFrame holding this run's evaluation metrics.
from pyspark.sql import Row
# Timestamp of the evaluation itself, kept in its own variable so the run-level
# `todays_date` used for the output paths is not overwritten.
evaluation_date = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
date_metrics = [Row(date=evaluation_date, mae=mae, n_users=n_users, n_stores=n_stores,
                    sparsity=sparsity, model_name=model_name, model_version=model_version)]
# Create the DataFrame through `spark`, the session the data-loading cells already
# use; `sqlContext` is never defined in this notebook.
eval_metrics = spark.createDataFrame(date_metrics)

In [None]:
# Collapse to a single partition so the metrics land in one output file.
eval_metrics = eval_metrics.coalesce(numPartitions=1)

In [None]:
# Add this run's metrics as JSON to the evaluation directory, alongside earlier runs.
output_directory = "gs://{}/store_recs/development/evaluation/".format(bucket)
eval_metrics.write.json(output_directory, mode='append')

In [None]:
# Call the bq CLI to reload the BigQuery evaluation table from every metrics file
# in the evaluation directory, replacing the table's previous contents.
output_dataset = 'data_science'
output_table = 'store_recs_eval'
output_files = "gs://{}/store_recs/development/evaluation/part-*".format(bucket)

bq_load_command = [
    'bq', 'load',
    '--source_format', 'NEWLINE_DELIMITED_JSON',
    '--replace',
    '--autodetect',
    '{dataset}.{table}'.format(dataset=output_dataset, table=output_table),
    output_files,
]
subprocess.check_call(bq_load_command)

## Clean up

In [None]:
# Remove the BigQuery table
# `bq` is run directly from an argument list, the same way as the `bq load` call
# earlier in this notebook, so no shell is involved.
# NOTE(review): this removes data_science.store_recs_dev_train, but the training
# data was read from store_recs_dev_train_90days — confirm which table is meant.
subprocess.check_call(["bq", "rm", "-f", "data_science.store_recs_dev_train"])

## Check out predictions

In [None]:
# Register the decoded recommendations as a SQL view. createOrReplaceTempView lets
# this cell be re-run; createTempView raises once the view already exists.
top_200_strings.createOrReplaceTempView('recs2')

In [None]:
# Spot check: select the recommended store ids for one hard-coded user from the `recs2` view.
recs_view = spark.sql("SELECT store_id_original \
                      FROM recs2 \
                      WHERE user_original = 7761389455514436379")

In [None]:
# Print the rows returned by the spot-check query.
recs_view.show()