## Continuation of spark_lab_2

In [1]:
# Which page did user id "" (empty string) NOT visit?

# Data Wrangling User Logs
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
spark = SparkSession \
    .builder \
    .appName("Wrangling Data Quiz") \
    .getOrCreate()
path = "/Users/makbulhussain/Downloads/sparkify_log_small.json"
user_log = spark.read.json(path)


23/08/10 07:04:24 WARN Utils: Your hostname, Makbuls-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.10.30.219 instead (on interface en0)
23/08/10 07:04:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/10 07:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/10 07:04:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

In [13]:
# using sql syntax
user_log.createOrReplaceTempView("user_log")
# Equivalent SQL query

# filter for users with blank user id

sql_query = """
    SELECT DISTINCT page AS blank_pages
    FROM user_log
    WHERE userId = ''
"""

# Execute the SQL query
result = spark.sql(sql_query)

# Show the result
result.show()

+-----------+
|blank_pages|
+-----------+
|       Home|
|      About|
|      Login|
|       Help|
+-----------+



In [9]:
from pyspark.sql.functions import col, desc, udf, col

In [10]:
blank_pages_df = user_log.filter(user_log.userId == '') \
    .select(col('page') \
    .alias('blank_pages')) \
    .dropDuplicates()

In [11]:
blank_pages_df.show()

+-----------+
|blank_pages|
+-----------+
|       Home|
|      About|
|      Login|
|       Help|
+-----------+



In [20]:
# What type of user does the empty string user id most likely refer to?

sql_query = """
    SELECT DISTINCT page AS blank_pages, count(*) as count
    FROM user_log
    WHERE userId = '' 
GROUP BY blank_pages
order by count desc
"""

# Execute the SQL query
result = spark.sql(sql_query)

# Show the result
result.show()

+-----------+-----+
|blank_pages|count|
+-----------+-----+
|       Home|  187|
|      Login|  126|
|      About|   15|
|       Help|    8|
+-----------+-----+



In [29]:
# How many female users do we have in the data set?

sql_query = """
    SELECT COUNT(DISTINCT userId)
FROM user_log
WHERE gender = 'F'

"""

# Execute the SQL query
result = spark.sql(sql_query)

# Show the result
result.show()


+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+



In [32]:
# How many songs were played from the most played artist?


sql_query = """
    SELECT Artist, COUNT(Artist) AS Playcount
FROM user_log
WHERE page = 'NextSong'
GROUP BY Artist
ORDER BY Playcount DESC
LIMIT 2


"""

# Execute the SQL query
result = spark.sql(sql_query)

# Show the result
result.show()


+-------------+---------+
|       Artist|Playcount|
+-------------+---------+
|     Coldplay|       83|
|Kings Of Leon|       69|
+-------------+---------+



