# Sparkify - data preparation large dataset

#### This notebook shows the functions and steps taken to load and prepare the 12GB dataset on AWS EMR.

In [1]:
## LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkFiles
from pyspark.sql import SparkSession

from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import TimestampType

from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, desc, countDistinct, first
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_date, year, month, dayofmonth, dayofweek, hour, date_format, substring,datediff
from pyspark.sql.functions import sum as spark_sum, avg as spark_avg

import datetime


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1709152527769_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
## Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
## Checking if the session is on
spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f2c530018b0>

In [4]:
## Reading the full sparkify dataset (I downloaded it from Udacity public link and uploaded it in my own bucket)
event_data = "s3://sparkifylargedataset/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [6]:
#loading my functions for data preparation

def remove_missing_userid(df):
    condition = df.userId != ''
    df= df.filter(condition)
    
    return df


# function for churn labeling
def create_churn_label(df):
    ''' This function takes a dataframe and creates churn labels based on the 
    churn definition: Cancellation Confirmation value in 'pages' is considered churn.
    
    Input = Spark dataframe
    
    Output = Spark dataframe with new column 'churn_lab' for each user who churned
    '''
    #labeling all 'pages' with Cancellation Confirmation
    create_churn = udf(lambda x: 1 if x =="Cancellation Confirmation" else 0, IntegerType())
    df = df.withColumn('churn_lab_temp', create_churn('page'))
    
    #extract a list of churn users from the new df
    churn_userid = df.select('userId').where(col('churn_lab_temp')==1).groupby('userId').count()
    churn_userid_list = [row['userId'] for row in churn_userid.collect()]
    
    #new column based on the churn list
    df = df.withColumn('churn_lab', when((df.userId).isin(churn_userid_list), 1).otherwise(0))
    
    #dropping column
    df = df.drop('churn_lab_temp')
    
    return df   


def create_features(df):
    '''This function takes a clean spark dataframe of sparkify records and 
    creates new features for each userId based on different aggregations, then combine all the new feature 
    into a final df called user_df. This user_df is saved into a csv file
  
    Input: df - spark dataframe
    Output: user_df - spark dataframe including one row per userId and new features'''
    
    
    #new features and aggregation by feature
   #allows to aggregate many features in one step
    agg_df1 = df.groupBy('userId').agg(
                             first('gender').alias('gender'), #takes the first occurence of gender for each user
                             countDistinct('artist').alias('unique_artist_count'), #counting the unique artist per userId
                             first('state1').alias('state'), #retrieving state information for each userId
                             spark_sum('length').alias('total_session_length'),
                             spark_avg('length').alias('avg_session_length'),
                             first('churn_lab').alias('churn_label'),
                             countDistinct('song').alias('unique_song_count'),
                             spark_sum('itemInSession').alias('total_items'),
                             spark_avg('itemInSession').alias('avg_items')
       
       
       
   )
   #aggregate by month
    agg_df2 = df.groupBy('userId','month').agg(
                              spark_sum("length").alias("session_length_per_month"),
                              spark_sum('itemInSession').alias('items_per_month'),
                              count('*').alias('count_sessions_per_month')
    )
    
    agg_df2 = agg_df2.groupBy('userId').agg(
                              spark_avg('session_length_per_month').alias('avg_session_length_per_month'),
                              spark_avg('items_per_month').alias('avg_items_per_month'),
                              spark_avg('count_sessions_per_month').alias('avg_count_session_per_month')
    )
    #aggregate by day
    agg_df3 = df.groupBy('userId','ts_todate').agg(
                              spark_sum("length").alias("session_length_per_day"),
                              spark_sum('itemInSession').alias('items_per_day'),
                              count('*').alias('count_sessions_per_day')
    )
    agg_df3 = agg_df3.groupBy('userId').agg(
                              spark_avg('session_length_per_day').alias('avg_session_length_per_day'),
                              spark_avg('items_per_day').alias('avg_items_per_day'),
                              spark_avg('count_sessions_per_day').alias('avg_count_session_per_day')
    )
   
    # aggregate counts per page per user
    page_counts_per_user = df.groupBy('userId', 'page').count()
    agg_df4 = page_counts_per_user.groupBy('userId').pivot('page').sum('count')
    agg_df4 = agg_df4.na.fill(0)



    
    #join all agg_df
    
    user_df = agg_df1.join(agg_df2,['userid']) 
    user_df = user_df.join(agg_df3,['userid'])
    user_df = user_df.join(agg_df4,['userid'])

    return user_df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
