In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import datetime

In [98]:
spark = SparkSession \
                    .builder \
                    .master("local") \
                    .appName("SparkPractice3") \
                    .getOrCreate()

In [99]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.host', 'host.docker.internal'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.app.id', 'local-1571656623750'),
 ('spark.driver.port', '63814'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.eventLog.dir', 'file:/tmp/spark-events'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'SparkPractice3')]

In [100]:
spark

In [101]:
path = r".\songs_events.json"

In [102]:
df = spark.read.json(path)

In [103]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [104]:
df.describe()

DataFrame[summary: string, artist: string, auth: string, firstName: string, gender: string, itemInSession: string, lastName: string, length: string, level: string, location: string, method: string, page: string, registration: string, sessionId: string, song: string, status: string, ts: string, userAgent: string, userId: string]

In [105]:
df.show(n=1)

+------------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-----------------+---------+---------+------+-------------+--------------------+------+
|      artist|     auth|firstName|gender|itemInSession|lastName|  length|level|            location|method|    page|     registration|sessionId|     song|status|           ts|           userAgent|userId|
+------------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-----------------+---------+---------+------+-------------+--------------------+------+
|Miami Horror|Logged In|     Kate|     F|           88| Harrell|250.8273| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      293|Sometimes|   200|1541548876796|"Mozilla/5.0 (X11...|    97|
+------------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-----------------+---------+---------+------+-------------+---------

In [106]:
df.take(1)

[Row(artist='Miami Horror', auth='Logged In', firstName='Kate', gender='F', itemInSession=88, lastName='Harrell', length=250.8273, level='paid', location='Lansing-East Lansing, MI', method='PUT', page='NextSong', registration=1540472624796.0, sessionId=293, song='Sometimes', status=200, ts=1541548876796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', userId='97')]

In [107]:
df.describe("artist").show()

+-------+---------+
|summary|   artist|
+-------+---------+
|  count|      972|
|   mean|     null|
| stddev|     null|
|    min|      !!!|
|    max|zebrahead|
+-------+---------+



In [108]:
df.describe("sessionID").show()

+-------+------------------+
|summary|         sessionID|
+-------+------------------+
|  count|              1117|
|   mean|488.28558639212173|
| stddev|157.11506381542455|
|    min|                 8|
|    max|               674|
+-------+------------------+



In [109]:
df.count()

1117

In [110]:
df.select("page").dropDuplicates().sort("page").show()

+-------------+
|         page|
+-------------+
|        About|
|    Downgrade|
|         Help|
|         Home|
|        Login|
|       Logout|
|     NextSong|
|Save Settings|
|     Settings|
|      Upgrade|
+-------------+



In [111]:
df.select(["userId", "firstname", "page", "level", "song"]).where(df.userId == "80").collect()

[Row(userId='80', firstname='Tegan', page='Home', level='free', song=None),
 Row(userId='80', firstname='Tegan', page='NextSong', level='free', song="Baby I'm Yours"),
 Row(userId='80', firstname='Tegan', page='Home', level='paid', song=None),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Best Of Both Worlds (Remastered Album Version)'),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Call Me If You Need Me'),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Home'),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='OMG'),
 Row(userId='80', firstname='Tegan', page='Home', level='paid', song=None),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Candle On The Water'),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Our Song'),
 Row(userId='80', firstname='Tegan', page='NextSong', level='paid', song='Baby Boy [feat. Beyonce]'),
 Row(use

In [112]:
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)

In [113]:
df = df.withColumn("hour", get_hour(df.ts))

In [114]:
df.head()

Row(artist='Miami Horror', auth='Logged In', firstName='Kate', gender='F', itemInSession=88, lastName='Harrell', length=250.8273, level='paid', location='Lansing-East Lansing, MI', method='PUT', page='NextSong', registration=1540472624796.0, sessionId=293, song='Sometimes', status=200, ts=1541548876796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', userId='97', hour='3')

In [115]:
songs_in_hour = df.filter(df.page == "NextSong") \
                    .groupby(df.hour) \
                    .count() \
                    .orderBy(df.hour.cast("float"))

In [116]:
songs_in_hour.show()

+----+-----+
|hour|count|
+----+-----+
|   0|   43|
|   1|   34|
|   2|   29|
|   3|   27|
|   4|    5|
|   5|    2|
|   6|    6|
|   8|   10|
|   9|   15|
|  10|   13|
|  11|   12|
|  12|   30|
|  13|   55|
|  14|   63|
|  15|   44|
|  16|   50|
|  17|   70|
|  18|   61|
|  19|   86|
|  20|   78|
+----+-----+
only showing top 20 rows



In [117]:
song_in_hour_pd = songs_in_hour.toPandas()

In [118]:
song_in_hour_pd.head()

Unnamed: 0,hour,count
0,0,43
1,1,34
2,2,29
3,3,27
4,4,5


In [119]:
df_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

In [120]:
df_valid.count()

1117

In [121]:
df_valid = df_valid.filter(df_valid["userId"] != "")

In [122]:
df_valid.count()

1089

In [123]:
# beofre and after specific event


In [124]:
flag_downgrade_event = udf(lambda x: 1 if x == "Downgrade" else 0, IntegerType())

In [125]:
df_valid = df_valid.withColumn("downgraded", flag_downgrade_event("page"))

In [126]:
df_valid.head()

Row(artist='Miami Horror', auth='Logged In', firstName='Kate', gender='F', itemInSession=88, lastName='Harrell', length=250.8273, level='paid', location='Lansing-East Lansing, MI', method='PUT', page='NextSong', registration=1540472624796.0, sessionId=293, song='Sometimes', status=200, ts=1541548876796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', userId='97', hour='3', downgraded=0)

In [131]:
from pyspark.sql import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import sum as Fsum

In [132]:
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

In [133]:
df_valid = df_valid.withColumn("phase", Fsum("downgraded").over(windowval))

In [137]:
df_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]) \
    .where(df_valid.userId == "80") \
    .sort("ts") \
    .collect()

[Row(userId='80', firstname='Tegan', ts=1541577563796, page='Home', level='free', phase=4),
 Row(userId='80', firstname='Tegan', ts=1541578087796, page='NextSong', level='free', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542260277796, page='Home', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542260935796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542261224796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542261356796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542261662796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542261713796, page='Home', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542262057796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=1542262233796, page='NextSong', level='paid', phase=4),
 Row(userId='80', firstname='Tegan', ts=154226243479

In [95]:
spark.stop()

## General functions


**select()** : returns a new dataframe with the selected columns

**filter()** : filters rows using the given condition

**where()** : is just an alias for filter()

**groupBy()** : groups the DataFrame using the specified columns, so we can run aggregation on them

**sort()** : returns a new DataFrame sorted by the specified column(s). By default the second parameter 'ascending' is True

**dropDuplicates()** : returns a new dataframe with unique rows based on all or just a subset of columns

**withColumn()** : returns a new DataFrame by adding a column or replacing the existing column that has the same name. The first parameter is the name of the new column, the second is an expression of how to compute it

## Aggregate functions


Spark SQL provides built-in methods for the most common aggregations pyspark.sql.functions module, such as :


    count(), countDistinct(), avg(), max(), min(), etc. 
    
    
**These methods are not the same as the built-in methods in the Python Standard Library, where we can find min() for example as well, hence you need to be careful not to try to use them interchangeably.**

**In many cases, there are multiple ways to express the same aggregations.**

For example, if we would like to compute one type of aggregate for one or more columns of the dataframe we can just simply chain the aggregate method after a groupBy(). 

If we would like to use different functions on different columns agg() comes in handy. For example agg({"salary": "avg", "age": "max"}) computes the average salary and maximum age.

## User defined functions (UDF)


In Spark SQL we can define our own functions with the udf method from the **pyspark.sql.functions** module. 

The default type of the returned variable for UDFs is ****string****. If we would like to return an other type we need to explicitly do so by using the different types from the **pyspark.sql.types** module.

## Window functions

Window functions are a way of combining the values of ranges of rows in a dataframe. 

When defining the window we can choose how to sort and group (with the partitionBy method) the rows and how wide of a window we'd like to use (described by rangeBetween or rowsBetween).

https://spark.apache.org/docs/latest/api/python/index.html