# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

# Importing Libraries and Creating Spark Session

In [1]:
# import libraries
import pyspark
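from pyspark.sql import SparkSession, Window
# NOTE: sum, max, min and round below shadow the Python built-ins of the same name,
# so in this notebook they always refer to the Spark column functions.
# percentile and date_part require Spark 3.5 or later.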
from pyspark.sql.functions import (sum, col, when, lit, round, from_unixtime, split, 
                                   concat, regexp_replace, percentile,
                                   count, countDistinct, max, min, avg, 
                                   date_format, date_part, date_trunc, to_timestamp
                                  )

import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


In [2]:
# Point Spark at the bundled JDK (must run before the SparkSession is created)
import os
os.environ['JAVA_HOME'] = '/workspace/jdk'
os.environ['PATH'] = f"{os.environ['JAVA_HOME']}/bin:{os.environ['PATH']}"

In [3]:
# create a Spark session
spark = SparkSession.builder.appName("Sparkify Project").getOrCreate()

/opt/venv/lib/python3.10/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/16 02:21:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def display_pandas_dataframe(df_filtered, number_limit = 5):
    df_limited = df_filtered.limit(number_limit)
    return df_limited.toPandas()

# Load Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [None]:
##reading json
df = spark.read.json("mini_sparkify_event_data.json")

In [None]:
##displaying a sample of data
df.limit(3).show()

In [None]:
# convert a small sample to pandas for a nicer display
display_pandas_dataframe(df)

In [None]:
# print the schema
df.printSchema()

The DataFrame has 18 columns: 12 string columns and 6 numerical columns.

In [None]:
# count the rows
df.count()

There are about 286.5K rows in the DataFrame.

# Clean Dataset

1. Looking for null values

In [None]:
#list of columns
list_of_columns = df.columns

#creating dict to store null values
dict_null_values = {col_name: df.filter(col(f'{col_name}').isNull()).count() for col_name in list_of_columns}
print(dict_null_values)

There are a few null values in the dataset, mainly in the artist, firstName, lastName, gender, length, location, registration, song and userAgent columns. Let's check some rows containing these null records.

In [None]:
display_pandas_dataframe(df.filter(col('artist').isNull()))

In [None]:
(df.withColumn('is_none_artist', 
               when(col('artist').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
   .withColumn('is_none_song', 
               when(col('song').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
   .withColumn('is_none_length', 
               when(col('length').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
).groupBy('is_none_artist', 'is_none_song', 'is_none_length').count().show()

The null values in artist, song and length appear to correspond to events that do not involve playing a song. So, we will fill the null artist and song values with a placeholder (no_music_info) that states this.

In [None]:
df = (df
      .withColumn('artist', when(col('artist').isNull(), lit('no_music_info')).otherwise(col('artist')))
      .withColumn('song', when(col('song').isNull(), lit('no_music_info')).otherwise(col('song')))
     )

In [None]:
display_pandas_dataframe(df.filter(col('location').isNull()))

Null values in firstName, lastName, location, gender, registration and userAgent appear to be related to Logged Out sessions.

In [None]:
(df.withColumn('is_none_gender', 
               when(col('gender').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
   .withColumn('is_none_location', 
               when(col('location').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
   .withColumn('is_none_registration', 
               when(col('registration').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
   .withColumn('is_none_userAgent', 
               when(col('userAgent').isNull(), lit('Null value'))
               .otherwise(lit('Not null value'))
              )
).groupBy('auth',
          'is_none_gender', 'is_none_location', 
          'is_none_registration', 'is_none_userAgent').count().show()

Since we cannot know which user is accessing the app in Logged Out or Guest sessions, these interactions are not helpful for the churn analysis. They also represent a small portion of the dataset. For these reasons, these records will be dropped.

In [None]:
df = df.filter(col('auth').isin(['Logged In', 'Cancelled']))

In [None]:
#list of columns
list_of_columns = df.columns

#creating dict to store null values
dict_null_values = {col_name: df.filter(col(f'{col_name}').isNull()).count() for col_name in list_of_columns}
print(dict_null_values)

Now there are no null values in the dataset except in the length column, which is null for events with no song/artist information.

2. Checking Data Types

In [None]:
display_pandas_dataframe(df).dtypes

There is only one problem with the dtypes: the datetime information is stored as epoch milliseconds (long integers). Both registration and ts should be in a datetime format. Let's do this conversion.
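Both columns hold Unix epoch timestamps in milliseconds, which is why the code divides by 1000 before calling `from_unixtime`. The plain-Python sketch below reproduces that arithmetic outside Spark; the helper `epoch_ms_to_str` and the example value are hypothetical, and the sketch formats in UTC, whereas Spark's `from_unixtime` uses the session time zone.

```python
from datetime import datetime, timezone


def epoch_ms_to_str(epoch_ms: int) -> str:
    """Convert epoch milliseconds to a 'yyyy-MM-dd HH:mm:ss' string in UTC (hypothetical helper)."""
    # Drop the millisecond part: from_unixtime works with whole seconds
    seconds = epoch_ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(epoch_ms_to_str(1538352117000))  # 2018-10-01 00:01:57
```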

In [None]:
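# from_unixtime returns a formatted string, so wrap it in to_timestamp to get a real timestamp column
df = (df.withColumn('registration_datetime', 
                    to_timestamp(from_unixtime(col('registration')/1000, 
                                               "yyyy-MM-dd HH:mm:ss"))
                   )
         .withColumn('ts_datetime', 
                    to_timestamp(from_unixtime(col('ts')/1000, 
                                               "yyyy-MM-dd HH:mm:ss"))
                   )
     )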

In [None]:
# compare the raw epoch values with the converted columns
df.select('registration_datetime', 'registration', 'ts', 'ts_datetime').limit(2).show()

In [None]:
df = (df
      .withColumn('ts', col('ts_datetime'))
      .withColumn('registration', col('registration_datetime'))
      .drop('registration_datetime', 'ts_datetime')
     )

In [None]:
display_pandas_dataframe(df).dtypes

In [None]:
display_pandas_dataframe(df.groupBy('userAgent').count().orderBy(col('count').desc()), 
                         number_limit = 100)

3. Reshaping columns that are not useful as they are: firstName and lastName, location, userAgent

In [None]:
# location looks like "City, ST"; splitting on ", " avoids a leading space in state
df = (df.withColumn('city', split(col('location'), ", ").getItem(0))
        .withColumn('state', split(col('location'), ", ").getItem(1))
        .drop('location')
     )

In [None]:
display_pandas_dataframe(df.groupBy('city').count())

In [None]:
df = (df.withColumn('fullName', concat(col('firstName'), lit(" "), col('lastName')))
      .drop('firstName', 'lastName')
     )

In [None]:
# Collapse the raw userAgent string into a coarse operating-system bucket.
# Order matters: the first matching rule wins (e.g. iPhone agents contain both "Mobile" and "Mac").
df = (df.withColumn('new_userAgent', regexp_replace(col('userAgent'), "\"|\'", ""))
        .withColumn('new_userAgent', regexp_replace(col('new_userAgent'), "Mozilla/5.0", ""))
        .withColumn('operating_system', when(col('new_userAgent').like('%Windows%'), lit('Windows'))
                                        .when(col('new_userAgent').like('%Ubuntu%'), lit('Ubuntu/Linux'))
                                        .when(col('new_userAgent').like('%Linux%'), lit('Ubuntu/Linux'))
                                        .when(col('new_userAgent').like('%Mobi%'), lit('Mobi'))
                                        .when(col('new_userAgent').like('%Mac%'), lit('Mac'))
                   )
        .withColumn('userAgent', col('operating_system'))
        .drop('new_userAgent', 'operating_system')
     )
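Because the `when` chain returns the first matching rule, the order of the checks decides ambiguous cases: an iPhone agent contains both "Mobile" and "Mac OS X" and lands in Mobi only because that rule comes first, and any agent containing "Linux" (Android agents, for example) would land in Ubuntu/Linux. A plain-Python sketch of the same first-match logic; the helper name and the sample agent string are hypothetical.

```python
from typing import Optional

# Same rule order as the when(...) chain: the first matching token wins
OS_RULES = [
    ("Windows", "Windows"),
    ("Ubuntu", "Ubuntu/Linux"),
    ("Linux", "Ubuntu/Linux"),
    ("Mobi", "Mobi"),
    ("Mac", "Mac"),
]


def classify_user_agent(user_agent: Optional[str]) -> Optional[str]:
    """Map a raw userAgent string to an OS bucket (hypothetical helper mirroring the Spark code)."""
    if user_agent is None:
        return None
    for token, label in OS_RULES:
        # LIKE '%token%' is a case-sensitive substring test
        if token in user_agent:
            return label
    # The Spark code has no .otherwise(...), so unmatched agents stay null
    return None


iphone = ("Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 "
          "(KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53")
print(classify_user_agent(iphone))  # Mobi
```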

In [None]:
display_pandas_dataframe(df)

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

In [None]:
# summary statistics of the numerical columns.
# registration and ts are not included: although they were stored as numbers,
# they represent datetimes (and have already been converted above).
df.select('itemInSession', 'length', 'sessionId', 'status').describe().show()

The itemInSession values show sessions ranging from a single interaction to sessions with about a thousand interactions.

The song length has a mean of 249 s and a high dispersion, with a standard deviation of 100 s. There are also a few songs shorter than 1 second, which may indicate a bug or a wrong value. A length of 1321 also seems unrealistic.

sessionId ranges from 1 to 2474 and, as expected, most records have a status of 200.

Since the minimum and maximum of length look strange, let's look at its distribution.

In [None]:
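# distplot is deprecated in recent seaborn releases; histplot with kde=True replaces it
sns.histplot(data=df.select('length').toPandas().dropna(), x='length', kde=True)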

In [None]:
df.select(round(percentile(col('length'), 1.0), 2).alias('max'),
          round(percentile(col('length'), 0.999), 2).alias('p99_9'),
          round(percentile(col('length'), 0.99), 2).alias('p99'),
          round(percentile(col('length'), 0.95), 2).alias('p95'),
          round(percentile(col('length'), 0.90), 2).alias('p90'),
          round(percentile(col('length'), 0.50), 2).alias('p50')
         ).show()

As expected, the right-skewed, long-tailed distribution makes the outliers clear. The 99th percentile is a length of 591, far from the maximum of 3024.
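To see how a single extreme value affects these statistics, here is a plain-Python sketch of an exact percentile with linear interpolation between the two closest ranks. I believe this matches the convention of Spark's `percentile` (and NumPy's default), but treat that as an assumption; the helper name and sample values are hypothetical.

```python
import math


def percentile_linear(values, p):
    """Exact percentile of `values` at fraction p in [0, 1], interpolating linearly between ranks."""
    if not values:
        raise ValueError("values must be non-empty")
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be in [0, 1]")
    data = sorted(values)
    pos = p * (len(data) - 1)   # fractional rank
    lower, upper = math.floor(pos), math.ceil(pos)
    frac = pos - lower
    return data[lower] + (data[upper] - data[lower]) * frac


lengths = [200, 220, 240, 250, 3000]   # one extreme value
print(percentile_linear(lengths, 0.5))  # 240 (the median ignores the outlier)
print(percentile_linear(lengths, 1.0))  # 3000 (the max is the outlier itself)
```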

In [None]:
(df.filter(col('artist')!='no_music_info')
   .groupBy('artist')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

No single artist or song concentrates a large share of the interactions in the app. However, a few artists have more plays than all the others. The top two, Kings Of Leon and Coldplay, together account for more than 1.5% of the songs played.

In [None]:
(df.groupBy('gender')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

The audience is well balanced between females and males.

In [None]:
(df.groupBy('level')
   .agg(count(col('userId')).alias('count'),
        countDistinct(col('userId')).alias('users')
       )
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('total_users', sum(col('users')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
   .withColumn('percent_users', round(col('users')/col('total_users'), 4))
).show()

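More than 75% of interactions come from paid accounts. However, this doesn't mean that there are more paid accounts in the app; paid users are simply the more active audience.
In fact, more than 54% of users are on free accounts. Note that a user who switched plans is counted under both levels, so total_users can exceed the number of distinct users.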
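Why per-level user counts can add up to more than the number of users: a user who moved from free to paid has events under both levels, so `countDistinct('userId')` counts them in each group. A toy sketch with hypothetical events:

```python
from collections import defaultdict

# Hypothetical (userId, level) events: user "b" switches from free to paid
events = [("a", "free"), ("a", "free"), ("b", "free"), ("b", "paid"), ("c", "paid")]

users_per_level = defaultdict(set)
for user_id, level in events:
    users_per_level[level].add(user_id)

# Adding the per-level distinct counts double-counts users who changed plans
summed = len(users_per_level["free"]) + len(users_per_level["paid"])
distinct_users = len({user_id for user_id, _ in events})
print(summed, distinct_users)  # 4 3
```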

In [None]:
pandas_fd = (df.groupBy('page', 'level').count().orderBy((col('count').desc())).toPandas())
sns.barplot(data=pandas_fd, x='count', y='page', hue='level', palette = 'pastel')

In general, paid and free accounts have similar interactions, but with a lower frequency for free accounts. Also, a few pages are proportionally more common for free accounts, for example, Roll Advert.

In [None]:
(df.groupBy('city', 'state')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

In [None]:
(df.groupBy('userAgent')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

Finally, more than 10% of the interactions come from LA, and more than half of the interactions come from Windows systems.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [None]:
(df.groupBy('auth')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

There are 52 records with auth = Cancelled. Let's take a look at the page information.

In [None]:
(df.groupBy('page', 'auth')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

In [None]:
(df.filter(col('page')=='Cancellation Confirmation')
   .groupBy('level', 'auth')
   .count()
   .orderBy(col('count').desc())
   .withColumn('total', sum(col('count')).over(Window.partitionBy(lit(1))))
   .withColumn('percent', round(col('count')/col('total'), 4))
).show()

All records with auth = Cancelled are on the Cancellation Confirmation page, so this looks like a reliable indicator of a cancelled account.
Let's create the churn columns: is_churn_event flags churn at the event level, while is_user_churn flags churn at the user level.

In [None]:
df_churn = (df.withColumn('is_churn_event', 
                          when(col('page')=='Cancellation Confirmation', lit(1))
                          .otherwise(lit(0))
             )
              .withColumn('is_user_churn', 
                          max(col('is_churn_event')
                             ).over(Window.partitionBy(col('userId')))
             )
           )
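The user-level label is the maximum of the event-level flag over each user's rows, so a single Cancellation Confirmation marks every row of that user. A plain-Python analogue with hypothetical events (`label_churned_users` and `churn_rate` are illustrative helpers, not part of the notebook), including the churn rate as churned users divided by all users:

```python
from collections import defaultdict


def label_churned_users(events):
    """Map each userId to 1 if any of their events is 'Cancellation Confirmation', else 0."""
    labels = defaultdict(int)
    for user_id, page in events:
        is_churn_event = 1 if page == "Cancellation Confirmation" else 0
        # Bitwise OR on 0/1 flags behaves like max() over the user's window partition
        labels[user_id] |= is_churn_event
    return dict(labels)


def churn_rate(labels):
    """Share of users labelled as churned."""
    churned = len([flag for flag in labels.values() if flag == 1])
    return churned / len(labels)


events = [
    ("10", "NextSong"), ("10", "Cancel"), ("10", "Cancellation Confirmation"),
    ("20", "NextSong"), ("20", "Thumbs Up"),
    ("30", "Home"), ("30", "NextSong"),
    ("40", "Downgrade"),
]
labels = label_churned_users(events)
print(labels)              # {'10': 1, '20': 0, '30': 0, '40': 0}
print(churn_rate(labels))  # 0.25
```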

In [None]:
display_pandas_dataframe((df_churn.filter(col('is_churn_event')==1)))

In [None]:
display_pandas_dataframe(df_churn.filter(col('sessionId')==174).filter(col('userId')==125), number_limit=15)

Let's take a look at the Downgrade event.

In [None]:
(df_churn.filter(col('page').isin('Downgrade'))
         .groupBy('is_user_churn', 'is_churn_event', 'page')
         .agg(countDistinct(col('userId')).alias('users'))
).show()

In [None]:
(df_churn.groupBy('is_user_churn')
         .agg(countDistinct(col('userId')).alias('users'))
).show()

Across all users, the churn rate is 23%. Among users who visited the Downgrade page, the rate is 35%. Let's also create a column to flag downgrades.

In [None]:
df_churn = (df_churn.withColumn('is_downgrade_event', 
                          when(col('page')=='Downgrade', lit(1))
                          .otherwise(lit(0))
             )
              .withColumn('times_user_downgrade', 
                          sum(col('is_downgrade_event')
                             ).over(Window.partitionBy(col('userId')))
             )
           )

In [None]:
display_pandas_dataframe(df_churn)

Also, a user can only downgrade their plan if they have had a paid plan at some point. Let's create a column indicating whether the user ever had a paid account.

In [None]:
df_churn = (df_churn.withColumn('had_paid_account', 
                          when(col('level')=='paid', lit(1))
                          .otherwise(lit(0))
             )
              .withColumn('had_paid_account', 
                          max(col('had_paid_account')
                             ).over(Window.partitionBy(col('userId')))
             )
           )

In [None]:
(df_churn.groupBy('is_user_churn', 'had_paid_account')
         .agg(avg(col('times_user_downgrade')).alias('downgrades_average'),
              countDistinct(col('userId')).alias('users'),
              countDistinct(when(col('times_user_downgrade')>0, col('userId'))).alias('users_w_downgrades')
            )
).show()

Based on the results above, downgrades indicate a propensity to cancel the account. However, many customers who downgraded their plans (119 of 129) did not churn. So, for the churn rule, we will use only the cancellation information; the downgrade column is still created and can be used in further analysis.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
(df_churn.groupBy('is_user_churn')
         .agg(round(sum(col('length')), 0).alias('time_listen'),
              countDistinct(col('userId')).alias('users'),
              round((sum(col('length'))/countDistinct(col('userId'))), 0).alias('avg_time_listen'),
             )
).show()

In [None]:
# song and artist nulls were filled with the 'no_music_info' placeholder,
# so keep only song events before counting plays and distinct songs/artists
(df_churn.filter(col('song')!='no_music_info')
         .groupBy('is_user_churn')
         .agg(count(col('song')).alias('play_times'),
              countDistinct(col('song')).alias('distinct_songs'),
              round(count(col('song'))/countDistinct(col('userId')), 0).alias('avg_play_times'),
              round(countDistinct(col('song'))/countDistinct(col('userId')), 0).alias('avg_distinct_songs'),
              round(countDistinct(col('artist'))/countDistinct(col('userId')), 0).alias('avg_distinct_artist'),
             )
).show()

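In general, users who churned have a lower average listening time, and they also play fewer songs. In terms of distinct songs, churned users show a higher average of distinct songs played, which may indicate that they used the app to discover music but replayed songs less often than users who did not churn. Note, however, that avg_distinct_songs divides the group's distinct song count by its number of users; because popular songs overlap across users, this ratio tends to be higher for the smaller churned group, so a per-user average would be a fairer comparison.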
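A toy example of why dividing a group's distinct song count by its number of users can favour the smaller group: a song shared by several users is counted once per group, not once per user. The data and helper names are hypothetical.

```python
from statistics import mean

# Hypothetical plays: (group, userId, song)
plays = [
    ("stayed", "u1", "s1"), ("stayed", "u1", "s2"),
    ("stayed", "u2", "s1"), ("stayed", "u2", "s2"),
    ("churned", "u3", "s1"), ("churned", "u3", "s2"),
]


def group_distinct_per_user(plays, group):
    """Distinct songs in the whole group divided by the group's number of users."""
    rows = [p for p in plays if p[0] == group]
    return len({song for _, _, song in rows}) / len({user for _, user, _ in rows})


def mean_distinct_per_user(plays, group):
    """Average, over users, of each user's own distinct-song count."""
    songs_by_user = {}
    for g, user, song in plays:
        if g == group:
            songs_by_user.setdefault(user, set()).add(song)
    return mean(len(songs) for songs in songs_by_user.values())


print(group_distinct_per_user(plays, "stayed"), group_distinct_per_user(plays, "churned"))  # 1.0 2.0
print(mean_distinct_per_user(plays, "stayed"), mean_distinct_per_user(plays, "churned"))    # 2 2
```

Every user here plays two distinct songs, yet the group-level ratio makes the churned group look twice as varied.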

In [None]:
(df_churn.groupBy('is_user_churn', 'userId')
         .agg(countDistinct(col('page')).alias('pages_interaction'))
         .groupBy('is_user_churn')
         .agg(avg(col('pages_interaction')))
).show()

In [None]:
(df_churn.filter(col('page')!='NextSong')
         .groupBy('is_user_churn', 'userId', 'page')
         .agg(count(col('page')).alias('pages_interaction'))
         .groupBy('is_user_churn', 'page')
         .agg(sum(col('pages_interaction')).alias('pages_interaction'))
         .withColumn('total_interactions', 
                     sum(col('pages_interaction')).over(Window.partitionBy(col('is_user_churn')))
                    )
         .withColumn('percent', col('pages_interaction')/col('total_interactions'))
         .groupBy('page')
         .agg(round(sum(when(col('is_user_churn')==1, col('percent')))*100, 2).alias('churn_user_percent'),
              round(sum(when(col('is_user_churn')==0, col('percent')))*100, 2).alias('not_churn_user_percent')
             )
         .orderBy(col('churn_user_percent').desc())
).show()

Based on the table above, both groups of users interact with pages in similar proportions, except for Roll Advert and Thumbs Down, which seem to be more frequent among churned users. On the other hand, users who did not churn have a larger share of Thumbs Up and Add Friend interactions.

In [None]:
(df_churn.groupBy('is_user_churn', 'userId')
         .agg(countDistinct(date_format(col('ts'), 'yyyy-MM-dd')).alias('days_interacting'))
         .groupBy('is_user_churn')
         .agg(avg(col('days_interacting')))
).show()

In [None]:
(df_churn.groupBy('is_user_churn', 'userId')
         .agg(countDistinct(date_part(lit('WEEK'), col('ts'))).alias('weeks_interacting'))
         .groupBy('is_user_churn')
         .agg(avg(col('weeks_interacting')))
).show()

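Users who did not churn interacted on more distinct days and in more distinct weeks than users who churned. This is partly expected, since churned users stop generating events once they cancel.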
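Counting distinct active days and weeks per user can be sketched in plain Python as below. One assumption to flag: Spark's `date_part('WEEK', ...)` returns the ISO 8601 week number without the year, so this sketch keys weeks by (ISO year, ISO week) to stay correct across a year boundary. The helper name and dates are hypothetical.

```python
from datetime import date


def activity_span(event_dates):
    """Return (distinct active days, distinct ISO weeks) for one user's event dates."""
    days = set(event_dates)
    # (ISO year, ISO week) pairs, so week 1 of different years is not merged
    weeks = {d.isocalendar()[:2] for d in days}
    return len(days), len(weeks)


# 2018-10-01 (Mon) and 2018-10-07 (Sun) share an ISO week; 2018-10-08 starts the next one
print(activity_span([date(2018, 10, 1), date(2018, 10, 7), date(2018, 10, 8), date(2018, 10, 8)]))  # (3, 2)
```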

In [None]:
(df_churn.groupBy('is_user_churn', 'gender')
         .agg(countDistinct(col('userId')).alias('users'))
         .withColumn('user_per_gender', sum(col('users')).over(Window.partitionBy(col('gender'))))
         .withColumn('percent', col('users')/col('user_per_gender'))
).show()

There seems to be no relationship between churn and gender.

In [None]:
(df_churn.withColumn('month_cohort', date_trunc('month', col('registration')))
 .groupBy('month_cohort')
 .agg((countDistinct(when(col('is_user_churn')==1, col('userId')))/countDistinct(col('userId'))).alias('churn_rate'),
      countDistinct(col('userId')).alias('users')
     )
 .orderBy(col('month_cohort'))
).show()

Although some monthly cohorts have a high churn rate, this is more likely due to the small number of users in those cohorts than to genuinely worse churn.
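One way to quantify the small-cohort effect is the standard error of a proportion, sqrt(p(1 - p) / n), a normal approximation that is itself rough for very small n. The cohort sizes below are hypothetical: the same 50% churn rate carries five times the uncertainty with 4 users as with 100.

```python
import math


def churn_rate_with_se(churned: int, users: int):
    """Return (churn rate, standard error) for a cohort using sqrt(p * (1 - p) / n)."""
    if users <= 0:
        raise ValueError("users must be positive")
    rate = churned / users
    se = math.sqrt(rate * (1 - rate) / users)
    return rate, se


print(churn_rate_with_se(2, 4))     # (0.5, 0.25)
print(churn_rate_with_se(50, 100))  # rate 0.5, standard error about 0.05
```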

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [None]:
# first, remove the cancellation events: otherwise they would show up as a page feature that directly encodes the label
df_churn = df_churn.filter(col('is_churn_event')!=1)

In [None]:
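def get_user_info(df, expression, col_names):
    """Group df by userId and apply `expression` (e.g. count, sum, max) to each column in col_names.

    Each output column is named '<function name>_<column name>', e.g. 'countDistinct_song'.
    """
    agregacoes = []
    for col_name in col_names:
        # build one aggregation expression per column
        agregacoes.append(expression(col(col_name)).alias(f"{expression.__name__}_{col_name}"))

    # group by user and apply all aggregations in a single pass
    df_agg = df.groupBy('userId').agg(*agregacoes)
    return df_agg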

### 1. Song/Artist Features

In [None]:
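df_churn = (df_churn.withColumn('date_event', date_format(col('ts'), 'yyyy-MM-dd'))
                    # 'M' in date_part means MINUTE, not hour; use a (date, hour) slot instead
                    # so that countDistinct(hour_event) counts the distinct hours with activity
                    .withColumn('hour_event', date_format(col('ts'), 'yyyy-MM-dd HH'))
           )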

In [None]:
only_song_events = (df_churn.filter(col('song')!='no_music_info'))
no_song_events = (df_churn.filter(col('song')=='no_music_info'))

unique_song_artist = get_user_info(only_song_events, countDistinct, ['artist', 'song'])
song = get_user_info(only_song_events, count, ['song'])
no_song = get_user_info(no_song_events, count, ['song']).withColumnRenamed('count_song', 'no_song_events')
unique_song_length = get_user_info(only_song_events, sum, ['length'])
days_listining = get_user_info(only_song_events, countDistinct, ['date_event', 'hour_event'])

In [None]:
##agregating all songs info:
song_features_agg = (unique_song_artist
                     .join(song, on =['userId'], how = 'full')
                     .join(no_song, on =['userId'], how = 'full')
                     .join(unique_song_length, on =['userId'], how = 'full')
                     .join(days_listining, on =['userId'], how = 'full')
                     .withColumn('songs_per_day', col('count_song')/col('countDistinct_date_event'))
                     .withColumn('songs_per_hour', col('count_song')/col('countDistinct_hour_event'))
                     .withColumn('length_per_day', col('sum_length')/col('countDistinct_date_event'))
                     .withColumn('minutes_per_day', col('countDistinct_hour_event')/col('countDistinct_date_event'))
                    )
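The per-day ratios divide a user's totals by their number of distinct active dates. A plain-Python sketch of songs_per_day and length_per_day for song events only; the input shape and the helper name `per_day_ratios` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def per_day_ratios(plays):
    """plays: (userId, 'yyyy-MM-dd HH:mm:ss', length) tuples for song events only.

    Returns {userId: (songs_per_day, length_per_day)} using distinct active dates as the denominator.
    """
    songs = defaultdict(int)
    length = defaultdict(float)
    days = defaultdict(set)
    for user_id, ts, song_length in plays:
        songs[user_id] += 1
        length[user_id] += song_length
        days[user_id].add(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date())
    return {u: (songs[u] / len(days[u]), length[u] / len(days[u])) for u in songs}


plays = [
    ("u1", "2018-10-01 10:00:00", 200.0),
    ("u1", "2018-10-01 11:00:00", 250.0),
    ("u1", "2018-10-02 09:00:00", 100.0),
    ("u2", "2018-10-05 08:00:00", 180.0),
]
print(per_day_ratios(plays))  # {'u1': (1.5, 275.0), 'u2': (1.0, 180.0)}
```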

In [None]:
display_pandas_dataframe(song_features_agg, number_limit = 15)

### 2. Interaction in App Features

In [None]:
df_sessions_info = (df.groupBy('userId', 'sessionId')
                      .agg(min(col('ts')).alias('start_date_session'),
                           max(col('ts')).alias('end_date_sessions')
                          )
                      .withColumn('start_date_session', to_timestamp(col('start_date_session')))
                      .withColumn('end_date_sessions', to_timestamp(col('end_date_sessions')))
                      .withColumn('session_duration', 
                                  col('end_date_sessions')-col('start_date_session'))
                    .withColumn("session_duration_minutes",
                                (col("end_date_sessions").cast("long") - col("start_date_session").cast("long")) / 60
                                )
                   )

df_user_interation_info = (df.groupBy('userId')
                      .agg(min(col('ts')).alias('start_date'),
                           max(col('ts')).alias('end_date')
                          )
                      .withColumn('start_date', to_timestamp(col('start_date')))
                      .withColumn('end_date', to_timestamp(col('end_date')))
                    .withColumn("days_between_sessions",
                                (col("end_date").cast("long") - col("start_date").cast("long")) / (60*60*24)
                                )
                      .drop('start_date', 'end_date')
                   )

In [None]:
unique_sessions = get_user_info(df_churn, countDistinct, ['sessionId'])
unique_logs = get_user_info(df_churn, count, ['userId'])
sessions_duration = get_user_info(df_sessions_info, sum, ['session_duration_minutes'])
user_downgrade = get_user_info(df_churn, avg, ['times_user_downgrade'])

session_features_agg = (unique_sessions
                        .join(unique_logs, on =['userId'], how = 'full')
                        .join(sessions_duration, on =['userId'], how = 'full')
                        .join(df_user_interation_info, on =['userId'], how = 'full')
                        .join(user_downgrade, on =['userId'], how = 'full')
                        .withColumn('session_frequency', col('countDistinct_sessionId')/col('days_between_sessions'))
                        .withColumnRenamed('count_userId', 'count_logs')
                        .withColumn('logs_per_sessions', col('count_logs')/col('countDistinct_sessionId'))
                       )
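The session features boil down to a few datetime differences. The sketch below, with a hypothetical input shape of (sessionId, start, end) tuples for one user, computes the total session minutes, the span in days between the first and last activity (days_between_sessions) and session_frequency. It returns None when the span is zero; as I understand it, Spark yields null for a division by zero when ANSI mode is off (those nulls are later filled with 0), but raises an error when ANSI mode is on, which is the default in Spark 4.0.

```python
from datetime import datetime
from typing import List, Optional, Tuple

Session = Tuple[int, datetime, datetime]  # (sessionId, start, end): hypothetical input shape


def session_features(sessions: List[Session]) -> Tuple[float, float, Optional[float]]:
    """Return (total session minutes, days between first and last activity, sessions per day)."""
    total_minutes = 0.0
    for _, start, end in sessions:
        total_minutes += (end - start).total_seconds() / 60
    first_start = sorted(start for _, start, _ in sessions)[0]
    last_end = sorted(end for _, _, end in sessions)[-1]
    span_days = (last_end - first_start).total_seconds() / (60 * 60 * 24)
    distinct_sessions = len({session_id for session_id, _, _ in sessions})
    # Guard the zero-length span instead of dividing by zero
    frequency = distinct_sessions / span_days if span_days > 0 else None
    return total_minutes, span_days, frequency


sessions = [
    (1, datetime(2018, 10, 1, 10, 0), datetime(2018, 10, 1, 10, 30)),
    (2, datetime(2018, 10, 3, 9, 0), datetime(2018, 10, 3, 10, 0)),
]
print(session_features(sessions))  # (90.0, 2.0, 1.0)
```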

In [None]:
display_pandas_dataframe(session_features_agg, number_limit = 15)

### 3. User Features

In [None]:
df_churn = (df_churn
            .withColumn('gender', when(col('gender')=='F', lit(1)).otherwise(0))
            .withColumn('userAgent_Mac', when(col('userAgent')=='Mac', lit(1)).otherwise(0))
            .withColumn('userAgent_Ubuntu_Linux', when(col('userAgent')=='Ubuntu/Linux', lit(1)).otherwise(0))
            .withColumn('userAgent_Mobi', when(col('userAgent')=='Mobi', lit(1)).otherwise(0))
            .withColumn('userAgent_Windows', when(col('userAgent')=='Windows', lit(1)).otherwise(0))
           )


user_features_agg = get_user_info(df_churn, max, ['gender', 'userAgent_Mac', 'userAgent_Ubuntu_Linux', 
                                                'userAgent_Mobi', 'userAgent_Windows', 'had_paid_account'
                                               ]
                                 )

### 4. Pages Features

In [None]:
pivot_pages = (df_churn.groupBy("userId")
               .pivot("page")
               .count()
               .fillna(0)
              )

new_column_names = [col_name.replace(" ", "_") for col_name in pivot_pages.columns]
pivot_pages = pivot_pages.toDF(*new_column_names)

pages_features_agg = pivot_pages
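`groupBy('userId').pivot('page').count()` produces one column per distinct page, with null where a user never visited that page, which `fillna(0)` turns into 0. A plain-Python sketch of that reshaping, including the space-to-underscore renaming; the helper is hypothetical, and it sorts the page names, which I believe matches Spark's behaviour when no pivot values are given.

```python
from collections import Counter, defaultdict


def pivot_page_counts(events):
    """events: (userId, page) pairs. Returns (column names, {userId: counts in column order})."""
    counts = defaultdict(Counter)
    for user_id, page in events:
        counts[user_id][page] += 1
    pages = sorted({page for _, page in events})
    columns = ["userId"] + [page.replace(" ", "_") for page in pages]
    # Counter returns 0 for pages a user never visited, like fillna(0)
    rows = {user_id: [c[page] for page in pages] for user_id, c in counts.items()}
    return columns, rows


columns, rows = pivot_page_counts([("1", "NextSong"), ("1", "Thumbs Up"), ("2", "NextSong"), ("1", "NextSong")])
print(columns)  # ['userId', 'NextSong', 'Thumbs_Up']
print(rows)     # {'1': [2, 1], '2': [1, 0]}
```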

### 5. Getting All Features Together

In [None]:
all_features_agg = (song_features_agg
                    .join(session_features_agg, on = ['userId'], how = 'full')
                    .join(user_features_agg, on = ['userId'], how = 'full')
                    .join(pages_features_agg, on = ['userId'], how = 'full')
                   )

In [None]:
display_pandas_dataframe(all_features_agg, 20)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
##defining the label 
y_label = df_churn.select('userId', 'is_user_churn').dropDuplicates()

In [None]:
# userId is a key, not a feature, so it is excluded from the count
print(f'We will use {len(all_features_agg.columns) - 1} features to model a churn prediction.')

In [None]:
y_label.groupBy('is_user_churn').count().show()

### 5.1 Correlation Analysis

In [None]:
corr_df = all_features_agg.toPandas()

# Plot correlation heatmap
plt.figure(figsize = [21, 15])
plt.title('Features Correlation', y=1.05, size=20)
plt.yticks(rotation = 45)

ax = sns.heatmap(corr_df.iloc[:, 1:].corr().round(2), 
                annot=True, 
                square=False,
                cbar=False, 
                linewidth=0.2,
                cmap = 'Blues',
                vmin = -1, 
                vmax = 1)

bottom, top = ax.get_ylim()
ax.set_ylim(bottom + 0.5, top - 0.5)
plt.show()

Many features are strongly correlated with each other, and these will be removed before the modeling step. This also reduces the number of features in the model. The removed features are:


1. NextSong and count_song (highly correlated with all song features).
2. count_logs (highly correlated with all song features and page features).
3. sum_session_duration_minutes (highly correlated with all song features and page features).
4. Downgrade (highly correlated with avg_times_user_downgrade, the number of times a user visited the Downgrade page).
5. sum_length and songs_per_hour (highly correlated with the number of distinct songs played).
6. Cancel: after analysis, this feature made the model overfit, likely because a visit to the Cancel page is closely tied to the churn label.

In [None]:
# drop the correlated / label-leaking features listed above and fill missing aggregates with 0
new_all_features_agg = all_features_agg.drop('NextSong', 'count_song', 'count_logs', 'sum_session_duration_minutes',
                                             'Downgrade', 'songs_per_hour', 'sum_length', 'Cancel'
                                            ).fillna(0)

In [None]:
corr_df = new_all_features_agg.toPandas()

# Plot correlation heatmap
plt.figure(figsize = [21, 15])
plt.title('Features Correlation', y=1.05, size=20)
plt.yticks(rotation = 45)

ax = sns.heatmap(corr_df.iloc[:, 1:].corr().round(2), 
                annot=True, 
                square=False,
                cbar=False, 
                linewidth=0.2,
                cmap = 'Blues',
                vmin = -1, 
                vmax = 1)

bottom, top = ax.get_ylim()
ax.set_ylim(bottom + 0.5, top - 0.5)
plt.show()

### 5.2 Applying Models

In [None]:
##creating dataset 
data = new_all_features_agg.join(y_label, on = ['userId'], how = 'left').withColumnRenamed('is_user_churn', 'label').drop('userId')

In [None]:
data.groupBy('label').count().show()

In [None]:
data.columns

In [None]:
import time
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, LogisticRegression, DecisionTreeClassifier, RandomForestClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
def split_data(data, seed=0):
    """
    Description:
        Splits the data into training and testing subsets using a fixed 70:30 ratio.
    
    Args:
        data (DataFrame): the model dataframe with features.
        seed (int): a seed value for the random number generator.
        
    Returns:
        train (DataFrame): the training subset.
        test (DataFrame): the testing subset.
    """
    
    train, test = data.randomSplit([0.7, 0.3], seed=seed)
    return train, test
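`randomSplit` does not cut the data at exactly 70%. As far as we understand Spark's implementation, each row is assigned to a subset by an independent uniform random draw, so the subset sizes only approximate the weights, while the subsets are always disjoint and reproducible for a fixed seed. The plain-Python sketch below imitates that behaviour on hypothetical row ids; `bernoulli_split` is an illustrative name, not a Spark function.

```python
import random
from math import fsum


def bernoulli_split(rows, weights=(0.7, 0.3), seed=0):
    """Assign each row to exactly one subset with a single uniform draw per row."""
    total = fsum(weights)
    # Cumulative upper bounds of each subset's interval, e.g. (0.7, 1.0)
    bounds = []
    upper = 0.0
    for w in weights:
        upper += w / total
        bounds.append(upper)
    rng = random.Random(seed)
    subsets = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, ub in enumerate(bounds):
            # The last subset also catches any floating-point rounding at the top
            if u < ub or i == len(bounds) - 1:
                subsets[i].append(row)
                break
    return subsets


train_ids, test_ids = bernoulli_split(range(1000), seed=0)
print(len(train_ids), len(test_ids))  # close to, but usually not exactly, 700 and 300
```

With a small user base, the churned users in the test subset can therefore be few, and their exact number varies with the seed.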

In [None]:
def calc_metrics(model, subset):
    """
    Description:
        Computes the evaluation metrics of a fitted model on a given subset.
    
    Args:
        model: the fitted model.
        subset: the testing/validation subset.
        
    Returns:
        metrics_calculated (pandas.DataFrame): the fitted model's metrics.
    """
    
    evaluator_multi = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
    # AUC is computed from the raw scores: the hard 0/1 predictions would only
    # evaluate a single decision threshold
    evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

    predict = model.transform(subset).select('label', 'prediction', 'rawPrediction')

    acc = evaluator_multi.evaluate(predict, {evaluator_multi.metricName: 'accuracy'})
    f1 = evaluator_multi.evaluate(predict, {evaluator_multi.metricName: 'f1'})
    weightedPrecision = evaluator_multi.evaluate(predict, {evaluator_multi.metricName: 'weightedPrecision'})
    weightedRecall = evaluator_multi.evaluate(predict, {evaluator_multi.metricName: 'weightedRecall'})
    auc = evaluator.evaluate(predict)
    
    metrics_calculated = pd.DataFrame(index=['F1', 'accuracy', 'weighted precision', 'weighted recall', 'AUC'],
                                     data={'metrics value': [f1, acc, weightedPrecision, weightedRecall, auc]})
    
    return metrics_calculated
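To make the numbers returned by `calc_metrics` easier to interpret, the plain-Python sketch below re-implements them on a toy example, following our understanding of Spark's definitions: each weighted metric averages the per-class value, weighted by that class's share of the true labels (so weighted recall equals accuracy, and Spark's `f1` metric is the weighted F-measure), and AUC is the probability that a randomly chosen positive is scored above a randomly chosen negative. The last two lines show why AUC is better computed from raw scores than from hard 0/1 predictions, where it collapses to a single threshold, (TPR + TNR) / 2. The labels, scores and function names are hypothetical.

```python
from math import fsum


def weighted_metrics(labels, preds):
    """Per-class precision, recall and F1, weighted by each class's share of the true labels."""
    n = len(labels)
    result = {'weightedPrecision': 0.0, 'weightedRecall': 0.0, 'f1': 0.0}
    for cls in set(labels):
        tp = fsum(1 for y, p in zip(labels, preds) if y == cls and p == cls)
        predicted = fsum(1 for p in preds if p == cls)
        actual = fsum(1 for y in labels if y == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / actual
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = actual / n
        result['weightedPrecision'] += weight * precision
        result['weightedRecall'] += weight * recall
        result['f1'] += weight * f1
    return result


def auc(labels, scores):
    """Share of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = fsum(1.0 if p > q else 0.5 if p == q else 0.0 for p in pos for q in neg)
    return wins / (len(pos) * len(neg))


# Hypothetical labels and churn scores; predictions use a 0.5 threshold
labels = [0, 0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.3]
preds = [1 if s >= 0.5 else 0 for s in scores]

print(weighted_metrics(labels, preds))
print('AUC from scores:     ', auc(labels, scores))
print('AUC from predictions:', auc(labels, preds))
```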

In [None]:
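def stating_process(data, seed):
    """
    Description:
        Starts the ML process: drops the userId column (if present), splits the
        data into training and testing subsets and records the start time.
    
    Args:
        data (DataFrame): the model data.
        seed (int): a seed value for the random number generator.
    
    Returns:
        train (DataFrame): the training subset.
        test (DataFrame): the testing subset.
        df_features (DataFrame): the data with the feature and label columns.
        start (float): the process start time, as returned by time.time().
    """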
    print('Starting Pipeline.')
    start = time.time()
    df_features = data.drop('userId')

    # Split data into train and test
    train, test = split_data(df_features, seed)
    print('Train and Test data created.')
    
    return train, test, df_features, start

def creating_pipeline(df_features, classifier):
    """
    Description:
        Creates the ML pipeline: VectorAssembler, then StandardScaler, then the classifier.
    
    Args:
        df_features (DataFrame): the model data.
        classifier: a Spark ML classifier (estimator).
    
    Returns:
        scaler: the StandardScaler applied to the assembled features.
        assembler: the VectorAssembler that builds the feature vector.
        pipeline: the pipeline with its stages defined.
    """
    
    scaler = StandardScaler(inputCol='num_features', outputCol='features')
    assembler = VectorAssembler(inputCols=(df_features.drop('label')).columns, outputCol='num_features')
    pipeline = Pipeline(stages=[assembler, scaler, classifier])
    print('Create pipeline done.')
    
    return scaler, assembler, pipeline
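Tree-based models are essentially insensitive to this rescaling, but it matters for reading Logistic Regression coefficients. With its default settings (`withMean=False`, `withStd=True`), Spark's `StandardScaler` divides each feature by its sample standard deviation without centering it and, as we understand it, maps a constant feature to 0. The sketch below mimics that on a single column in plain Python; `standard_scale` is a hypothetical helper, not part of Spark.

```python
from math import fsum, sqrt


def standard_scale(column, with_mean=False, with_std=True):
    """Scale one feature column like StandardScaler (sample standard deviation, n - 1)."""
    n = len(column)
    mean = fsum(column) / n
    std = sqrt(fsum((x - mean) ** 2 for x in column) / (n - 1))
    scaled = []
    for x in column:
        value = x - mean if with_mean else x
        if with_std:
            # A zero-variance feature is mapped to 0.0 instead of dividing by zero
            value = value / std if std else 0.0
        scaled.append(value)
    return scaled


print(standard_scale([2.0, 4.0, 6.0]))                  # divided by std = 2
print(standard_scale([2.0, 4.0, 6.0], with_mean=True))  # centred and scaled
```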

In [None]:
def fit_model(data, classifier, seed=0):
    """
    Description:
        Fits the machine learning model and computes metrics.
    
    Args:
        data (DataFrame): the model data with features.
        classifier: a Spark ML classifier (estimator).
        seed (int): a seed value of the random number generator.
        
    Returns:
        model: the fitted model.
        metrics: the model's metrics.
    """
    
    train, test, df_features, start = stating_process(data, seed)

    scaler, assembler, pipeline = creating_pipeline(df_features, classifier)

    # Training model
    model = pipeline.fit(train)
    print('Training done.')

    # Getting metrics
    metrics = calc_metrics(model, test)
    print('Metrics prepared.')

    duration = time.time() - start
    
    print(f'Model fitted. The process took {int(duration)} s.')
    
    return model, metrics

In [None]:
# Plot feature importances
def plot_feature_importance(data, model):
    """
    Description:
        Plots the importance of each feature in the ML model: the coefficients
        for Logistic Regression, the feature importances for tree-based models.
    
    Args:
        data (DataFrame): the model data with features and label.
        model: the fitted pipeline model.
    
    Returns:
        None
    """
    
    # VectorAssembler received the columns in this same order, so position i of
    # the coefficient/importance vector belongs to feature_names[i]
    feature_names = data.drop('label').columns
    classifier_model = model.stages[-1]
    print(f'Getting feature importance from model: {classifier_model.uid}')
    
    if 'LogisticRegression' in classifier_model.uid:
        print('This is a Logistic Regression model.')
        feature_coeff = classifier_model.coefficients.toArray()
        title = 'Model Coefficients'
    
    else:
        feature_coeff = classifier_model.featureImportances.toArray()
        title = 'Feature Importance'
    
    # Create a DataFrame from the feature names and values, sorted in descending order
    feature_importances = pd.DataFrame({'Index': feature_names, 'Value': feature_coeff})
    feature_importances = feature_importances.sort_values(by='Value', ascending=False)

    values = feature_importances['Value']
    labels = feature_importances['Index']

    plt.figure(figsize = [8, 10])
    plt.barh(np.arange(len(values)), values, height=0.6)
    ax = plt.gca()
    ax.set_yticks(np.arange(len(labels)))
    ax.set_yticklabels(labels)
    ax.invert_yaxis()  # labels read top-to-bottom
    ax.set_xlabel('FeatureImportances')
    ax.set_ylabel('Features')
    ax.set_title(title)
    plt.grid(True, axis='x', linewidth= 1)

    plt.show()

In [None]:
def fit_model_cv(data, classifier, paramGrid, numFolds=2, seed=0):
    """
    Description:
        Fits a cross-validation model to tune the classifier's hyperparameters.
    
    Args:
        data (DataFrame): the model data.
        classifier: a Spark ML classifier (estimator).
        paramGrid: the list of parameter maps built with ParamGridBuilder.
        numFolds (int): the number of folds in the cross-validation tuning.
        seed (int): a seed value for the random number generator.
    
    Returns:
        model_cv: the cross-validation model.
        scores_cv: the average cross-validation score of each parameter combination.
    """
    
    # Only the training subset is used for tuning; the test subset is left untouched
    train, test, df_features, start = stating_process(data, seed)
    scaler, assembler, pipeline = creating_pipeline(df_features, classifier)
    
    # Create the cross-validator; MulticlassClassificationEvaluator defaults to the F1 metric
    crossValidator = CrossValidator(estimator = pipeline,
                                    estimatorParamMaps = paramGrid,
                                    evaluator = MulticlassClassificationEvaluator(),
                                    numFolds = numFolds,
                                    seed = seed)

    # Fit the cross-validator on the training subset
    model_cv = crossValidator.fit(train)
    
    duration = time.time() - start
    print(f'Cross validation done ({int(duration)} s).')

    # Average score of each parameter map across the folds, best first
    scores = model_cv.avgMetrics
    params_ = pd.DataFrame(
        [{x.name: y for x, y in e.items()} for e in model_cv.getEstimatorParamMaps()]
    )
    params_['score'] = scores
    
    scores_cv = params_.sort_values(by='score', ascending=False)
    
    return model_cv, scores_cv
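`model_cv.avgMetrics`, used above to build `scores_cv`, holds for each parameter map the evaluator's metric averaged over the validation folds. The cross-validator picks the best average (larger is better for F1) and then refits the pipeline with those parameters on the whole training subset. The selection step is sketched below in plain Python with hypothetical per-fold scores; `select_best` is an illustrative name, not a Spark function.

```python
from math import fsum


def select_best(fold_scores, larger_is_better=True):
    """Average each parameter map's per-fold scores and return (best_index, averages).

    fold_scores[i][k] is the metric of parameter map i on validation fold k.
    """
    averages = [fsum(scores) / len(scores) for scores in fold_scores]
    # sorted() instead of max(), which the notebook shadows with a PySpark function
    ranking = sorted(range(len(averages)), key=lambda i: averages[i], reverse=larger_is_better)
    return ranking[0], averages


# Hypothetical F1 scores of three parameter maps on two folds
best, averages = select_best([[0.70, 0.74], [0.78, 0.72], [0.69, 0.71]])
print(best, averages)
```

With only two folds each average rests on two numbers, so small differences between the top rows of `scores_cv` may not be meaningful.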

Spark supports several classification algorithms, listed in its [documentation](https://spark.apache.org/docs/latest/ml-classification-regression.html#classification).

In this project, four models are tested:

- Gradient-boosted tree classifier
- Logistic regression
- Decision tree classifier
- Random forest classifier

#### Gradient-Boosted Tree Classifier

In [None]:
# Fit model
classifier = GBTClassifier()
model_gbt, metrics_gradient_boost = fit_model(data, classifier)

In [None]:
metrics_gradient_boost

In [None]:
plot_feature_importance(data, model_gbt)

In [None]:
# Create classifier
classifier = GBTClassifier()

# Define params grid
paramGrid_gbt = ParamGridBuilder() \
    .addGrid(classifier.maxIter, [5, 10, 20]) \
    .addGrid(classifier.maxDepth,[2, 3, 5]) \
    .addGrid(classifier.maxBins, [32, 20]) \
    .build()

# Fit cross-validation model
model_gbt_cv, scores_gbt_cv = fit_model_cv(data, classifier, paramGrid_gbt)
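`ParamGridBuilder` expands a grid into the Cartesian product of the listed values, and `CrossValidator` trains one pipeline per parameter map and fold, plus one final refit of the best map. The sketch below counts those fits for the three grids used in this section (values copied from the cells), which can help estimate run time before launching a search on the full 12GB dataset. `grid_size` is a hypothetical helper.

```python
from itertools import product


def grid_size(grid):
    """Number of parameter maps produced by a Cartesian grid of values."""
    return len(list(product(*grid.values())))


grids = {
    'GBT': {'maxIter': [5, 10, 20], 'maxDepth': [2, 3, 5], 'maxBins': [32, 20]},
    'Decision tree': {'maxDepth': [5, 10], 'maxBins': [32, 20]},
    'Random forest': {'maxBins': [10, 20, 25], 'maxDepth': [3, 5, 10], 'numTrees': [4, 10, 20]},
}
num_folds = 2  # default numFolds of fit_model_cv

for name, grid in grids.items():
    n_maps = grid_size(grid)
    # One fit per (map, fold) pair, plus the refit of the best map on the whole training subset
    print(f'{name}: {n_maps} parameter maps, {n_maps * num_folds + 1} pipeline fits')
```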

In [None]:
scores_gbt_cv

#### Logistic Regression

In [None]:
# Fit model
classifier = LogisticRegression(featuresCol="features", labelCol="label")
model_reg_log, metrics_reg_log = fit_model(data, classifier)

In [None]:
metrics_reg_log

In [None]:
plot_feature_importance(data, model_reg_log)

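For Logistic Regression, the absolute magnitude of the model's `coefficients` can be interpreted as an indicator of feature importance: larger absolute values suggest a stronger influence of that feature on the model's predictions. The comparison is meaningful here because the StandardScaler stage scales every feature to unit standard deviation before fitting. The sign gives the direction of the effect: positive coefficients increase the predicted probability of churn (label 1), negative ones decrease it.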
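Because a one-unit change in a scaled feature is a change of one standard deviation in the original feature, each coefficient can also be read through its odds ratio, `exp(coefficient)`: the factor by which the odds of churn change per standard deviation, with the other features held fixed. A minimal sketch that ranks coefficients by absolute value; the feature names and coefficient values are hypothetical, not this notebook's results. On the fitted model, the inputs would be `data.drop('label').columns` and `model_reg_log.stages[-1].coefficients`.

```python
from math import exp


def rank_coefficients(names, coefficients):
    """Return (feature, coefficient, odds ratio) tuples sorted by |coefficient|, largest first."""
    rows = [(name, c, exp(c)) for name, c in zip(names, coefficients)]
    return sorted(rows, key=lambda row: abs(row[1]), reverse=True)


# Hypothetical feature names and coefficients
feature_names = ['thumbs_down', 'count_friends', 'days_active']
coefficients = [0.9, -0.2, -1.3]

for name, c, odds in rank_coefficients(feature_names, coefficients):
    print(f'{name:15s} coefficient = {c:+.2f}  odds ratio per std = {odds:.2f}')
```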

#### Decision Tree Classifier

In [None]:
# Fit model
classifier = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model_dec_tree, metrics_dec_tree = fit_model(data, classifier)

In [None]:
metrics_dec_tree

In [None]:
plot_feature_importance(data, model_dec_tree)

In [None]:
# Create classifier
classifier = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Define params grid
paramGrid_dec_tree = ParamGridBuilder() \
    .addGrid(classifier.maxDepth,[5, 10]) \
    .addGrid(classifier.maxBins, [32, 20]) \
    .build()

# Fit cross-validation model
model_dec_tree_cv, scores_dec_tree_cv = fit_model_cv(data, classifier, paramGrid_dec_tree)

In [None]:
scores_dec_tree_cv

#### Random Forest Classifier

In [None]:
# Fit model
classifier = RandomForestClassifier()
model_rand_for, metrics_rand_for = fit_model(data, classifier)

In [None]:
model_rand_for.stages[-1:]

In [None]:
metrics_rand_for

In [None]:
plot_feature_importance(data, model_rand_for)

In [None]:
# Create classifier
classifier = RandomForestClassifier()

# Define params grid
paramGrid_rand_for = ParamGridBuilder()\
    .addGrid(classifier.maxBins, [10, 20, 25])\
    .addGrid(classifier.maxDepth, [3, 5, 10])\
    .addGrid(classifier.numTrees, [4, 10, 20])\
    .build()

# Fit cross-validation model
model_rand_for_cv, scores_rand_for_cv = fit_model_cv(data, classifier, paramGrid_rand_for)

In [None]:
scores_rand_for_cv

In [None]:
# Export the notebook to HTML
from subprocess import call
call(['python', '-m', 'nbconvert', '--to', 'html', 'Churn_Prediction.ipynb'])