print('Here is the first row of the dataset')
df.take(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Here is the first row of the dataset
[Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')]

In [8]:
row = df.count()
print(f'The 12GB dataset has {row} rows')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The 12GB dataset has 26259199 rows

In [9]:
print('Here is the data schema')
print(df.printSchema())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Here is the data schema
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

None

In [10]:
print('Removing rows with missing UserId...')
df = remove_missing_userid(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Removing rows with missing UserId...

In [11]:
row2 = df.count()
print(f'New number of rows:{row2}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

New number of rows:26259199

In [12]:
result_df = df.filter(df["UserId"] == '')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
result_df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [14]:
#feature drop
print('Dropping unnecessary columns...')
cols = ('firstname','lastname','method','auth','level','sessionId','status','userAgent')
df = df.drop(*cols)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dropping unnecessary columns...

In [15]:
print('This is the new schema after dropping the columns')
print(df.printSchema())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

This is the new schema after dropping the columns
root
 |-- artist: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- length: double (nullable = true)
 |-- location: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- song: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)

None

In [16]:
print('Reformating dates and location data...')
    #change format dates
cols = ("ts","registration","ts_ts","registration_ts")
    
df = df.withColumn('ts_ts', (col('ts') / 1000.0).cast(TimestampType())) \
    .withColumn('ts_todate', to_date('ts_ts')) \
    .withColumn('registration_ts', (col('registration') / 1000.0).cast(TimestampType())) \
    .withColumn('registration_todate', to_date('registration_ts')) \
    .drop(*cols)

df = df.withColumn('month', month('ts_todate'))



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reformating dates and location data...

In [17]:
df.take(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(artist='Popol Vuh', gender='M', itemInSession=278, length=524.32934, location='Dallas-Fort Worth-Arlington, TX', page='NextSong', song='Ich mache einen Spiegel - Dream Part 4', userId='1749042', ts_todate=datetime.date(2018, 10, 1), registration_todate=datetime.date(2018, 8, 8), month=10)]

In [18]:
#prepare location data
split_col_location = pyspark.sql.functions.split(df['location'], ', ')
df = df.withColumn('state', split_col_location.getItem(1))\

split_col_state = pyspark.sql.functions.split(df['state'], '-')
df = df.withColumn('state1', split_col_state.getItem(0))
print('Creating churn labels based on Cancellation confirmation for each user...')
df = create_churn_label(df)




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Creating churn labels based on Cancellation confirmation for each user...

In [19]:
#create new features
print('Creating new features ...')

user_df = create_features(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Creating new features ...

In [20]:
print(user_df.printSchema())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- unique_artist_count: long (nullable = false)
 |-- state: string (nullable = true)
 |-- total_session_length: double (nullable = true)
 |-- avg_session_length: double (nullable = true)
 |-- churn_label: integer (nullable = true)
 |-- unique_song_count: long (nullable = false)
 |-- total_items: long (nullable = true)
 |-- avg_items: double (nullable = true)
 |-- avg_session_length_per_month: double (nullable = true)
 |-- avg_items_per_month: double (nullable = true)
 |-- avg_count_session_per_month: double (nullable = true)
 |-- avg_session_length_per_day: double (nullable = true)
 |-- avg_items_per_day: double (nullable = true)
 |-- avg_count_session_per_day: double (nullable = true)
 |-- About: long (nullable = true)
 |-- Add Friend: long (nullable = true)
 |-- Add to Playlist: long (nullable = true)
 |-- Cancel: long (nullable = true)
 |-- Cancellation Confirmation: long (nullable = true)
 |-- Downgr

In [21]:
user_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22278

The dataset to use for model training based on the former 12GB dataset has 22,278 rows, which reflects the number of users in the dataset.

This dataset is written and saved into an S3 bucket to be used for the next steps of the analysis.

In [22]:
user_df.write.json(path = "s3://sparkifylargedataset/user_data_12GB.json", mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
spark.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…