In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date, from_unixtime, lead
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql import Window
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import round as Fround
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import min as Fmin
import datetime

pd.set_option('max_colwidth', None)
pd.set_option("display.max_rows", None)

In [0]:
def import_data_into_dataframe(file_location, file_type,
                               infer_schema="false",
                               first_row_is_header="false",
                               delimiter=","):
    """Imports the specified file into a spark dataframe

    Args:
        file_location (str): Path to the file location
        file_type (str): File type
        infer_schema (str, optional): Should pyspark infer schema. Defaults to "false".
        first_row_is_header (str, optional): Is the first row a header. Defaults to "false".
        delimiter (str, optional): The delimiter used in the file. Defaults to ",".

    Returns:
        pyspark.sql.dataframe.DataFrame: The pyspark dataframe
    """

    spark = SparkSession.builder.getOrCreate()

    data_df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_location)

    return data_df


def plot_bar_graph(xlist, ylist, title="Bar graph", xaxis_title="", yaxis_title=""):
    """Helper function to plot a plotly bar graph

    Args:
        xlist (Iterable): Values for the xaxis
        ylist (iterable): Values for the yaxis
        title (str, optional): The title for the plot. Defaults to "Bar graph".
        xaxis_title (str, optional): The title for the xaxis. Defaults to "".
        yaxis_title (str, optional): The title for the yaxis. Defaults to "".
    """

    fig = go.Figure([go.Bar(x=xlist, y=ylist, text=ylist, textposition='outside')])
    fig.update_traces(marker_color='rgb(158,202,225)', marker_line_color='rgb(8,48,107)', marker_line_width=1.5,
                      opacity=0.6)
    fig.update_layout(title_text=title, title_font_size=30, xaxis_title=xaxis_title, yaxis_title=yaxis_title,
                      yaxis_showgrid=False, yaxis_visible=False, xaxis_tickmode='linear', xaxis_tick0=0, xaxis_dtick=1)
    fig.show()


def count_null_values_for_each_column(spark_df):
    """Creates a dictionary of the number of nulls in each column

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe for which the nulls need to be counted

    Returns:
        dict: A dictionary with column name as key and null count as value
    """

    null_counts = {}
    for col_ in spark_df.columns:
        null_counts[col_] = spark_df.filter(f"{col_} is null").count()

    return null_counts


def count_empty_strings_for_each_string_column(spark_df):
    """Creates a dictionary of counts of empty strings in columns of type string

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe for which empty strings need to be counted

    Returns:
        dict: A dictionary with column name as key and empty string counts as value
    """

    empty_string_values = dict()
    for string_column in get_columns_of_type(spark_df, 'string'):
        empty_string_values[string_column] = spark_df.where(f"{string_column} is not null").where(
            pyspark_func_to_trim_strings()(spark_df[string_column]) == '').count()

    return empty_string_values


def count_column_types(spark_df):
    """Returns a pandas dataframe containing the datatype and the number of columns of that datatype

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe for which the types are to be counted

    Returns:
        pandas.DataFrame: A pandas dataframe
    """

    return pd.DataFrame(spark_df.dtypes).groupby(1, as_index=False)[0].agg(
        {'count': 'count', 'names': lambda x: " | ".join(set(x))}).rename(columns={1: "type"})


def get_columns_of_type(spark_df, type_name):
    """Returns a list of columns of the specified datatype

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe
        type_name (str): The datatype

    Returns:
        list: A list containing the names of columns
    """
    return list(map(lambda x: x[0], filter(lambda x: x[1] == type_name, spark_df.dtypes)))


def pyspark_func_to_trim_strings():
    return udf(lambda x: x.strip())


def clean_dataset(spark_df):
    """Removes unnecessary rows from a pyspark dataframe

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe

    Returns:
        pyspark.sql.dataframe.DataFrame: The cleaned spark dataframe
    """

    spark_df_new = spark_df.where(pyspark_func_to_trim_strings()(spark_df['userId']) != '')

    return spark_df_new


def get_counts_as_pandas_df(column_to_groupBy, spark_df):
    """Groups by a the given column and returns a count of rows for each value

    Args:
        column_to_groupBy (str): Name of the column to group by
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe

    Returns:
        pandas.DataFrame: A pandas DataFrame with counts of each value in the given column
    """

    return spark_df.filter(f"{column_to_groupBy} is not null").groupBy([column_to_groupBy]).count().orderBy(
        col('count').desc()).toPandas()


def get_summary_of_category_column(column_name, spark_df, top_n=10, print_result=False):
    """Returns a dataframe of most common values in a column and their counts

    Args:
        column_name (str): The name of the column
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe.=
        top_n (int, optional): The top n number of results. Defaults to 10.
        print_result (bool, optional): Should the result be printed. Defaults to False.

    Returns:
        pandas.DataFrame: A pandas DataFrame containing the frequencies
    """

    counts = get_counts_as_pandas_df(column_name, spark_df)

    if print_result:
        print(f"There are {counts.shape[0]} unique {column_name}/s and the most frequent are: ")
        print(counts.head(top_n))

    return counts.head(top_n)


