## SessionRecommender Demo

## Intialization
import necessary libraries

In [1]:
from zoo.models.recommendation import *
from zoo.models.recommendation.utils import *
from zoo.common.nncontext import init_nncontext
import os
import sys
import datetime as dt

import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt
%pylab inline

Prepending /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path
Adding /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/zoo/share/lib/analytics-zoo-bigdl_0.8.0-spark_2.4.3-0.5.1-jar-with-dependencies.jar to BIGDL_JARS
Prepending /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/zoo/share/conf/spark-analytics-zoo.conf to sys.path
Populating the interactive namespace from numpy and matplotlib


Initialize NN context, it will get a SparkContext with optimized configuration for BigDL performance

In [2]:
sc = init_nncontext("SessionRecommender Example")
sqlContext = SQLContext(sc)

## Data Preparation
Download and read session data, understand the dimension.

In [3]:
raw_df = sqlContext.read.option("header", "false") \
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") \
.json("./atcHistory.json")

In [4]:
raw_df.show()

+---------+--------------------+--------------------+-------------------+
| AGENT_ID|             ATC_SEQ|          PURCH_HIST|         SESSION_ID|
+---------+--------------------+--------------------+-------------------+
|  1209985|[4383.0, 554.0, 2...|           [20068.0]|  85747132003130873|
|144083740|        [84.0, 46.0]|              [46.0]|5053574821831114024|
|144449078|   [11133.0, 3992.0]|[10893.0, 3992.0,...|7169459224267076906|
|190853148|[113.0, 419.0, 70...|[2573.0, 3705.0, ...|2718032582536236849|
|190853148|[419.0, 587.0, 21...|[2573.0, 3705.0, ...|3173245714239814804|
|190853148|[587.0, 419.0, 21...|[2573.0, 3705.0, ...|5334404265190632876|
|190853148|[419.0, 19984.0, ...|[2573.0, 3705.0, ...|1168841746852385152|
|190853148|[224.0, 113.0, 41...|[2573.0, 3705.0, ...|2050313387125242837|
|190853148|       [113.0, 70.0]|[2573.0, 3705.0, ...|4957432675248974378|
|190853148|      [419.0, 211.0]|[2573.0, 3705.0, ...|4366140795879791705|
|190853148|[419.0, 587.0, 54...|[2573.

In [5]:
item_count = max(raw_df.select("ATC_SEQ").rdd.map(lambda x: max(x[0])).collect())
user_count = float(raw_df.select("AGENT_ID").distinct().count())

Create udf functions to preprocess training data

In [6]:
from pyspark.sql.functions import col, udf, explode, size
from pyspark.sql.types import DoubleType, ArrayType

In [7]:
def slide(seq):
    win = len(seq) + 1
    out = []
    for i in range(1, win):
        sliced = seq[slice(i)]
        out.append(sliced)
    return out

In [8]:
slide_udf = udf(lambda x: slide(x), "array<array<double>>")

In [9]:
def get_label(seq):
    return seq[-1]

In [10]:
get_label_udf = udf(lambda x: get_label(x), DoubleType())

In [11]:
def get_feature(seq):
    win = len(seq) - 1
    return seq[:win]

In [12]:
get_feature_udf = udf(lambda x: get_feature(x), ArrayType(DoubleType()))

In [13]:
def prepad(seq, length):
    win = len(seq)
    if win < length:
        return [0.0] * (length - win) + seq
    else:
        return seq[-length:]

In [14]:
prepad_udf = udf(lambda x: prepad(x, 5), ArrayType(DoubleType()))

Slide atc window to generate more training data, get label, and prepad the atc sequence to a fixed length

In [15]:
session_df = raw_df.withColumn("ATC_SEQ", explode(slide_udf("ATC_SEQ"))) \
    .where(size("ATC_SEQ") > 1) \
    .withColumn("label", get_label_udf("ATC_SEQ")) \
    .withColumn("ATC_SEQ", get_feature_udf("ATC_SEQ")) \
    .withColumn("ATC_SEQ", prepad_udf("ATC_SEQ"))

In [16]:
session_df.show(20, False)

+---------+---------------------------------+-----------------------------------------+-------------------+-------+
|AGENT_ID |ATC_SEQ                          |PURCH_HIST                               |SESSION_ID         |label  |
+---------+---------------------------------+-----------------------------------------+-------------------+-------+
|1209985  |[0.0, 0.0, 0.0, 0.0, 4383.0]     |[20068.0]                                |85747132003130873  |554.0  |
|1209985  |[0.0, 0.0, 0.0, 4383.0, 554.0]   |[20068.0]                                |85747132003130873  |20068.0|
|144083740|[0.0, 0.0, 0.0, 0.0, 84.0]       |[46.0]                                   |5053574821831114024|46.0   |
|144449078|[0.0, 0.0, 0.0, 0.0, 11133.0]    |[10893.0, 3992.0, 6317.0, 1261.0, 7786.0]|7169459224267076906|3992.0 |
|190853148|[0.0, 0.0, 0.0, 0.0, 113.0]      |[2573.0, 3705.0, 27483.0, 9931.0, 3805.0]|2718032582536236849|419.0  |
|190853148|[0.0, 0.0, 0.0, 113.0, 419.0]    |[2573.0, 3705.0, 27483.0, 9

In [18]:
sample = session_df.select("ATC_SEQ", "PURCH_HIST", "label").rdd.map(lambda x: Sample.from_ndarray(np.array(x[0]), np.array(x[2])))

In [19]:
sample.take(1)

[Sample: features: [JTensor: storage: [   0.    0.    0.    0. 4383.], shape: [5], float], labels: [JTensor: storage: [554.], shape: [1], float]]

In [1]:
from session_recommender import SessionRecommender

Prepending /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path
Adding /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/zoo/share/lib/analytics-zoo-bigdl_0.8.0-spark_2.4.3-0.5.1-jar-with-dependencies.jar to BIGDL_JARS
Prepending /Users/rbilw002/.local/share/virtualenvs/srPython-S2LVl484/lib/python3.6/site-packages/zoo/share/conf/spark-analytics-zoo.conf to sys.path


In [2]:
sr = SessionRecommender(item_count=5,
                        item_embed=300,
                        mlp_hidden_layers=[20,20],
                        rnn_hidden_layers=[200,200],
                        include_history=True,
                        seq_length=5,
                        his_length=5)

creating: createZooKerasInput
creating: createZooKerasEmbedding
creating: createZooKerasGRU
creating: createZooKerasGRU
creating: createZooKerasDense
creating: createZooKerasInput
creating: createZooKerasEmbedding
creating: createZooKerasFlatten
creating: createZooKerasDense
creating: createZooKerasDense
creating: createZooKerasDense
creating: createZooKerasMerge
creating: createZooKerasActivation
creating: createZooKerasModel
creating: createZooSessionRecommender


Py4JError: An error occurred while calling o37.createZooSessionRecommender. Trace:
py4j.Py4JException: Method createZooSessionRecommender([class java.lang.Integer, class java.util.ArrayList, class java.util.ArrayList, class java.lang.Boolean, class java.lang.Integer, class java.lang.Integer, class java.lang.String, class com.intel.analytics.zoo.pipeline.api.keras.models.Model]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

