# Data Wrangling with Spark

This is the code used in the previous screencast. Run each code cell to understand what the code does and how it works.

These first three cells import libraries, instantiate a SparkSession, and then read in the data set.

In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [38]:
# Reuse the active SparkSession if one exists; otherwise create one for this app.
spark = (
    SparkSession.builder
    .appName("Wrangling Data")
    .getOrCreate()
)

In [39]:
# Location of the JSON log file, and the DataFrame Spark builds from it.
path = "data/sparkify_log_small.json"
user_log = spark.read.format("json").load(path)

In [40]:
# Print each column of user_log with its type and nullability.
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [42]:
# Distinct pages, in alphabetical order, for log entries whose userId is the empty string.
blank_user_rows = user_log.filter(user_log.userId == "")
blank_user_rows.select("page").distinct().orderBy("page").show()

+-----+
| page|
+-----+
|About|
| Help|
| Home|
|Login|
+-----+



In [45]:
# Distinct values that appear in the gender column (null included).
user_log.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|  null|
|     M|
+------+



In [49]:
# Count the distinct (userId, gender) pairs whose gender is "F".
user_genders = user_log.select("userId", "gender").distinct()
user_genders.where(user_genders.gender == "F").count()

462

In [51]:
# Most played artist: start by grouping the log entries by artist.
# (Original note: "create a column with the artist count".) Grouping alone only
# yields a GroupedData handle; an aggregation such as count() still has to be
# applied before there is a DataFrame to display.
user_group = user_log.groupBy("artist")

In [64]:
# GroupedData has no show(); it must be aggregated into a DataFrame first.
# Count the rows per artist and list the most frequent artists first.
# Rows without an artist are excluded so a null group cannot top the ranking
# (presumably those are non-song log entries -- confirm against the data).
(user_group.count()
    .where("artist IS NOT NULL")
    .orderBy(desc("count"))
    .show())

In [68]:
# DataFrame has no avg() method; aggregates go through agg().
# NOTE(review): userId is a string column, so this relies on Spark casting it to a
# number for the average; values that cannot be cast (e.g. empty strings) are
# expected to be ignored as nulls -- confirm on the Spark version in use.
user_log.filter(user_log.page == "Home").agg({"userId": "avg"}).show()

In [58]:
# Number of log entries whose artist is The Black Keys.
user_log.where(user_log.artist == "The Black Keys").select("song").count()

40

In [10]:
# countDistinct is a column function, not a DataFrame method. Count the distinct
# non-null artists directly on the DataFrame instead; nulls are dropped first so
# the result matches what a distinct count over the column would report.
user_log.select("artist").dropna().distinct().count()

# Data Exploration 

The next cells explore the data set.

In [None]:
# Fetch the first five rows as a list of Row objects.
user_log.head(5)

In [11]:
# Print each column of user_log with its type and nullability.
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
# Show describe()'s summary statistics for user_log as a whole.
user_log.describe().show()

In [None]:
# Summary statistics restricted to the artist column.
user_log.describe("artist").show()

In [None]:
# Summary statistics restricted to the sessionId column.
user_log.describe("sessionId").show()

In [None]:
# Total number of rows in user_log.
user_log.count()

In [11]:
# countDistinct is a column function, not a DataFrame method. Count the distinct
# non-null session ids directly on the DataFrame instead. The column is spelled
# sessionId, as in the schema.
user_log.select("sessionId").dropna().distinct().count()

In [12]:
# Every distinct page value in the log, in alphabetical order.
user_log.select("page").distinct().orderBy("page").show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [20]:
# Show the first ten artists that are actually present.
# Comparing against the string "null" only drops null rows as a side effect of
# SQL three-valued logic; isNotNull() states the intent directly.
user_log.select("artist").where(user_log.artist.isNotNull()).show(10)

+--------------------+
|              artist|
+--------------------+
|       Showaddywaddy|
|          Lily Allen|
|Cobra Starship Fe...|
|          Alex Smoke|
|              Redman|
|     Ulrich Schnauss|
|               Jay-Z|
|         Evanescence|
|     Scissor Sisters|
|        3 Doors Down|
+--------------------+
only showing top 10 rows



In [26]:
# Show up to ten full log entries for the artist Alex Smoke.
user_log.where(user_log["artist"] == "Alex Smoke").show(n=10)

+----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+-------------------+------+-------------+--------------------+------+
|    artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|               song|status|           ts|           userAgent|userId|
+----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+-------------------+------+-------------+--------------------+------+
|Alex Smoke|Logged In|   Sophee|     F|            8|  Barker|405.99465| paid|San Luis Obispo-P...|   PUT|NextSong|1513009647284|     2372|Don't See The Point|   200|1513720905284|"Mozilla/5.0 (Win...|  2373|
+----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+-------------------+-----

In [36]:
# Artist and location for entries whose artist name contains "Springsteen".
springsteen_rows = user_log.where(user_log.artist.contains("Springsteen"))
springsteen_rows.select("artist", "location").show()

+-----------------+--------------------+
|           artist|            location|
+-----------------+--------------------+
|Bruce Springsteen|Phoenix-Mesa-Scot...|
|Bruce Springsteen|Atlanta-Sandy Spr...|
|Bruce Springsteen|Miami-Fort Lauder...|
|Bruce Springsteen|       Blackfoot, ID|
+-----------------+--------------------+