def create_summary_plots(dataframes, titles, rows, cols, super_title, height=1300):
    """Create a plot of all summaries

    Args:
        dataframes (list): A list of the dataframes
        titles (list): A list of titles for each dataframe
        rows (int): Number of rows for the subplot
        cols (int): Number of columns for the subplot
        super_title (string): The main title of the plot
        height (int, optional): The Height of the plot. Defaults to 1300.
    """

    fig = make_subplots(rows=rows, cols=cols, subplot_titles=titles)

    bar_traces = []

    for cnt_df in dataframes:
        bar_traces.append(
            go.Bar(x=cnt_df.iloc[:, 0].apply(lambda x: x if len(x) < 25 else x[:25] + '...'), y=cnt_df.iloc[:, 1],
                   text=cnt_df.iloc[:, 1], textposition='outside'))

    for i in range(1, rows + 1):
        for j in range(1, cols + 1):
            fig.add_trace(bar_traces.pop(0), row=i, col=j)
            fig.update_yaxes(visible=False, showgrid=False, row=i, col=j)
            fig.update_xaxes(showgrid=False, row=i, col=j)

    fig.update_traces(marker_color='rgb(158,202,225)', marker_line_color='rgb(8,48,107)', marker_line_width=1.5,
                      opacity=0.6)
    fig.update_layout(title_text=super_title, title_font_size=50, height=height, showlegend=False)

    fig.show()


def get_counts_for_unique_users(column_name, spark_df, print_result=False):
    """Generates a dataframe having counts for a particular column for unique users

    Args:
        column_name (str): Name of the column
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe
        print_result (bool, optional): Flag to print the result. Defaults to False.

    Returns:
        pandas.DataFrame: Resultant DataFrame
    """

    pd_df = spark_df.select("userId", column_name).distinct().groupBy(column_name).count().toPandas()

    if print_result:
        print(pd_df)

    return pd_df


def spark_func_to_mark_user_cancellation_event():
    return udf(lambda x: 1 if x == "Cancellation Confirmation" else 0)


def add_column_to_flag_cancellation_event(spark_df):
    return spark_df.withColumn("user_cancelled", spark_func_to_mark_user_cancellation_event()("page"))


def add_column_to_mark_rows_for_churned_users(spark_df):
    windowval = Window.partitionBy("userId").orderBy(col("ts").desc()).rangeBetween(Window.unboundedPreceding, 0)

    return spark_df.withColumn('churned', Fsum('user_cancelled').over(windowval)).orderBy("userId", "ts")


def mark_users_as_churners(spark_df):
    """Creates a pyspark dataframe with userId and their corresponding churn nature. 1 if churned, 0 otherwise.

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe containing event logs

    Returns:
        pyspark.sql.dataframe.DataFrame: Resultant pyspark dataframe
    """

    return spark_df.select("userId", "churned").distinct()


def split_into_train_test_80_20(spark_df):
    """Splits the data into train and test of 80-20 split

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe to split

    Returns:
        tuple: A tuple of train and test pyspark dataframes
    """
    users_marked = mark_users_as_churners(spark_df)

    train_users = users_marked.sampleBy("churned", fractions={0: 0.8, 1: 0.8}, seed=10)
    test_users = users_marked.subtract(train_users)

    train_data = spark_df.join(train_users, spark_df.userId == train_users.userId, "inner").drop(
        train_users.userId).drop(train_users.churned)
    test_data = spark_df.join(test_users, spark_df.userId == test_users.userId, "inner").drop(test_users.userId).drop(
        test_users.churned)

    return train_data, train_users, test_data, test_users


def filter_for_event_and_group_by_userId_and_sessionId(event_name, spark_df):
    return spark_df.filter(f'page == "{event_name}"').groupBy("userId", "sessionId")


def get_stat_per_session_for_users(event_name, alias_name, spark_df):
    return filter_for_event_and_group_by_userId_and_sessionId(event_name, spark_df).count().groupBy(
        "userId").mean().select("userId", Fround(col("avg(count)"), 2).alias(alias_name))


def compare_churner_nonchurners(stat_df, title, users_marked_df):
    """Creates a plot to compare a stat between churners and nonchurners

    Args:
        stat_df (pyspark.sql.dataframe.DataFrame): A pyspark dataframe of the stat
        title (str): Title of the plot
        users_marked_df (pyspark.sql.dataframe.DataFrame): A pyspark dataframe indicating users as churners or non churners
        :rtype: None
    """

    joined = users_marked_df.alias("A").join(stat_df.alias("B"), col("A.userId") == col("B.userId"), "left").drop(
        col("B.userId")).toPandas()
    joined.churned = joined.churned.apply(lambda x: 'churner' if x == 1 else "non-churner")
    joined.fillna(0, inplace=True)
    fig = go.Figure()
    fig.add_trace(go.Box(x=joined.iloc[:, 1], y=joined.iloc[:, 2], jitter=0.3, pointpos=-1.8, boxpoints='all',
                         marker_color='rgb(7,40,89)', line_color='rgb(7,40,89)'))
    fig.update_layout(title_text=title, title_font_size=15, width=500, height=500)
    fig.show()


def comparison_summary(event, alias_name, spark_df, users_marked_df, plot=False, plot_title=""):
    """[summary]

    Args:
        event (str): The name of the event
        alias_name (str): Alias name for the generated stat
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe of event logs
        users_marked_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe of marked users
        plot (bool, optional): Option to plot the stat. Defaults to False.
        plot_title (str, optional): Title of the plot. Defaults to "".

    Returns:
        pyspark.sql.dataframe.DataFrame: A pyspark dataframe containing the userId and stat
    """

    stat_summary = get_stat_per_session_for_users(event, alias_name, spark_df)

    if plot:
        stat_summary.show(10)
        compare_churner_nonchurners(stat_summary, plot_title, users_marked_df)

    return stat_summary


