In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Start (or attach to an already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Wrangling Data")
    .getOrCreate()
)

In [None]:
# Read the JSON event log at `path` into a Spark DataFrame.
path = "./data/sparkify_log_small.json"
user_log = spark.read.format("json").load(path)

In [None]:
# Print the schema (column names and types) of the loaded log.
user_log.printSchema()

## Which page did user id "" (empty string) NOT visit?

In [None]:
# Total number of rows in the log.
user_log.count()

In [None]:
# Show every distinct page value that appears in the log, in sorted order.
distinct_pages = user_log.select("page").distinct()
distinct_pages.orderBy("page").show()

In [None]:
# Distinct pages across the whole log, and the distinct pages recorded
# under the empty-string user id; the latter is displayed.
all_pages = user_log.select("page").distinct()
blank_pages = user_log.where(user_log.userId == "").select("page").distinct()
blank_pages.show()


In [None]:
# Pages that occur somewhere in the log but never for the empty-string user id.
# The difference is computed in Spark and sorted so the printed order is stable.
# The loop variable is named for what it holds (a Row) and no longer rebinds
# `col`, a name that later cells call as a function.
pages_never_blank = all_pages.subtract(blank_pages).sort("page")
for page_row in pages_never_blank.collect():
    print(page_row.page)

## How many female users do we have in the data set?


In [None]:
# Reduce the log to one row per distinct (userId, gender) pair,
# then count how many such rows fall under each gender value.
users_with_gender = user_log.select("userId", "gender").distinct()
users_with_gender.groupBy("gender").count().show()

## How many songs were played from the most played artist?


In [None]:
# Count "NextSong" events per artist and display the single artist
# with the highest count.
song_plays = user_log.where(user_log.page == "NextSong")
plays_per_artist = song_plays.groupBy("artist").count()
plays_per_artist.sort(desc("count")).show(1)

## How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [None]:
def _is_home_page(page_name):
    """Return 1 when ``page_name`` equals "Home", otherwise 0."""
    return int(page_name == "Home")


# Spark UDF wrapping the flag above; declared to return an integer column.
mark_home = udf(_is_home_page, IntegerType())

In [None]:
from pyspark.sql import  Window

In [None]:
# Per-user window ordered by descending `ts`, whose frame runs from the start
# of the partition up to the current row's `ts` value (range frame).
user_window = (
    Window.partitionBy("userId")
    .orderBy(desc("ts"))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Keep only song plays and home-page visits, flag each home visit with 1,
# then take a running sum of those flags over `user_window`. Rows of one user
# that share a `period` value therefore lie between the same home visits.
#
# The page column is referenced through `user_log` because `col` is not
# imported anywhere in this notebook, so `col("page")` cannot be relied on.
cusum = (
    user_log
    .filter((user_log.page == "NextSong") | (user_log.page == "Home"))
    .select("userId", "page", "ts")
    .withColumn("homevisit", mark_home(user_log.page))
    .withColumn("period", Fsum("homevisit").over(user_window))
)

In [None]:
def _home_flag(page_name):
    """Return 1 for the "Home" page and 0 for any other value."""
    return int(page_name == 'Home')


# Integer-typed Spark UDF around the flag; same logic as `mark_home`.
function = udf(_home_flag, IntegerType())

# Per-user window, newest `ts` first, framed from the partition start
# through the current row's `ts` value.
user_window = (
    Window.partitionBy('userId')
    .orderBy(desc('ts'))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Song plays and home-page visits only, with a 0/1 `homevisit` flag and a
# running per-user sum of that flag (`period`) computed over `user_window`.
#
# Two fixes relative to the earlier form of this cell:
#   * the page column is referenced through `user_log`, because `col` is not
#     imported anywhere in this notebook;
#   * the id column is selected as 'userId', the spelling used by the window
#     spec and the rest of the notebook, instead of 'userID', which only
#     resolves when column matching is case-insensitive.
cusum = (
    user_log
    .filter((user_log.page == 'NextSong') | (user_log.page == 'Home'))
    .select('userId', 'page', 'ts')
    .withColumn('homevisit', function(user_log.page))
    .withColumn('period', Fsum('homevisit').over(user_window))
)