In [None]:
import datetime
import logging

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.functions import desc
from pyspark.sql.functions import concat, col, lit
from pyspark.sql import Row, functions as F 
from pyspark.sql.window import Window
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
def split_json_lines(bigline):
    """Split one line of back-to-back JSON objects into separate JSON strings.

    The input is expected to look like ``{...}{...}{...}`` with nothing
    between the objects. Surrounding whitespace is ignored, the outermost
    braces are dropped, the remainder is cut at every ``}{`` boundary and
    the braces are put back on each piece.

    Args:
        bigline: Text of a single input line.

    Returns:
        A list of strings, one per object. A blank line yields an empty list.

    Note:
        The split is purely textual, so a literal ``}{`` inside a string
        value or between nested objects is also treated as a boundary.
    """
    stripped = bigline.strip()
    if not stripped:
        # A blank line used to come back as a bogus '{}' record.
        all_lines = []
    else:
        # Strip first so [1:-1] removes the real outer braces, not whitespace.
        all_lines = ['{' + s + '}' for s in stripped[1:-1].split('}{')]
    print("Returning {} lines".format(len(all_lines)))
    return all_lines

In [None]:
# Session-wide handles used by the later cells: a logger, the Spark session
# and its SparkContext.
logger = logging.getLogger(__name__)
spark = (
    SparkSession
    .builder
    .appName("ReadInData")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("WARN")
logger.setLevel(logging.INFO)
logger.info("Reading in kinesis data")

# Whitespace-separated column names. The two literals are concatenated by the
# parser, so the first one must end with a space; without it "referrer" and
# "siteKey" fuse into a single field name.
schemaString = (
    "authorKey contentKey pathRoot publishedDate referrer "
    "siteKey timeStamp userAgent userId userStatus")

# Every column is a nullable string.
schema = StructType(
    [StructField(field_name, StringType(), True)
     for field_name in schemaString.split()]
)

# Empty frames that only carry the schema; data is unioned onto them later.
userdata_df = spark.createDataFrame([], schema)
userdata7_df = spark.createDataFrame([], schema)

In [None]:
# Source layout: one S3 prefix per calendar day, addressed by year/month/day.
DATA_PREFIX = "s3a://247machinelearning-curated/kinesis_analytics/{date.year}/{date.month:02d}/{date.day:02d}/*"
# `day` below is a 1-based offset from this date (day 1 == START_DATE).
START_DATE = datetime.date(2017, 10, 1)

# Start from a fresh empty frame so that re-running this cell does not append
# the same days a second time. union() matches columns by position, so the
# column names of the result come from this schema-only frame.
combined_df = spark.createDataFrame([], schema)

for day in range(23, 24):
    date = START_DATE + datetime.timedelta(days=day - 1)
    logger.info("Reading in data for %s", date.isoformat())
    # coalesce() returns a new RDD; keep the result or it has no effect.
    raw_text_rdd = sc.textFile(DATA_PREFIX.format(date=date)).coalesce(32)
    # Each input line may hold several concatenated JSON objects.
    split_text_rdd = raw_text_rdd.flatMap(split_json_lines)
    tmp_df = spark.read.json(split_text_rdd)
    combined_df = combined_df.union(tmp_df)

# repartition()/alias() also return new frames, so assign the result and cache
# that frame rather than the un-repartitioned one.
userdata_df = combined_df.repartition(32).alias('userdata7').cache()
logger.info("Done reading in data")

In [None]:
# Expose the loaded frame to Spark SQL under the name "userdata"; the
# spark.sql(...) queries in the following cells read from this view.
userdata_df.createOrReplaceTempView("userdata")

In [None]:
# Hits per user, busiest first. The userdata view carries the camelCase
# column names from the schema, so alias to the snake_case name that the
# rest of this cell and the later join expect.
userKey_counts = spark.sql(
    "SELECT userId AS user_id, COUNT(*) AS user_hits "
    "FROM userdata GROUP BY userId ORDER BY user_hits DESC")

# Drop the literal string "null" used as a user id, then give every remaining
# user a dense integer id: 1 is the user with the most hits. The window has no
# partitioning, so Spark ranks all rows in a single partition.
user_df = (
    userKey_counts
    .filter(userKey_counts.user_id != "null")
    .select(
        "user_id",
        "user_hits",
        F.row_number().over(Window.orderBy(desc("user_hits"))).alias("user_id_int"),
    )
)

user_df.createOrReplaceTempView("user")

# Hits per content item, most viewed first, aliased the same way.
contentKey_counts = spark.sql(
    "SELECT contentKey AS content_key, COUNT(*) AS content_hits "
    "FROM userdata GROUP BY contentKey ORDER BY content_hits DESC")



In [None]:
# Build the (user, item, hits) triples for ALS by joining the integer-id
# lookup views onto the per-user/per-content hit counts.
# NOTE(review): the "user_content" and "content" views are not created in any
# cell visible here - confirm they are registered before this cell runs.
# Adjacent string literals are used instead of backslash continuations, which
# break as soon as a space follows the backslash.
als_df = spark.sql(
    "SELECT user_id_int, content_key_int, user_content_hits "
    "FROM user AS u "
    "INNER JOIN user_content AS UCC ON u.user_id = UCC.user_id "
    "INNER JOIN content AS C on UCC.content_key = C.content_key")

In [None]:
# Mark the joined frame for caching; it is the input to the train/test split.
als_df.cache()

Build training and test data sets

In [None]:
# 80/20 random split of the ALS input; the fixed seed keeps the split stable.
training, test = als_df.randomSplit(weights=[0.8, 0.2], seed=24)

Set the parameters for fitting the model using the Alternating Least Squares algorithm

In [None]:
# ALS estimator over the integer user/content ids, with hit counts as the
# implicit-feedback signal. coldStartStrategy="drop" removes rows whose
# prediction would be NaN. The seed fixes the random factor initialisation so
# repeated runs of the notebook train the same model.
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="user_id_int",
    itemCol="content_key_int",
    ratingCol="user_content_hits",
    implicitPrefs=True,
    coldStartStrategy="drop",
    seed=24,
)