def event_count_per_user(event_name, alias_name, spark_df):
    return spark_df.filter(f"page == '{event_name}'").groupBy('userId').count().select("userId",
                                                                                       col("count").alias(alias_name))


def comparison_summary_for_user(event_name, alias_name, spark_df, users_marked_df, plot=False, plot_title=""):
    summary_df = event_count_per_user(event_name, alias_name, spark_df)

    if plot:
        summary_df.show(20)
        compare_churner_nonchurners(summary_df, plot_title, users_marked_df)

    return summary_df


def get_avg_number_of_artists_listened_per_session_per_user(spark_df, users_marked_df, plot=False, plot_title=""):
    """Creates a pyspark dataframe with avg number of artists per session for every user

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe of event logs
        users_marked_df (pyspark.sql.dataframe.DataFrame): The spark dataframe of marked users
        plot (bool, optional): To enable plot. Defaults to False.
        plot_title (str, optional): Plot title. Defaults to "".

    Returns:
        pyspark.sql.dataframe.DataFrame: Pyspark dataframe
    """

    avg_num_of_artists_per_session = spark_df.select("userId", "sessionId", "artist").distinct().groupBy('userId',
                                                                                                         'sessionId').count().groupBy(
        "userId").mean("count").select("userId", Fround(col("avg(count)"), 2).alias("avg_num_of_artists_per_session"))

    if plot:
        avg_num_of_artists_per_session.show(10)
        compare_churner_nonchurners(avg_num_of_artists_per_session, plot_title, users_marked_df)

    return avg_num_of_artists_per_session


def get_number_of_times_each_user_changed_levels(spark_df, users_marked_df, plot=False):
    """Creates a dataframe with number of times a user changed level

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The pyspark dataframe of event logs
        users_marked_df (pyspark.sql.dataframe.DataFrame): The spark dataframe of marked users
        plot (bool, optional): To enable plot. Defaults to False.

    Returns:
        pyspark.sql.dataframe.DataFrame: Pyspark dataframe
    """
    
    winfunc = Window.partitionBy("userId").orderBy('ts')

    num_of_times_user_changed_levels = spark_df.withColumn("leader", lead("level").over(winfunc)).select("userId",
                                                                                                         "level",
                                                                                                         "leader").withColumn(
        "same", col("level") != col("leader")).select("userId", col("same").cast(IntegerType())).groupBy("userId").sum(
        "same").select("userId", col("sum(same)").alias("num_times_user_changed_levels"))

    if plot:
        num_of_times_user_changed_levels.show(10)
        compare_churner_nonchurners(num_of_times_user_changed_levels, "number of times user changed levels",
                                    users_marked_df)

    return num_of_times_user_changed_levels


def get_user_gender(spark_df):
    return spark_df.select("userId", "gender").distinct()