In [34]:
# Collect the page/song history of user 1046 to the driver.
user_1046 = user_log.where(user_log.userId == "1046")
user_1046.select(["userId", "firstname", "page", "song"]).collect()

[Row(userId='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
 Row(userId='1046', firstname='Kenneth', page='Home', song=None),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='War on war'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userId='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userId='1046', firstname='Kenneth', page='Logout', song=None),
 Row(userId='1046', firstname='Kenneth'

# Calculating Statistics by Hour

In [None]:
def _hour_of_day(ts_ms):
    """Return the hour (0-23) of an epoch timestamp given in milliseconds.

    The value is divided by 1000 to get seconds and converted with
    ``datetime.fromtimestamp``, so the hour is expressed in the local time
    zone of the machine that evaluates the UDF.
    """
    return datetime.datetime.fromtimestamp(ts_ms / 1000.0).hour


# Declare the return type explicitly: without it the UDF's return type defaults
# to string, so every hour would come back as text and need re-parsing
# before it can be sorted or plotted numerically.
get_hour = udf(_hour_of_day, IntegerType())

In [None]:
# Derive an "hour" column from each entry's ts value.
hour_col = get_hour(user_log["ts"])
user_log = user_log.withColumn("hour", hour_col)

In [None]:
# Look at the first row of user_log, which now carries the "hour" column.
user_log.head()

In [None]:
# Count NextSong entries per hour, ordered by the hour's numeric value.
songs_in_hour = (
    user_log
    .where(user_log.page == "NextSong")
    .groupBy(user_log.hour)
    .count()
    .sort(user_log.hour.cast("float"))
)

In [None]:
# Display the per-hour song counts.
songs_in_hour.show()

In [None]:
# Pull the aggregated counts into pandas and make sure "hour" is numeric so it
# can serve as a plot axis.
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd["hour"] = pd.to_numeric(songs_in_hour_pd["hour"])

In [None]:
# Scatter plot of songs played against hour of day.
hours = songs_in_hour_pd["hour"]
plays = songs_in_hour_pd["count"]

plt.scatter(hours, plays)
plt.xlim(-1, 24)
plt.ylim(0, 1.2 * max(plays))  # leave 20% headroom above the busiest hour
plt.xlabel("Hour")
plt.ylabel("Songs played");

# Drop Rows with Missing Values

As you'll see, there are no missing (null) values in the `userId` or `sessionId` columns. However, some `userId` values are empty strings.

In [None]:
# Discard every row in which userId or sessionId is null.
user_log_valid = user_log.na.drop(how="any", subset=["userId", "sessionId"])

In [None]:
# Row count after dropping rows with a null userId or sessionId.
user_log_valid.count()

In [None]:
# Distinct userId values from the unfiltered log, in sorted order.
user_log.select("userId").distinct().orderBy("userId").show()

In [None]:
# Remove the rows whose userId is an empty string.
has_user_id = user_log_valid.userId != ""
user_log_valid = user_log_valid.where(has_user_id)

In [None]:
# Row count after also removing rows whose userId is an empty string.
user_log_valid.count()

# Users Downgrade Their Accounts

Find when users downgrade their accounts and flag those log entries. Then use a window function and a cumulative sum to label each of a user's log entries as either pre- or post-downgrade.

In [None]:
# Show the log entries where the page is Submit Downgrade.
downgrade_rows = user_log_valid.where("page = 'Submit Downgrade'")
downgrade_rows.show()

In [None]:
# Collect user 1138's history, including the level recorded on each entry.
user_1138 = user_log.where(user_log.userId == "1138")
user_1138.select(["userId", "firstname", "page", "level", "song"]).collect()

In [None]:
def _is_downgrade_submission(page):
    """Return 1 when ``page`` equals "Submit Downgrade", otherwise 0."""
    return int(page == "Submit Downgrade")


# Integer-typed UDF so the resulting flag can be summed later.
flag_downgrade_event = udf(_is_downgrade_submission, IntegerType())

In [None]:
# Add a 0/1 "downgraded" column computed from each entry's page.
downgrade_flag = flag_downgrade_event(user_log_valid.page)
user_log_valid = user_log_valid.withColumn("downgraded", downgrade_flag)

In [None]:
# Look at the first row, which now includes the "downgraded" flag column.
user_log_valid.head()

In [None]:
from pyspark.sql import Window

In [None]:
# Per-user window with events ordered newest-first. The frame runs from the start
# of the partition up to the current row's ts value, so an aggregate over it
# covers the current entry and every later one for that user.
windowval = (
    Window.partitionBy("userId")
    .orderBy(desc("ts"))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Running total of the downgrade flag over the window, stored as "phase".
phase_col = Fsum("downgraded").over(windowval)
user_log_valid = user_log_valid.withColumn("phase", phase_col)

In [None]:
# Trace user 1138's entries in chronological order together with the computed phase.
# The predicate uses user_log_valid's own userId column rather than the column
# object of the parent user_log DataFrame, so it is resolved against the
# DataFrame that is actually being queried.
(user_log_valid
    .select(["userId", "firstname", "ts", "page", "level", "phase"])
    .where(user_log_valid.userId == "1138")
    .sort("ts")
    .collect())