Train the model using the training data set

In [None]:
# Fit the ALS estimator on the training split only.
model = als.fit(training)

Generate predictions for the training data set and the test data set

In [None]:
# Score both splits with the fitted model. train_predictions is used for the
# training RMSE and predictions (test split) for the test RMSE.
train_predictions = model.transform(training)
predictions = model.transform(test)

In [None]:
# Root-mean-square error between the observed hit counts and the model's
# "prediction" column.
# NOTE(review): the ALS estimator in this notebook is configured with
# implicitPrefs=True - confirm that RMSE against raw hit counts is the
# intended metric.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="user_content_hits",
    predictionCol="prediction",
)

In [None]:
# rmse is computed on the test-split predictions, rmse_train on the
# training-split predictions. Neither value is displayed by this cell.
rmse = evaluator.evaluate(predictions)
rmse_train = evaluator.evaluate(train_predictions)

Training RMSE: 8.34113047446   
Test RMSE: 8.3101470766

In [None]:
# user_id_int is a rank by hit count (1 = most hits), so these pick the test
# rows of the 8th-ranked user and of the ten highest-ranked users.
# The comparison must be `==`; a single `=` inside the call is a syntax error.
single_user = test.filter(test.user_id_int == 8)
ten_users = test.filter(test.user_id_int < 11)

In [None]:
# Score the two user subsets with the fitted model; the resulting frames gain
# the "prediction" column that is sorted on and displayed below.
single_user_rec = model.transform(single_user)
ten_user_rec = model.transform(ten_users)

In [None]:
# Register the scored subsets as SQL views ("single_user", "ten_user") and
# print the single user's rows, highest prediction first.
single_user_rec.createOrReplaceTempView("single_user")
single_user_rec.orderBy("prediction", ascending = False).show()
ten_user_rec.createOrReplaceTempView("ten_user")

Subset data in order to build recommendations for the top 10 users, based on user hits

In [None]:
# Test rows of the ten users with the most hits (user_id_int 1 through 10).
# This repeats the ten_users subset already built in an earlier cell.
ten_users = test.where(col("user_id_int") < 11)

Show data fed into ALS algorithm for top ten users

`>>> ten_users.show()`


|user_id_int|content_key_int|user_content_hits|
|-----------|---------------|-----------------|
|          6|              8|               96|
|          7|              8|               33|
|          9|            135|                2|
|          8|             31|                2|
|          8|            407|                1|
|          3|            697|               19|
|          9|             94|                1|
|          6|              1|               45|
|          7|              1|               38|
|          9|              3|               28|
|          4|             10|                2|
|          1|            127|                1|
|          8|            172|                2|
|          3|            132|                8|
|          8|             12|               13|
|          8|            132|                1|
|          9|            493|                1|
|          4|              6|              158|
|          7|             36|               36|
|          7|            113|               12|

Show ALS model prediction output from top 10 users

`>>> ten_user_rec.orderBy("prediction", ascending = False).show()`

|user_id_int|content_key_int|user_content_hits|  prediction|
|-----------|---------------|-----------------|------------|
|          8|             12|               13|   1.4914155|
|          9|              3|               28|   0.9609676|
|          6|              1|               45|  0.75494903|
|          7|              1|               38|   0.6643761|
|          8|             31|                2|   0.6480088|
|          7|              8|               33|   0.5364247|
|          7|             36|               36|  0.50630975|
|          7|            113|               12|  0.22422674|
|          6|              8|               96|  0.18645664|
|          8|            132|                1|  0.07000649|
|          1|            127|                1| 0.062984094|
|          9|            135|                2| 0.041216828|
|          3|            132|                8|  0.03759442|
|          8|            172|                2|  0.03156671|
|          8|            407|                1| 0.012930483|
|          9|            493|                1|0.0017545973|
|          3|            697|               19|9.2680776E-4|
|          4|             10|                2|-0.024427962|
|          9|             94|                1| -0.13382056|
|          4|              6|              158| -0.85929865|


In [None]:
# Distinct-value statistics over the raw events. The userdata view uses the
# camelCase column names from the schema (contentKey, pathRoot), so the
# queries must reference those names.
content_unique = spark.sql("SELECT COUNT(DISTINCT contentKey) AS contents FROM userdata ")
pathroot_unique = spark.sql("SELECT COUNT(DISTINCT pathRoot) AS path_roots FROM userdata ")

# Every distinct "<contentKey> <pathRoot>" pair, pulled to the driver as a
# list of Rows. collect() materialises the whole result in driver memory.
content_pathroot = (
    userdata_df
    .select(concat(col("contentKey"), lit(" "), col("pathRoot")))
    .distinct()
    .collect()
)