def aggregate_features(spark_df, users_marked_df, enable_plot=False):
    """Creates features for the given dataframe of event logs

    Args:
        spark_df (pyspark.sql.dataframe.DataFrame): The spark dataframe of event logs
        users_marked_df (pyspark.sql.dataframe.DataFrame): The spark dataframe of userId marked as churners or nonchurners
        enable_plot (bool, optional): Option to plot stats. Defaults to False.

    Returns:
        pyspark.sql.dataframe.DataFrame: A pyspark dataframe of the feature matrix
    """

    avg_num_of_songs_per_session = comparison_summary("NextSong", "avg_num_of_songs_per_session", spark_df,
                                                      users_marked_df, enable_plot, "songs per session")

    avg_num_of_adverts_per_session = comparison_summary("Roll Advert", "avg_num_of_adverts_per_session", spark_df,
                                                        users_marked_df, enable_plot, "adverts per session")

    avg_num_of_visits_to_the_home_page_per_session = comparison_summary("Home", "avg_num_of_visits_to_home_per_session",
                                                                        spark_df, users_marked_df,
                                                                        enable_plot,
                                                                        "number of visits to the home page")

    avg_num_of_visits_to_the_about_page_per_session = comparison_summary("About",
                                                                         "average_number_of_visits_to_the_about_page_per_session",
                                                                         spark_df, users_marked_df,
                                                                         enable_plot,
                                                                         "number of visits to the About page per session")

    avg_num_of_visits_to_the_help_page_per_session = comparison_summary("Help",
                                                                        "average_number_of_visits_to_the_help_page_per_session",
                                                                        spark_df, users_marked_df,
                                                                        enable_plot,
                                                                        "number of visits to the Help page per session")

    avg_num_of_visits_to_the_settings_page_per_session = comparison_summary("Settings",
                                                                            "avg_num_of_visits_to_the_settings_page_per_session",
                                                                            spark_df, users_marked_df,
                                                                            enable_plot,
                                                                            "number of visits to the Settings page")

    avg_num_of_times_the_settings_changed_per_session = comparison_summary("Save Settings",
                                                                           "avg_num_of_times_settings_changed_per_session",
                                                                           spark_df, users_marked_df,
                                                                           enable_plot,
                                                                           "number of times settings was changed")

    avg_num_of_thumbs_up_per_session = comparison_summary("Thumbs Up", "avg_num_of_thumbs_up_per_session", spark_df,
                                                          users_marked_df, enable_plot,
                                                          "number of thumbs up")

    avg_num_of_thumbs_down_per_session = comparison_summary("Thumbs Down", "avg_num_of_thumbs_down_per_session",
                                                            spark_df, users_marked_df, enable_plot,
                                                            "number of thumbs down")

    avg_num_of_add_to_playlist_per_session = comparison_summary("Add to Playlist",
                                                                "avg_num_of_add_to_playlist_per_session", spark_df,
                                                                users_marked_df, enable_plot,
                                                                "number of add to playlist")

    avg_num_of_addfriends_per_session = comparison_summary("Add Friend", "avg_num_of_addfriends_per_session", spark_df,
                                                           users_marked_df, enable_plot,
                                                           "number of Add Friend")

    avg_number_of_errors_per_session = comparison_summary("Error", "avg_number_of_errors_per_session", spark_df,
                                                          users_marked_df, enable_plot, "number of Errors")

    avg_num_of_visits_to_upgrade_page = comparison_summary("Upgrade", "avg_num_of_visits_to_upgrade_page", spark_df,
                                                           users_marked_df, enable_plot,
                                                           "number of Upgrade")

    avg_number_of_visits_to_downgrade_page = comparison_summary("Downgrade", "avg_number_of_visits_to_downgrade_page",
                                                                spark_df, users_marked_df, enable_plot,
                                                                "number of downgrades")

    number_of_downgrade_submits_per_user = comparison_summary_for_user("Submit Downgrade",
                                                                       "num_of_downgrades_submitted",
                                                                       spark_df,
                                                                       users_marked_df,
                                                                       enable_plot,
                                                                       "number of downgrades submitted")

    number_of_upgrade_submits_per_user = comparison_summary_for_user("Submit Upgrade", "num_of_upgrades_submitted",
                                                                     spark_df,
                                                                     users_marked_df,
                                                                     enable_plot, "number of upgrades submitted")

    avg_num_of_artists_per_session = get_avg_number_of_artists_listened_per_session_per_user(spark_df,
                                                                                             users_marked_df,
                                                                                             plot=enable_plot,
                                                                                             plot_title="Number of artists per session")

    num_of_times_user_changed_levels = get_number_of_times_each_user_changed_levels(spark_df,
                                                                                    users_marked_df,
                                                                                    plot=enable_plot)

    users_gender = get_user_gender(spark_df)

    final = users_marked_df.alias("A").join(avg_num_of_add_to_playlist_per_session.alias("B"),
                                            col("A.userId") == col("B.userId"), "left").drop(col("B.userId"))
    final = final.join(avg_num_of_addfriends_per_session, final.userId == avg_num_of_addfriends_per_session.userId,
                       "left").drop(avg_num_of_addfriends_per_session.userId)
    final = final.join(avg_num_of_adverts_per_session, final.userId == avg_num_of_adverts_per_session.userId,
                       'left').drop(avg_num_of_adverts_per_session.userId)
    final = final.join(avg_num_of_artists_per_session, final.userId == avg_num_of_artists_per_session.userId,
                       'left').drop(avg_num_of_artists_per_session.userId)
    final = final.join(avg_num_of_songs_per_session, final.userId == avg_num_of_songs_per_session.userId, 'left').drop(
        avg_num_of_songs_per_session.userId)
    final = final.join(avg_num_of_thumbs_down_per_session, final.userId == avg_num_of_thumbs_down_per_session.userId,
                       'left').drop(avg_num_of_thumbs_down_per_session.userId)
    final = final.join(avg_num_of_thumbs_up_per_session, final.userId == avg_num_of_thumbs_up_per_session.userId,
                       'left').drop(avg_num_of_thumbs_up_per_session.userId)
    final = final.join(avg_num_of_times_the_settings_changed_per_session,
                       final.userId == avg_num_of_times_the_settings_changed_per_session.userId, 'left').drop(
        avg_num_of_times_the_settings_changed_per_session.userId)
    final = final.join(avg_num_of_visits_to_the_about_page_per_session,
                       final.userId == avg_num_of_visits_to_the_about_page_per_session.userId, 'left').drop(
        avg_num_of_visits_to_the_about_page_per_session.userId)
    final = final.join(avg_num_of_visits_to_the_help_page_per_session,
                       final.userId == avg_num_of_visits_to_the_help_page_per_session.userId, 'left').drop(
        avg_num_of_visits_to_the_help_page_per_session.userId)
    final = final.join(avg_num_of_visits_to_the_home_page_per_session,
                       final.userId == avg_num_of_visits_to_the_home_page_per_session.userId, 'left').drop(
        avg_num_of_visits_to_the_home_page_per_session.userId)
    final = final.join(avg_num_of_visits_to_the_settings_page_per_session,
                       final.userId == avg_num_of_visits_to_the_settings_page_per_session.userId, 'left').drop(
        avg_num_of_visits_to_the_settings_page_per_session.userId)
    final = final.join(avg_num_of_visits_to_upgrade_page, final.userId == avg_num_of_visits_to_upgrade_page.userId,
                       'left').drop(avg_num_of_visits_to_upgrade_page.userId)
    final = final.join(avg_number_of_errors_per_session, final.userId == avg_number_of_errors_per_session.userId,
                       'left').drop(avg_number_of_errors_per_session.userId)
    final = final.join(avg_number_of_visits_to_downgrade_page,
                       final.userId == avg_number_of_visits_to_downgrade_page.userId, 'left').drop(
        avg_number_of_visits_to_downgrade_page.userId)
    final = final.join(num_of_times_user_changed_levels, final.userId == num_of_times_user_changed_levels.userId,
                       'left').drop(num_of_times_user_changed_levels.userId)
    final = final.join(number_of_downgrade_submits_per_user,
                       final.userId == number_of_downgrade_submits_per_user.userId, 'left').drop(
        number_of_downgrade_submits_per_user.userId)
    final = final.join(number_of_upgrade_submits_per_user, final.userId == number_of_upgrade_submits_per_user.userId,
                       'left').drop(number_of_upgrade_submits_per_user.userId)
    final = final.join(users_gender, final.userId == users_gender.userId, "left").drop(users_gender.userId)
    
    final = final.fillna(0) # filling all null with 0

    return final

