# Web data

In this notebook we create a list of userId values with their churn predictions.
The web app will display this list; it would not make sense to compute the predictions inside the web app itself.

We create the predictions on the previously saved test dataframe.

## Read in dataset

In [1]:
# Imports
import findspark
findspark.init()
findspark.find()
import pyspark

In [2]:
# Imports for creating spark session
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('sparkify-capstone-web-data').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
# Imports for creating the web data
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import GBTClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

In [4]:
# Read in dataset
path = "out/transformed.parquet"
df = spark.read.parquet(path)

## Load the model

Load the model, prepare for transformation and show some records to verify.

In [5]:
model = GBTClassificationModel.load("out/model")

In [6]:
model

GBTClassificationModel (uid=GBTClassifier_4a53f6bf6bbe) with 120 trees

In [7]:
# Since the values are normalized, we can reduce the data to one row per user
df = df.dropDuplicates(["userId"])

# Drop the churn column; this is what we will predict
df = df.drop("churn")

In [8]:
df.show()

+------+-----------+------------+-------------+---------------+------------+-------------+------------------+---------------+
|userId|level_index|gender_index|thumbs_up_sum|thumbs_down_sum|nextsong_sum|downgrade_sum|        length_sum|sessionId_count|
+------+-----------+------------+-------------+---------------+------------+-------------+------------------+---------------+
|   148|        1.0|         0.0|           10|              4|         307|            0| 76207.76939999999|             10|
|200049|        1.0|         0.0|            3|              3|          26|            0|        7730.19557|              3|
|300040|        1.0|         1.0|           52|             10|         524|            0| 130921.5059899999|              8|
|    85|        1.0|         0.0|          116|             33|        2223|           22| 550835.0803399995|             32|
|   137|        1.0|         0.0|            2|              0|          49|            0|11998.723940000002|         

In [13]:
df.dtypes

[('userId', 'int'),
 ('level_index', 'double'),
 ('gender_index', 'double'),
 ('thumbs_up_sum', 'bigint'),
 ('thumbs_down_sum', 'bigint'),
 ('nextsong_sum', 'bigint'),
 ('downgrade_sum', 'bigint'),
 ('length_sum', 'double'),
 ('sessionId_count', 'bigint')]

## Create the prediction dataframe

We will calculate the probability and prediction for each user.
This could probably have been done much more easily if I had left the user id in the feature set. But back then I didn't know
that the input columns for the algorithm can be chosen explicitly. Anyway, this is a good exercise.

In [9]:
# Use a struct to create the new dataframe.
schema = StructType([
    StructField("prediction", DoubleType(), False),
    StructField("userId", IntegerType(), False),
])

df_predictions = spark.createDataFrame([], schema)

In [14]:
# Using a UDF was a bit tricky: accessing df or model inside it threw thread lock exceptions,
# and using a broadcast variable did not help. So now we iterate over each row, create the feature vector with VectorAssembler,
# make the prediction, and add the row to the new dataframe.

# Iterate user rows
for row in df.toPandas().iterrows():

    # Get userId
    userId = lit(row[1][0])

    # Prepare dictionary for feature dataframe
    features_dict = [{
        "level_index": float(row[1][1]),
        "gender_index": float(row[1][2]), 
        "thumbs_up_sum": int(row[1][3]),
        "thumbs_down_sum": int(row[1][4]),
        "nextsong_sum": int(row[1][5]),
        "downgrade_sum": int(row[1][6]),
        "length_sum": float(row[1][7]),
        "sessionId_count": int(row[1][8]),
    }]
    
    df_user_row = spark.createDataFrame(features_dict)
    
    # Create feature dataframe with VectorAssembler
    df_features = VectorAssembler(
        inputCols=["level_index", "gender_index", "thumbs_up_sum", "thumbs_down_sum",
                   "nextsong_sum", "downgrade_sum", "length_sum", "sessionId_count"],
        outputCol="features").transform(df_user_row)
    
    # We now only need the features
    df_features = df_features.select("features")
    
    # Predict
    prediction = model.transform(df_features)
    # Add userId
    prediction = prediction.withColumn("userId", userId)
    
    # Drop unneeded columns
    prediction = prediction.drop("features")
    prediction = prediction.drop("rawPrediction")
    prediction = prediction.drop("probability")
    
    # Add to new dataframe
    df_predictions = df_predictions.union(prediction)

In [15]:
df_predictions.show()

+----------+--------+
|prediction|  userId|
+----------+--------+
|       0.0|   148.0|
|       0.0|200049.0|
|       0.0|300040.0|
|       1.0|    85.0|
|       0.0|   137.0|
|       0.0|   251.0|
|       0.0|200031.0|
|       0.0|300044.0|
|       0.0|    65.0|
|       0.0|200001.0|
|       0.0|    53.0|
|       0.0|   255.0|
|       0.0|   133.0|
|       1.0|   296.0|
|       1.0|100003.0|
|       1.0|200021.0|
|       0.0|    78.0|
|       0.0|100007.0|
|       1.0|100042.0|
|       0.0|100035.0|
+----------+--------+
only showing top 20 rows
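
The userId values print as `148.0` rather than `148` even though the schema declares an integer column. A likely reason is that `iterrows()` returns each row as a single pandas Series, which upcasts mixed integer and float columns to float; Spark's `union` then widens the integer column to double. The small pandas-only sketch below illustrates the upcast on made-up data (the frame `pdf` is an assumption for illustration, not the notebook's data):

```python
import pandas as pd

# Toy frame with one integer and one float column, mimicking userId and length_sum.
pdf = pd.DataFrame({"userId": [148], "length_sum": [76207.7694]})

# iterrows() packs each row into one Series, so the integer is upcast to float.
_, row = next(pdf.iterrows())
user_id_from_iterrows = row["userId"]

# itertuples() reads each column separately and keeps the integer type.
user_id_from_itertuples = next(pdf.itertuples(index=False)).userId

print(type(user_id_from_iterrows).__name__, type(user_id_from_itertuples).__name__)
```

Casting with `int(...)` before calling `lit` would be one way to keep the column integral.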



In [16]:
# Write to parquet file for web app
df_predictions.write.parquet("out/predictions.parquet")

In [17]:
# Output the notebook to an html file
from subprocess import call
call(['python', '-m', 'nbconvert', 'web.ipynb'])

0
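
The `0` above is the exit code returned by `call`. A slightly more defensive variant is sketched below; `nbconvert_command` and `export_notebook` are hypothetical helper names. It assumes that running nbconvert with the interpreter from `sys.executable` is desired, and it passes the output format explicitly with nbconvert's `--to` option.

```python
import subprocess
import sys


def nbconvert_command(notebook, to="html"):
    """Hypothetical helper: build the nbconvert command line."""
    # sys.executable points at the interpreter running this code,
    # so the conversion uses the same environment.
    return [sys.executable, "-m", "nbconvert", "--to", to, notebook]


def export_notebook(notebook, to="html"):
    """Run nbconvert and raise CalledProcessError on a non-zero exit code."""
    subprocess.run(nbconvert_command(notebook, to), check=True)
```

With this, `export_notebook("web.ipynb")` would fail loudly instead of silently returning a non-zero code.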