# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [2]:
! pip install pyspark_dist_explore



In [3]:
# import libraries packages needed for the project
import datetime
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, BooleanType, StructType, StructField
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import avg, max, count, when, desc, asc, udf, col, isnan, sort_array, format_string
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, LogisticRegression
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer,VectorAssembler,OneHotEncoder
from pyspark_dist_explore import hist
from pyspark.sql import Window
from pyspark.sql import SQLContext
import matplotlib.pyplot as plt

%matplotlib inline

In [4]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName('Sparkify Project') \
    .getOrCreate()
    


# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [5]:
#load the dataset as asked above
sparkify_event_data = 'mini_sparkify_event_data.json'
df = spark.read.json(sparkify_event_data)
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [12]:
def new_df(df):
    print('Clean the data from NAN values')
    df_cleaned = df.dropna(how='any', subset=['gender'])
    print('Exploratory data analysis of gender data')
    gender_df = df_cleaned.select('userId', 'gender').distinct()\
                     .withColumn('check_male', (df_cleaned.gender == 'M').cast(IntegerType()))\
                     .sort('userId')\
                     .select('userId', 'check_male')                  
    gender_df.show(5)
   
    
    print('Exploratory data analysis of pages data...')
    pages = [row[0] for row in df_cleaned.select('page').distinct().collect()]
    pages.remove('Cancel')
    pages.remove('Cancellation Confirmation')
    windowval = Window.partitionBy('userId')
    page_df = df_cleaned
    for page in pages:
        #replace any spaces in page names to without spaces
        page_df = page_df.withColumn(page.replace(' ', ''), Fsum((df_cleaned.page == page).cast(IntegerType())).over(windowval))
    other_pages = []
    for page in pages:
        other_pages.append(page.replace(' ', ''))
    page_df = page_df.select('userId', *other_pages).dropDuplicates().sort('userId')
    page_df.show(5)
    
    print('Exploratory data analysis of songs count for session...')
    songs_df = df_cleaned.select('userId','sessionId','itemInSession') \
                            .groupby('userId','sessionId') \
                            .agg({'itemInSession':'max'}) \
                            .groupby('userId') \
                            .agg( Fsum('max(itemInSession)'), avg('max(itemInSession)'), count('sessionId') ) \
                            .withColumnRenamed('avg(max(itemInSession))','songs_session_a_v') \
                            .withColumnRenamed('sum(max(itemInSession))', 'songs_c') \
                            .withColumnRenamed('count(sessionId)', 'sessions_c') \
                            .sort('userId')
    songs_df.show(5)
    
    print('Exploratory data analysis of lifetime of user from registration timestamp...')
    lifetime_df = df_cleaned.groupby('userId') \
                  .agg({'registration':'min', 'ts':'max'})\
                  .withColumn('lifetime_in_days', (col('max(ts)')-col('min(registration)'))/1000/60/60/24)\
                  .select('userId', 'lifetime_in_days') \
                  .sort('userId')
    lifetime_df.show(5)
    
    print('Exploratory data analysis of distinct status as columns...')
    st_list = [str(row[0]) for row in df_cleaned.select('status').distinct().collect()]
    st_df = df_cleaned
    for status in st_list:
        st_df = st_df.withColumn('status_'+status, Fsum((df_cleaned.status == status).cast(IntegerType())).over(windowval))
    st_df = st_df.select('userId', 'status_307', 'status_404', 'status_200').distinct().sort('userId')
    st_df.show(5)
    
    print('Exploratory data analysis of level of user, paid user gets 1...')
    lev_df = df_cleaned.select('userId', 'level').distinct() \
                      .withColumn('u_lev_paid', (df_cleaned.level == 'paid').cast(IntegerType())) \
                      .groupby('userId') \
                      .agg({'u_lev_paid':'max'}) \
                      .select('userId', 'max(u_lev_paid)') \
                      .withColumnRenamed('max(u_lev_paid)', 'paid_success') \
                      .sort('userId')
    lev_df.show(5)
    
    print('Exploratory data analysis of label of user, for cancellation conformation level is 1...')
    phase_ev = udf(lambda x: 1 if x=='Cancellation Confirmation' else 0, IntegerType())
    lab_df = df_cleaned.withColumn('phase', phase_ev(df_cleaned['page']))
    lab_df = lab_df.withColumn('label', Fsum('phase').over(windowval))
    lab_df = lab_df.select('userId', 'label').dropDuplicates().sort('userId')
    lab_df.show(2)
    
   
    features_df = lab_df.select('userId', 'label').dropDuplicates().join(gender_df, 'userId').join(lev_df, 'userId') \
        .join(lifetime_df, 'userId').join(page_df, 'userId').join(songs_df, 'userId').join(st_df, 'userId').dropDuplicates() \
        .sort('userId')
    features = list(features_df.columns)
    features.remove('userId')
    features.remove('label')
    print(features)
    return(features_df, features)
features_df, features = new_df(df=df)

Clean the data from NAN values
Exploratory data analysis of gender data
+------+----------+
|userId|check_male|
+------+----------+
|    10|         1|
|   100|         1|
|100001|         0|
|100002|         0|
|100003|         0|
+------+----------+
only showing top 5 rows