In [0]:
# File location and type
file_location = "/FileStore/tables/medium_sparkify_event_data.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df_data_1 = import_data_into_dataframe(file_location, file_type, infer_schema, first_row_is_header, delimiter)

df_data_1.show(5)

In [0]:
print(f"There are {len(df_data_1.columns)} columns and {df_data_1.count()} rows. The schema of the dataset is as shown below:")
df_data_1.printSchema()

In [0]:
null_counts = count_null_values_for_each_column(df_data_1)

In [0]:
plot_bar_graph(list(null_counts.keys()), list(null_counts.values()), "Number of null values in each column", "column")

From the bar graph, it can be seen that some columns have identical number of missing values. This probably means that depending on the type of event, only certain number of fields are filled. Let's check this.

In [0]:
print("The pages for which artist is null")
set(df_data_1.filter("artist is null").select("page").distinct().collect())

In [0]:
print("The pages for which artist is not null")
set(df_data_1.select("page").distinct().collect()) - set(df_data_1.filter("artist is null").select("page").distinct().collect())

Here, it can be seen that the artist column is null for all events except for the "NextSong" event.

In [0]:
print("The pages for which firstName is not null")
set(df_data_1.select('page').distinct().collect()) - set(df_data_1.filter("firstName is null").select("page").distinct().collect())

In [0]:
print("The pages for which firstName is null")
set(df_data_1.filter("firstName is null").select("page").distinct().collect())

Here, the first name is null for events such as visiting the "About", "Help", "Login" page.

So far we have looked into the overall dataset for any nulls. Now let's look at the different datatypes and look for erroneous values.

In [0]:
count_column_types(df_data_1)

Unnamed: 0,type,count,names
0,bigint,5,ts | registration | sessionId | status | itemInSession
1,double,1,length
2,string,12,firstName | gender | level | page | auth | userAgent | userId | song | lastName | artist | location | method


In [0]:
empty_string_values = count_empty_strings_for_each_string_column(df_data_1)

In [0]:
plot_bar_graph(list(empty_string_values.keys()), list(empty_string_values.values()), "Number of empty values in each column of type string", "column")

In [0]:
df_data_1.where("userId is not null").where(pyspark_func_to_trim_strings()(df_data_1['userId']) == '').show(10)

After observing the bar graph and looking into some of the rows where userId is empty, we can conclude that userId is empty for the same events as the ones for which firstName, lastName etc are null.

In [0]:
# now let's look into numberic columns
df_data_1.describe(get_columns_of_type(df_data_1, 'bigint')).show()

In [0]:
df_data_1.describe(get_columns_of_type(df_data_1, 'double')).show()

The status column contains the http request codes, I do not think it could be of much use to us.

The length column is the duration of the song, again I do not think it could be of much use.

No other funny values.

Now, before we go further with EDA, let's removed all rows with an empty user id. The aim is to identify users who could churn, hence rows not related to an user is not of interest.

In [0]:
clean_data = clean_dataset(df_data_1)
clean_data.show(5)

In [0]:
clean_data.persist()

## EDA

In [0]:
clean_data.printSchema()

In [0]:
print('-'*50)
top_users = get_summary_of_category_column('userId', clean_data, print_result=True)
print('-'*50)
top_artists = get_summary_of_category_column('artist', clean_data, print_result=True)
print('-'*50)
auth_counts = get_summary_of_category_column('auth', clean_data, print_result=True)
print('-'*50)
page_counts = get_summary_of_category_column('page', clean_data, print_result=True)
print('-'*50)
location_counts = get_summary_of_category_column('location', clean_data, print_result=True)
print('-'*50)
registration_counts = get_summary_of_category_column('registration', clean_data, print_result=True)
print('-'*50)
song_counts = get_summary_of_category_column('song', clean_data, print_result=True)
print('-'*50)
userAgent_counts = get_summary_of_category_column('userAgent', clean_data, print_result=True)
print('-'*50)
gender_activity = get_summary_of_category_column("gender", clean_data, print_result=True)
print('-'*50)
level_activity = get_summary_of_category_column("level", clean_data, print_result=True)
print('-'*50)

In [0]:
count_df_to_plot = [top_artists, song_counts, page_counts, location_counts, auth_counts, userAgent_counts, gender_activity, level_activity]
titles = []
for cnt_df in count_df_to_plot:
    titles.append(cnt_df.columns[0])

In [0]:
create_summary_plots(count_df_to_plot, titles, 4, 2, "Data at a glance", 2500)

