In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# creating a spark session
spark = SparkSession.builder.appName("Data wrangling with Spark SQL").getOrCreate()

# setting the directory for the data
data_dir = 'C:/Users/John/PycharmProjects/customer-attrition/data/'

# Reading in the JSON file as a spark dataframe
df = spark.read.json(data_dir + 'sparkify_log_small.json')

df.createOrReplaceTempView('df_table')


In [44]:
# SELECT distinct pages for the blank user and distinct pages for all users
# Right join the results to find pages that blank visitor did not visit

spark.sql('''
    SELECT *
    FROM (SELECT DISTINCT page
    FROM df_table
    WHERE userId = "") AS blank_pages
    RIGHT JOIN (SELECT DISTINCT page
    FROM df_table) AS all_pages
    ON blank_pages.page = all_pages.page
    WHERE blank_pages.page IS NULL
''').show()

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+



In [5]:
#Which page did user id ""(empty string) NOT visit?
spark.sql('''
    SELECT *
    FROM (SELECT DISTINCT T1.page
    FROM df_table as T1
    EXCEPT
    SELECT DISTINCT T2.page
    FROM df_table as T2 
    WHERE T2.userID = "") AS T3
    ORDER BY T3.page ASC
''').show()

+----------------+
|            page|
+----------------+
|       Downgrade|
|           Error|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [6]:
# How many female users do we have in the data set?
spark.sql('''
    SELECT COUNT(*)
    FROM (SELECT DISTINCT T.userId, T.gender
    FROM df_table as T
    WHERE T.gender = 'F')
''').show()

+--------+
|count(1)|
+--------+
|     462|
+--------+



In [9]:
# This is a much better and easy way.
spark.sql('''
    SELECT COUNT(DISTINCT userId)
    FROM df_table
    WHERE gender == 'F'
''').show()

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+



In [22]:
# How many songs were played from the most played artist?
spark.sql('''
    SELECT COUNT(T1.song)
    FROM df_table as T1
    WHERE T1.artist = (SELECT T2.artist
    FROM df_table as T2
    WHERE T2.artist != ""
    GROUP BY T2.artist
    ORDER BY count(1) DESC
    LIMIT 1)
''').show()

+-----------+
|count(song)|
+-----------+
|         83|
+-----------+



In [32]:
spark.sql('''
    SELECT artist, COUNT(artist) as plays
    FROM df_table
    GROUP BY artist
    ORDER BY plays DESC
    LIMIT 1
''').show()

+--------+-----+
|  artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



In [34]:
play_counts = spark.sql('''
        SELECT Artist, COUNT(Artist) AS plays
        FROM df_table
        GROUP BY Artist
        ''')

In [36]:
play_counts.createOrReplaceTempView('artist_counts')

In [49]:
play_counts.show()

+--------------------+-----+
|              Artist|plays|
+--------------------+-----+
|      The Black Keys|   40|
|        STRATOVARIUS|    1|
|      The Chameleons|    1|
|Dashboard Confess...|    3|
|      Jarabe De Palo|    3|
|        Ziggy Marley|    1|
|        Yann Tiersen|   10|
|  The Watts Prophets|    1|
|            Goldfish|    1|
|           Kate Nash|    3|
|              DJ Taz|    1|
|    Jane's Addiction|    1|
|         Eva Cassidy|    4|
|               Rufio|    1|
|           Los Lobos|    4|
|         Silverstein|    1|
|        Rhett Miller|    1|
|              Nebula|    1|
|Yonder Mountain S...|    1|
|        Generation X|    1|
+--------------------+-----+
only showing top 20 rows



In [53]:
spark.sql("SELECT a2.Artist, a2.plays FROM \
          (SELECT max(plays) AS max_plays FROM artist_counts) AS a1 \
          JOIN artist_counts AS a2 \
          ON a1.max_plays = a2.plays \
          ").show()

+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