Exploratory data analysis of pages data...
+------+---------------+----------+----+---------+----------+------+------------+-----+--------+-------------+---------+--------+--------+----+-------+-----+-------------+
|userId|SubmitDowngrade|ThumbsDown|Home|Downgrade|RollAdvert|Logout|SaveSettings|About|Settings|AddtoPlaylist|AddFriend|NextSong|ThumbsUp|Help|Upgrade|Error|SubmitUpgrade|
+------+---------------+----------+----+---------+----------+------+------------+-----+--------+-------------+---------+--------+--------+----+-------+-----+-------------+
|    10|              0|         4|  30|        7|         1|    11|           1|    2|       7|            9|       12|     673|      37|   1|      0|    0|       

In [13]:
#features_df, features = new_df(df=df)

In [6]:
import os
from flask import Flask
from flask import render_template, request, jsonify
app = Flask(__name__)
@app.route("/sparkify_pr")
def upload():
    return render_template("sparkify_pr.html")
    
@app.route('/new_df', methods=['POST'])
def new_df(df):
    print('Clean the data from NAN values')
    df_cleaned = df.dropna(how='any', subset=['gender'])
    print('Exploratory data analysis of gender data')
    gender_df = df_cleaned.select('userId', 'gender').distinct()\
                     .withColumn('check_male', (df_cleaned.gender == 'M').cast(IntegerType()))\
                     .sort('userId')\
                     .select('userId', 'check_male')                  
    gender_df.show(5)
   
    
    print('Exploratory data analysis of pages data...')
    pages = [row[0] for row in df_cleaned.select('page').distinct().collect()]
    pages.remove('Cancel')
    pages.remove('Cancellation Confirmation')
    windowval = Window.partitionBy('userId')
    page_df = df_cleaned
    for page in pages:
        #replace any spaces in page names to without spaces
        page_df = page_df.withColumn(page.replace(' ', ''), Fsum((df_cleaned.page == page).cast(IntegerType())).over(windowval))
    other_pages = []
    for page in pages:
        other_pages.append(page.replace(' ', ''))
    page_df = page_df.select('userId', *other_pages).dropDuplicates().sort('userId')
    page_df.show(5)
    
    print('Exploratory data analysis of songs count for session...')
    songs_df = df_cleaned.select('userId','sessionId','itemInSession') \
                            .groupby('userId','sessionId') \
                            .agg({'itemInSession':'max'}) \
                            .groupby('userId') \
                            .agg( Fsum('max(itemInSession)'), avg('max(itemInSession)'), count('sessionId') ) \
                            .withColumnRenamed('avg(max(itemInSession))','songs_session_a_v') \
                            .withColumnRenamed('sum(max(itemInSession))', 'songs_c') \
                            .withColumnRenamed('count(sessionId)', 'sessions_c') \
                            .sort('userId')
    songs_df.show(5)
    
    print('Exploratory data analysis of lifetime of user from registration timestamp...')
    lifetime_df = df_cleaned.groupby('userId') \
                  .agg({'registration':'min', 'ts':'max'})\
                  .withColumn('lifetime_in_days', (col('max(ts)')-col('min(registration)'))/1000/60/60/24)\
                  .select('userId', 'lifetime_in_days') \
                  .sort('userId')
    lifetime_df.show(5)
    
    print('Exploratory data analysis of distinct status as columns...')
    st_list = [str(row[0]) for row in df_cleaned.select('status').distinct().collect()]
    st_df = df_cleaned
    for status in st_list:
        st_df = st_df.withColumn('status_'+status, Fsum((df_cleaned.status == status).cast(IntegerType())).over(windowval))
    st_df = st_df.select('userId', 'status_307', 'status_404', 'status_200').distinct().sort('userId')
    st_df.show(5)
    
    print('Exploratory data analysis of level of user, paid user gets 1...')
    lev_df = df_cleaned.select('userId', 'level').distinct() \
                      .withColumn('u_lev_paid', (df_cleaned.level == 'paid').cast(IntegerType())) \
                      .groupby('userId') \
                      .agg({'u_lev_paid':'max'}) \
                      .select('userId', 'max(u_lev_paid)') \
                      .withColumnRenamed('max(u_lev_paid)', 'paid_success') \
                      .sort('userId')
    lev_df.show(5)
    
    print('Exploratory data analysis of label of user, for cancellation conformation level is 1...')
    phase_ev = udf(lambda x: 1 if x=='Cancellation Confirmation' else 0, IntegerType())
    lab_df = df_cleaned.withColumn('phase', phase_ev(df_cleaned['page']))
    lab_df = lab_df.withColumn('label', Fsum('phase').over(windowval))
    lab_df = lab_df.select('userId', 'label').dropDuplicates().sort('userId')
    lab_df.show(2)
    
   
    features_df = lab_df.select('userId', 'label').dropDuplicates().join(gender_df, 'userId').join(lev_df, 'userId') \
        .join(lifetime_df, 'userId').join(page_df, 'userId').join(songs_df, 'userId').join(st_df, 'userId').dropDuplicates() \
        .sort('userId')
    features = list(features_df.columns)
    features.remove('userId')
    features.remove('label')
    print(features)
    return(features_df, features)
    features_df, features = new_df(df=df)
#if __name__ == "__main__":
    #app.run(port=4010, debug=True)

In [8]:
#if __name__ == "__main__":
    #app.run(port=4018, debug=True)