Observations of the above plots:
- The most frequented action is "NextSong", this makes sense as users use the service to listen to music
- New York,Newark,Jersey City are the source of most traffic to this service
- For some strange reason, the user agent is mostly always Mozilla
- Most events occur with the user logged in
- Most activity is from males
- There is four times more activity from paid users then free users

Now, lets investigate how our user demographics are distributed

In [0]:
print('-'*50)
level_counts = get_counts_for_unique_users("level", clean_data, print_result=True)
print('-'*50)
gender_counts = get_counts_for_unique_users("gender", clean_data, print_result=True)
print('-'*50)

In [0]:
create_summary_plots([gender_counts, level_counts], ["Users", "Levels"], 1, 2, "User Summary", 500)

Observations:
- There are more males (250) than females (198) in this dataset - Total of 448 users
- There are more free (370) accounts than paid (321), adding upto 691. This number is higher than the number of users because user can ugrade or downgrade. And as a result, if a user has changed levels, then the same user would be counted twice in this bar chart.

In [0]:
datetime.datetime.fromtimestamp(1538352011000/1000.0).day

In [0]:
convert_to_datetime = udf(lambda x: datetime.datetime.fromtimestamp(x/1000.0))
convert_to_datetime_string = udf(lambda x: str(x))
convert_to_hour = udf(lambda x: x.hour)
convert_to_year = udf(lambda x: x.year)
convert_to_month = udf(lambda x: x.month)
convert_to_day = udf(lambda x: x.day)

In [0]:
# Add time details

clean_data = clean_data.withColumn("timestamp", convert_to_datetime("ts")).withColumn("dt_string", convert_to_datetime_string("timestamp")).withColumn("year", convert_to_year("timestamp")).withColumn("month", convert_to_month("timestamp"))\
.withColumn("hour", convert_to_hour("timestamp")).withColumn("day", convert_to_day("timestamp"))
clean_data.select(['ts', 'timestamp', 'dt_string', 'year', 'month', 'hour', 'day']).show(20)

In [0]:
song_per_hour_of_day = clean_data.filter(clean_data.page == "NextSong").groupBy("hour").count().toPandas()
song_per_hour_of_day.hour = pd.to_numeric(song_per_hour_of_day.hour)
song_per_hour_of_day.head()

Unnamed: 0,hour,count
0,7,15069
1,15,21953
2,11,16290
3,3,15581
4,8,15045


In [0]:
plot_bar_graph(song_per_hour_of_day["hour"], song_per_hour_of_day["count"], "Number of songs played", "hour of day")

In [0]:
song_per_month = clean_data.filter(clean_data.page == "NextSong").groupBy("month").count().toPandas()
song_per_month.month = pd.to_numeric(song_per_month.month)
song_per_month.head()

Unnamed: 0,month,count
0,10,217942
1,11,214931
2,12,4


In [0]:
plot_bar_graph(song_per_month["month"], song_per_month["count"], "Number of songs played", "month")

In [0]:
clean_data.select("year","month").distinct().show()

The data we have in hand is of the last 3 months of 2018.

In [0]:
total_records = clean_data.count()
day_wise_counts = clean_data.groupBy("year","month",'day').count().toPandas()
for colm in day_wise_counts.columns:
  day_wise_counts[colm] = pd.to_numeric(day_wise_counts[colm])
day_wise_counts = day_wise_counts.sort_values(by=['year','month','day'])
day_wise_counts['total%'] = day_wise_counts['count'].cumsum() / total_records * 100
day_wise_counts

Unnamed: 0,year,month,day,count,total%
5,2018,10,1,6977,1.321389
3,2018,10,2,8821,2.992017
2,2018,10,3,9888,4.864727
11,2018,10,4,9893,6.738383
8,2018,10,5,10353,8.69916
10,2018,10,6,5956,9.82718
9,2018,10,7,2380,10.277933
7,2018,10,8,9743,12.123181
4,2018,10,9,10065,14.029413
0,2018,10,10,10611,16.039053


## Marking users as churners and nonchurners

In [0]:
mark_cancellation = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0)

In [0]:
clean_data.unpersist()
clean_data = add_column_to_flag_cancellation_event(clean_data)

In [0]:
clean_data.select("userId", "user_cancelled").show(50)

In [0]:
clean_data.groupBy("user_cancelled").count().show()

There are 99 cancellation events

In [0]:
clean_data = add_column_to_mark_rows_for_churned_users(clean_data)

In [0]:
clean_data.persist()
clean_data.select("userId", "sessionId", "dt_string", "page", "churned").show()

Now that we have marked the rows as belong to a churned or nonchurned user, we will now split the data into train and test before we go ahead with feature engineering.

In [0]:
users_marked = mark_users_as_churners(clean_data)
users_marked.groupBy("churned").count().show()

There are 99 users who churned.

In [0]:
train_data, train_users, test_data, test_users = split_into_train_test_80_20(clean_data)
train_info = train_users.groupBy("churned").count().toPandas()
test_info = test_users.groupBy("churned").count().toPandas()

In [0]:
print(f"There are {len(train_data.columns)} columns and {train_data.count()} rows in the train data set")
print(f"The train data set has information about {train_data.select('userId').distinct().count()} users")
print(f"Out of which {train_info[train_info.churned == 0]['count'][0]} are non churners and {train_info[train_info.churned == 1]['count'][1]} are churners")
print(f"There are {len(test_data.columns)} columns and {test_data.count()} rows in the test data set")
print(f"The test data set has information about {test_data.select('userId').distinct().count()} users")
print(f"Out of which {test_info[test_info.churned == 0]['count'][0]} are non churners and {test_info[test_info.churned == 1]['count'][1]} are churners")