In [34]:
user_log.filter(user_log.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Playcount') \
    .sort(desc('Playcount')) \
    .show(1)

+--------+---------+
|  Artist|Playcount|
+--------+---------+
|Coldplay|       83|
+--------+---------+
only showing top 1 row



In [44]:
# How many songs do users listen to on average between visiting our home page? 
# Please round your answer to the closest integer.
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

ishome = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

# Filter only NextSong and Home pages, add 1 for each time they visit Home
# Adding a column called period which is a specific interval between Home visits
cusum = user_log.filter((user_log.page == 'NextSong') | (user_log.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', ishome(col('page'))) \
    .withColumn('period', Fsum('homevisit') \
    .over(user_window)) 
    
cusum.take(3)

[Row(userID='', page='Home', ts=1513846494284, homevisit=1, period=1),
 Row(userID='', page='Home', ts=1513845761284, homevisit=1, period=2),
 Row(userID='', page='Home', ts=1513845132284, homevisit=1, period=3)]

In [38]:
# This will only show 'Home' in the first several rows due to default sorting

cusum.show(300)



+------+--------+-------------+---------+------+
|userID|    page|           ts|homevisit|period|
+------+--------+-------------+---------+------+
|      |    Home|1513846494284|        1|     1|
|      |    Home|1513845761284|        1|     2|
|      |    Home|1513845132284|        1|     3|
|      |    Home|1513845055284|        1|     4|
|      |    Home|1513844251284|        1|     5|
|      |    Home|1513844026284|        1|     6|
|      |    Home|1513843602284|        1|     7|
|      |    Home|1513843098284|        1|     8|
|      |    Home|1513842367284|        1|     9|
|      |    Home|1513841138284|        1|    10|
|      |    Home|1513841121284|        1|    11|
|      |    Home|1513839824284|        1|    12|
|      |    Home|1513838865284|        1|    13|
|      |    Home|1513838857284|        1|    14|
|      |    Home|1513838835284|        1|    15|
|      |    Home|1513838141284|        1|    16|
|      |    Home|1513838110284|        1|    17|
|      |    Home|151

                                                                                

In [39]:
# See how many songs were listened to on average during each period
cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}) \
    .show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+



In [97]:
user_log.head(2)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000')]

In [96]:
# Calculate the period using a window function (SQL)
user_log_with_period = spark.sql("""
    SELECT *,
           SUM(CASE WHEN page = 'Home' THEN 1 ELSE 0 END) OVER (PARTITION BY userID ORDER BY ts ASC) AS period
    FROM user_log
""")

# Filter only 'NextSong' events (SQL)
next_song_logs = user_log_with_period.filter("page = 'NextSong'")

# Group by 'userID' and 'period', and count 'NextSong' events
result = next_song_logs.groupBy("userID", "period").count()

# Calculate the average of songs listened during each period
overall_avg_result = result.groupBy().agg(
    avg("count").alias("avg_songs_per_period")
)

# Show the result
overall_avg_result.show()

+--------------------+
|avg_songs_per_period|
+--------------------+
|  6.9558333333333335|
+--------------------+



In [103]:

user_log_with_period.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- period: long (nullable = true)



## Window function

In [108]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a Spark session
spark = SparkSession.builder.appName("WindowExample").getOrCreate()

# Sample data
data = [
    Row(userID='user1', ts=100, value=10),
    Row(userID='user1', ts=200, value=15),
    Row(userID='user1', ts=300, value=5),
    Row(userID='user2', ts=150, value=7),
    Row(userID='user2', ts=250, value=12),
    Row(userID='user2', ts=350, value=8),
]

# Create a DataFrame
df = spark.createDataFrame(data)


from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum

# Define the window specification
window_spec = Window.partitionBy('userID').orderBy(col('ts').asc())

# Add a new column with the rolling sum using the window specification
df_with_window = df.withColumn('rolling_sum', sum(col('value')).over(window_spec))
df_with_window.show()


23/08/10 10:02:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 275:>                                                        (0 + 8) / 8]

+------+---+-----+-----------+
|userID| ts|value|rolling_sum|
+------+---+-----+-----------+
| user1|100|   10|         10|
| user1|200|   15|         25|
| user1|300|    5|         30|
| user2|150|    7|          7|
| user2|250|   12|         19|
| user2|350|    8|         27|
+------+---+-----+-----------+



                                                                                

In [109]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("WindowExample").getOrCreate()

# Sample data
data = [
    ("user1", 100, 10),
    ("user1", 200, 15),
    ("user1", 300, 5),
    ("user2", 150, 7),
    ("user2", 250, 12),
    ("user2", 350, 8),
]

# Define the schema
schema = ["userID", "ts", "value"]

# Create a DataFrame
df = spark.createDataFrame(data, schema)

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("df_table")
from pyspark.sql import functions as F

# Add a new column with the rolling sum using SQL window function
df_with_window_sql = spark.sql("""
    SELECT *,
           SUM(value) OVER (PARTITION BY userID ORDER BY ts ASC) AS rolling_sum
    FROM df_table
""")

df_with_window_sql.show()


+------+---+-----+-----------+
|userID| ts|value|rolling_sum|
+------+---+-----+-----------+
| user1|100|   10|         10|
| user1|200|   15|         25|
| user1|300|    5|         30|
| user2|150|    7|          7|
| user2|250|   12|         19|
| user2|350|    8|         27|
+------+---+-----+-----------+



In [111]:
df_with_flag = spark.sql("""
    SELECT *,
           CASE WHEN userID = 'user1' THEN 1 ELSE 0 END AS flag
    FROM df_table
""")

df_with_flag.show()

+------+---+-----+----+
|userID| ts|value|flag|
+------+---+-----+----+
| user1|100|   10|   1|
| user1|200|   15|   1|
| user1|300|    5|   1|
| user2|150|    7|   0|
| user2|250|   12|   0|
| user2|350|    8|   0|
+------+---+-----+----+



In [15]:
# get a list of possible pages that could be visited
all_pages_df = user_log.select('page').dropDuplicates()

# find values in all_pages that are not in blank_pages
# these are the pages that the blank user did not go to
# NOTE WE SHOULD NOT USE .collect() on large datasets (>100 MB)
for row in set(all_pages_df.collect()) - set(blank_pages_df.collect()):
    print(row.page)

Save Settings
Error
Downgrade
Settings
Submit Upgrade
Logout
NextSong
Upgrade
Submit Downgrade