In [59]:
spark.sql('''
    SELECT A2.Artist, A2.plays
    FROM (SELECT MAX(plays) AS max_plays
    FROM artist_counts) AS A1
    LEFT JOIN artist_counts AS A2
    ON A1.max_plays = A2.plays
''').show()
# we are using the same thing twice just by taking the max from one of them and using left join. Cool!

+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



In [12]:
# Here is another solution
spark.sql('''
        SELECT Artist, COUNT(Artist) AS plays
        FROM df_table
        GROUP BY Artist
        ORDER BY plays DESC
        LIMIT 1
        ''').show()

+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



In [16]:
spark.sql('''
    SELECT COUNT(DISTINCT T.song)
    FROM df_table as T
    WHERE T.artist = (SELECT artist
    FROM df_table
    WHERE artist != ""
    GROUP BY artist
    ORDER BY count(1) DESC
    LIMIT 1)
''').show()

+--------------------+
|count(DISTINCT song)|
+--------------------+
|                  24|
+--------------------+



In [56]:
home_visit = udf(lambda x: 1 if x == 'Home' else 0, IntegerType())

In [57]:
from pyspark.sql import Window
window_val = Window.partitionBy('userId').orderBy(desc('ts')).rangeBetween(Window.unboundedPreceding,0)

In [59]:
df_temp = df.filter((df.page == 'NextSong') | (df.page == 'Home')).select('userId', 'page', 'ts')\
.withColumn('homevisit', home_visit('page')).withColumn('period', Fsum('homevisit').over(window_val))

In [70]:
df_temp.filter(df_temp.page == 'NextSong').groupBy('userId','period').agg({'period':'count'})\
.agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+



In [75]:
spark.udf.register('home_visit', lambda x: 1 if x == 'Home' else 0, IntegerType())

<function __main__.<lambda>(x)>

In [78]:
from pyspark.sql import Window
window_val = Window.partitionBy('userId').orderBy(desc('ts')).rangeBetween(Window.unboundedPreceding,0)

In [55]:
# Lets tackle the fifth question
# How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

is_home = spark.sql('''
    SELECT userId, page, ts, CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
    FROM df_table
    WHERE page = 'NextSong' or page = 'Home'
''')

is_home.createOrReplaceTempView('is_home_table')

In [59]:
cumulative_sum = spark.sql('''
    SELECT *, SUM(is_home) OVER (PARTITION BY userId ORDER BY ts DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period
    FROM is_home_table
''')

cumulative_sum.createOrReplaceTempView('period_table')

In [62]:
spark.sql('''
    SELECT AVG(T.count_results)
    FROM (SELECT userId, page, period, COUNT(*) as count_results
    FROM period_table
    GROUP BY userId, page, period
    HAVING page = 'NextSong') as T
''').show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+



In [60]:
spark.sql('''
    SELECT *
    FROM period_table
''').show()

+------+--------+-------------+-------+------+
|userId|    page|           ts|is_home|period|
+------+--------+-------------+-------+------+
|  1436|NextSong|1513783259284|      0|     0|
|  1436|NextSong|1513782858284|      0|     0|
|  2088|    Home|1513805972284|      1|     1|
|  2088|NextSong|1513805859284|      0|     1|
|  2088|NextSong|1513805494284|      0|     1|
|  2088|NextSong|1513805065284|      0|     1|
|  2088|NextSong|1513804786284|      0|     1|
|  2088|NextSong|1513804555284|      0|     1|
|  2088|NextSong|1513804196284|      0|     1|
|  2088|NextSong|1513803967284|      0|     1|
|  2088|NextSong|1513803820284|      0|     1|
|  2088|NextSong|1513803651284|      0|     1|
|  2088|NextSong|1513803413284|      0|     1|
|  2088|NextSong|1513803254284|      0|     1|
|  2088|NextSong|1513803057284|      0|     1|
|  2088|NextSong|1513802824284|      0|     1|
|  2162|NextSong|1513781246284|      0|     0|
|  2162|NextSong|1513781065284|      0|     0|
|  2162|NextS