In [0]:
clean_data.unpersist()
train_data.persist()
train_users.persist()

Feature Engineering: Based on the event logs, we will engineer the following features
- Average Number of songs per session
- Average number of adverts served per session
- Average number of visits to the home page per session
- Average number of visits to the about page per session
- Average number of visits to the help page per session
- Average number of visits to the settings page per session
- Average number of times settings changed per session
- Average number of thumbs up per session
- Average number of thumbs down per session
- Average number of add to playlists per session
- Average number of AddFriends per session
- Average number of Errors faced per session
- Average number of visits to the upgrade page
- Average number of visits to the downgrade page
- Average number of different artists listened per session
- Number of different levels the user switched
- Number of submit upgrades
- Number of submit downgrades

Average number of songs per session : NextSong

In [0]:
avg_num_of_songs_per_session = comparison_summary("NextSong", "avg_num_of_songs_per_session", train_data, train_users, True, "songs per session")

Average number of adverts served per session : Roll Advert

In [0]:
avg_num_of_adverts_per_session = comparison_summary("Roll Advert", "avg_num_of_adverts_per_session", train_data, train_users, True, "adverts per session")

Average number of visits to the home page per session : Home

In [0]:
avg_num_of_visits_to_the_home_page_per_session = comparison_summary("Home", "avg_num_of_visits_to_home_per_session", train_data, train_users, True, "number of visits to the home page")

Average number of visits to the about page per session : About

In [0]:
avg_num_of_visits_to_the_about_page_per_session = comparison_summary("About", "average_number_of_visits_to_the_about_page_per_session", train_data, train_users, True, "number of visits to the About page per session")

Average number of visits to the help page per session : Help

In [0]:
avg_num_of_visits_to_the_help_page_per_session = comparison_summary("Help", "average_number_of_visits_to_the_help_page_per_session", train_data, train_users, True, "number of visits to the Help page per session")

Average number of visits to the settings page per session : Settings

In [0]:
avg_num_of_visits_to_the_settings_page_per_session = comparison_summary("Settings", "avg_num_of_visits_to_the_settings_page_per_session", train_data, train_users, True, "number of visits to the Settings page")

Average number of times settings changed per session : Save Settings

In [0]:
avg_num_of_times_the_settings_changed_per_session = comparison_summary("Save Settings", "avg_num_of_times_settings_changed_per_session", train_data, train_users, True, "number of times settings was changed")

Average number of thumbs up per session : Thumbs Up

In [0]:
avg_num_of_thumbs_up_per_session = comparison_summary("Thumbs Up", "avg_num_of_thumbs_up_per_session", train_data, train_users, True, "number of thumbs up")

Average number of thumbs down per session : Thumbs Down

In [0]:
avg_num_of_thumbs_down_per_session = comparison_summary("Thumbs Down", "avg_num_of_thumbs_down_per_session", train_data, train_users, True, "number of thumbs down")

Average number of add to playlists per session : Add to Playlist

In [0]:
avg_num_of_add_to_playlist_per_session = comparison_summary("Add to Playlist", "avg_num_of_add_to_playlist_per_session", train_data, train_users, True, "number of add to playlist")

Average number of AddFriends per session : Add Friend

In [0]:
avg_num_of_addfriends_per_session = comparison_summary("Add Friend", "avg_num_of_addfriends_per_session", train_data, train_users, True, "number of Add Friend")

Average number of Errors faced per session : Error

In [0]:
avg_number_of_errors_per_session = comparison_summary("Error", "avg_number_of_errors_per_session", train_data, train_users, True, "number of Errors")

Average number of visits to the upgrade page : Upgrade

In [0]:
avg_num_of_visits_to_upgrade_page = comparison_summary("Upgrade", "avg_num_of_visits_to_upgrade_page", train_data, train_users, True, "number of Upgrade")

Average number of visits to the downgrade page : Downgrade

In [0]:
avg_number_of_visits_to_downgrade_page = comparison_summary("Downgrade", "avg_number_of_visits_to_downgrade_page", train_data, train_users, True, "number of downgrades")

Number of downgrades : Submit Downgrade

In [0]:
number_of_downgrade_submits_per_user = comparison_summary_for_user("Submit Downgrade", "num_of_downgrades_submitted", train_data, train_users, True, "number of downgrades submitted")

Number of upgrades : Submit Upgrade

In [0]:
number_of_upgrade_submits_per_user = comparison_summary_for_user("Submit Upgrade", "num_of_upgrades_submitted", train_data, train_users, True, "number of upgrades submitted")

Average number of different artists listened per session

In [0]:
avg_num_of_artists_per_session = get_avg_number_of_artists_listened_per_session_per_user(train_data, train_users, plot=True, plot_title="Number of artists per session")

Number of different levels the user switched

In [0]:
num_of_times_user_changed_levels = get_number_of_times_each_user_changed_levels(train_data, train_users, plot=True)

In [0]:
users_gender = get_user_gender(train_data)
users_gender.show(30)

Aggregating these features into a single dataframe

In [0]:
final_train_df = aggregate_features(train_data, train_users, False)

In [0]:
display(final_train_df)

userId,churned,avg_num_of_add_to_playlist_per_session,avg_num_of_addfriends_per_session,avg_num_of_adverts_per_session,avg_num_of_artists_per_session,avg_num_of_songs_per_session,avg_num_of_thumbs_down_per_session,avg_num_of_thumbs_up_per_session,avg_num_of_times_settings_changed_per_session,average_number_of_visits_to_the_about_page_per_session,average_number_of_visits_to_the_help_page_per_session,avg_num_of_visits_to_home_per_session,avg_num_of_visits_to_the_settings_page_per_session,avg_num_of_visits_to_upgrade_page,avg_number_of_errors_per_session,avg_number_of_visits_to_downgrade_page,num_times_user_changed_levels,num_of_downgrades_submitted,num_of_upgrades_submitted,gender
100010,1.0,1.0,1.5,11.0,49.0,48.0,1.5,2.0,0.0,0.0,0.0,1.5,0.0,2.0,0.0,0.0,0,0,0,F
296,1.0,1.5,2.0,4.5,23.0,22.4,0.0,2.67,0.0,0.0,1.0,2.33,1.0,1.0,0.0,1.0,1,0,1,F
125,0.0,1.0,1.5,3.0,21.33,20.67,1.0,1.5,0.0,0.0,2.0,1.0,1.5,0.0,0.0,0.0,0,0,0,M
124,1.0,4.09,2.6,1.0,100.0,114.13,2.14,6.8,1.5,1.5,2.0,4.67,2.14,0.0,0.0,2.33,0,0,0,F
7,0.0,2.0,2.67,3.18,31.2,30.8,1.0,1.71,1.0,1.0,1.0,2.4,1.0,1.5,0.0,0.0,0,0,0,M
200037,1.0,2.0,1.0,1.8,18.0,17.43,1.5,1.25,0.0,1.0,0.0,2.0,1.0,0.0,0.0,0.0,0,0,0,M
169,0.0,0.0,6.0,2.17,14.88,14.0,0.0,1.2,1.0,0.0,0.0,1.5,1.0,2.0,0.0,0.0,0,0,0,M
205,0.0,5.0,5.25,0.0,190.4,209.6,2.5,12.5,1.0,1.5,2.75,8.6,1.5,0.0,1.0,3.33,0,0,0,F
272,0.0,1.5,0.0,6.0,48.5,48.5,2.0,3.5,0.0,0.0,0.0,2.5,0.0,1.0,0.0,0.0,0,0,0,F
232,0.0,4.36,4.3,3.17,77.35,81.57,1.7,5.11,1.0,1.2,2.1,5.0,1.63,1.5,1.0,2.8,3,1,2,F


In [0]:
final_test_df = aggregate_features(test_data, test_users, False)

In [0]:
display(final_test_df)

userId,churned,avg_num_of_add_to_playlist_per_session,avg_num_of_addfriends_per_session,avg_num_of_adverts_per_session,avg_num_of_artists_per_session,avg_num_of_songs_per_session,avg_num_of_thumbs_down_per_session,avg_num_of_thumbs_up_per_session,avg_num_of_times_settings_changed_per_session,average_number_of_visits_to_the_about_page_per_session,average_number_of_visits_to_the_help_page_per_session,avg_num_of_visits_to_home_per_session,avg_num_of_visits_to_the_settings_page_per_session,avg_num_of_visits_to_upgrade_page,avg_number_of_errors_per_session,avg_number_of_visits_to_downgrade_page,num_times_user_changed_levels,num_of_downgrades_submitted,num_of_upgrades_submitted,gender
138,0.0,2.5,2.33,3.36,31.36,33.38,1.33,3.38,1.0,0.0,1.4,2.5,1.5,1.0,1.0,0.0,0,0,0,M
15,0.0,3.35,2.46,0.0,97.0,101.94,1.33,5.17,0.0,1.0,1.5,4.4,1.09,0.0,1.0,2.36,0,0,0,M
200002,1.0,2.0,2.0,2.75,62.2,62.0,1.67,3.75,0.0,1.0,1.0,4.6,2.0,1.0,0.0,1.5,1,0,1,M
51,1.0,2.67,1.5,1.0,51.0,53.2,1.0,5.33,0.0,0.0,0.0,3.4,1.0,0.0,2.0,2.0,0,0,0,M
133,0.0,1.0,0.0,3.0,44.0,44.0,1.0,5.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0,0,0,M
200010,0.0,1.33,2.5,2.0,28.8,28.0,2.5,2.67,0.0,0.0,0.0,2.8,1.0,1.0,0.0,0.0,0,0,0,F
3,0.0,0.0,0.0,1.0,25.0,24.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0,0,M
59,0.0,2.14,2.33,3.33,30.17,31.41,1.0,1.55,1.0,0.0,1.0,2.6,1.67,1.2,1.0,0.0,0,0,0,M
64,0.0,0.0,0.0,3.0,11.33,15.5,0.0,1.0,0.0,0.0,0.0,1.33,0.0,0.0,0.0,0.0,0,0,0,M
87,0.0,3.2,2.91,2.6,64.8,72.5,1.88,5.46,1.25,1.0,1.33,3.65,1.31,1.0,1.0,2.22,2,1,1,M


In [0]:
train_file_path = "/FileStore/tables/train_data.parquet"
test_file_path = "/FileStore/tables/test_data.parquet"
final_train_df.write.parquet(train_file_path)
final_test_df.write.parquet(test_file_path)