In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as f
from pyspark.sql.window import Window

In [2]:
# Start (or reuse) the Spark session that every cell below runs against.
spark = (
    SparkSession.builder
    .appName("Badminton Court Analysis - 2")
    .getOrCreate()
)

In [3]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [4]:
# Schema of one login record. Every column is declared nullable, and
# login_date is kept as a string in the sample data.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("user_id", IntegerType()),
        ("kit_id", IntegerType()),
        ("login_date", StringType()),
        ("session_count", IntegerType()),
    ]
])

In [5]:
# Sample login records, one tuple per row in schema order:
# (user_id, kit_id, login_date, session_count)
data = [
    # player 1
    (1, 2, "2016-03-01", 5), (1, 2, "2016-03-02", 6),
    # player 2
    (2, 3, "2017-06-25", 1),
    # player 3
    (3, 1, "2016-03-02", 0), (3, 4, "2018-07-03", 5),
]

In [6]:
# Build the input DataFrame from the sample rows, applying the explicit schema
# instead of letting Spark infer column types.
input_df = spark.createDataFrame(data=data, schema=schema)

In [7]:
# Preview the input rows to confirm the data loaded with the expected columns.
input_df.show()

+-------+------+----------+-------------+
|user_id|kit_id|login_date|session_count|
+-------+------+----------+-------------+
|      1|     2|2016-03-01|            5|
|      1|     2|2016-03-02|            6|
|      2|     3|2017-06-25|            1|
|      3|     1|2016-03-02|            0|
|      3|     4|2018-07-03|            5|
+-------+------+----------+-------------+



In [8]:
# Problem 1
# Write a solution to report the device that each player first logged in with.

In [9]:
# Solution: Using SQL

# WITH ranked_cte AS
# 	(
# 		SELECT
# 			*,
# 			ROW_NUMBER() OVER (PARTITION BY  user_id ORDER BY login_date) AS rn 
# 		FROM
# 			court
# 	)
# SELECT
# 	user_id,
# 	kit_id
# FROM
# 	ranked_cte
# WHERE
# 	rn = 1;

In [10]:
# Number each player's logins chronologically.
# row_number() is used instead of rank(): rank() gives every tied row the same
# value, so two logins sharing the earliest login_date would both get 1 and the
# player would be reported twice. row_number() matches ROW_NUMBER() in the SQL
# solution and yields exactly one row with rnk == 1 per player.
# kit_id is a tie-breaker so the chosen row is deterministic when dates tie.
windowSpec = Window.partitionBy("user_id").orderBy("login_date", "kit_id")

ranked_df = input_df.withColumn(
    "rnk", f.row_number().over(windowSpec)
)

In [11]:
# Inspect the per-player ordering value (rnk) assigned to each login.
ranked_df.show()

+-------+------+----------+-------------+---+
|user_id|kit_id|login_date|session_count|rnk|
+-------+------+----------+-------------+---+
|      1|     2|2016-03-01|            5|  1|
|      1|     2|2016-03-02|            6|  2|
|      2|     3|2017-06-25|            1|  1|
|      3|     1|2016-03-02|            0|  1|
|      3|     4|2018-07-03|            5|  2|
+-------+------+----------+-------------+---+



In [12]:
# Keep only each player's first login (rnk == 1), then project the answer.
# The filter runs BEFORE the select because the projection drops "rnk";
# filtering afterwards only works through Spark's implicit resolution of
# columns missing from the projected DataFrame, which is easy to break.
first_login_rows = ranked_df.filter(f.col("rnk") == 1)

result = first_login_rows.select(
    f.col("user_id"),
    f.col("kit_id").alias("first_device_used"),
)

In [13]:
# Show each player's first device.
result.show()

+-------+-----------------+
|user_id|first_device_used|
+-------+-----------------+
|      1|                2|
|      2|                3|
|      3|                1|
+-------+-----------------+



In [14]:
# Problem 2:
# Write a solution to report, for each player and date, how many games the player has played so far.
# That is, the total number of games played by the player up to and including that date.

In [15]:
# Solution: Using SQL

# select 
# 	user_id as player_id,
# 	login_date as event_date,
# 	SUM(session_count) OVER (partition by user_id order by login_date) as games_played_so_far
# from 
# 	court;

In [16]:
# Running total per player: with the window ordered by login_date, sum()
# accumulates session_count from the player's earliest login through the
# current row's date.
windowSpec = Window.partitionBy("user_id").orderBy("login_date")

running_total = f.sum(f.col("session_count")).over(windowSpec)
ranked_df = input_df.withColumn("games_played_so_far", running_total)

In [17]:
# Inspect the running total alongside the original columns.
ranked_df.show()

+-------+------+----------+-------------+-------------------+
|user_id|kit_id|login_date|session_count|games_played_so_far|
+-------+------+----------+-------------+-------------------+
|      1|     2|2016-03-01|            5|                  5|
|      1|     2|2016-03-02|            6|                 11|
|      2|     3|2017-06-25|            1|                  1|
|      3|     1|2016-03-02|            0|                  0|
|      3|     4|2018-07-03|            5|                  5|
+-------+------+----------+-------------+-------------------+



In [18]:
# Project the answer, renaming columns to match the SQL solution's output
# (player_id, event_date, games_played_so_far).
result = ranked_df.select(
    f.col("user_id").alias("player_id"),
    f.col("login_date").alias("event_date"),
    "games_played_so_far",
)

In [19]:
# Show the per-player, per-date running totals.
result.show()

+---------+----------+-------------------+
|player_id|event_date|games_played_so_far|
+---------+----------+-------------------+
|        1|2016-03-01|                  5|
|        1|2016-03-02|                 11|
|        2|2017-06-25|                  1|
|        3|2016-03-02|                  0|
|        3|2018-07-03|                  5|
+---------+----------+-------------------+



In [20]:
# Problem 3
# Write a solution to report the players who logged in again on the day after they first logged in.
# In other words, find the IDs of players who logged in on at least two consecutive days, starting from their first login date.

In [21]:
# Solution: Using SQL

# WITH curr_prev_login AS
# 	(
# 		select
# 			*,
# 			LEAD(login_date) over (partition by user_id order by login_date) as next_date
# 		from
# 			court
# 		order by
# 			user_id,
# 			login_date
# 	)
# select
# 	user_id
# from
# 	curr_prev_login
# where
# 	next_date - login_date = 1

In [22]:
# Re-display the input data for reference while working on Problem 3.
input_df.show()

+-------+------+----------+-------------+
|user_id|kit_id|login_date|session_count|
+-------+------+----------+-------------+
|      1|     2|2016-03-01|            5|
|      1|     2|2016-03-02|            6|
|      2|     3|2017-06-25|            1|
|      3|     1|2016-03-02|            0|
|      3|     4|2018-07-03|            5|
+-------+------+----------+-------------+



In [23]:
# Solution
windowSpec = Window.partitionBy("user_id").orderBy("login_date")

lead_df = input_df.withColumn(
    "next_date",
    f.lead("login_date").over(windowSpec)
).orderBy("user_id", "login_date")

lead_df.show()

+-------+------+----------+-------------+----------+
|user_id|kit_id|login_date|session_count| next_date|
+-------+------+----------+-------------+----------+
|      1|     2|2016-03-01|            5|2016-03-02|
|      1|     2|2016-03-02|            6|      NULL|
|      2|     3|2017-06-25|            1|      NULL|
|      3|     1|2016-03-02|            0|2018-07-03|
|      3|     4|2018-07-03|            5|      NULL|
+-------+------+----------+-------------+----------+



In [24]:
# Report players whose next login is exactly one day after their FIRST login.
# Checking only next_date - login_date == 1 would also match consecutive days
# later in a player's history, which the problem statement excludes.
# So each row is first compared with the player's earliest login_date.
# login_date is an ISO "yyyy-MM-dd" string, so min() on the string is also the
# chronologically earliest date.
first_login_date = f.min("login_date").over(Window.partitionBy("user_id"))

result = (
    lead_df
    .withColumn("first_login_date", first_login_date)
    .filter(
        # Only the row(s) for the player's first login day qualify...
        (f.col("login_date") == f.col("first_login_date"))
        # ...and the following login must fall on the very next calendar day.
        # Rows with a NULL next_date compare as NULL and are dropped.
        & (f.date_diff(f.col("next_date"), f.col("login_date")) == 1)
    )
    .select("user_id")
    # Keep the output in a stable order, as lead_df was sorted by user_id.
    .orderBy("user_id")
)

In [25]:
# Show the matching player IDs.
result.show()

+-------+
|user_id|
+-------+
|      1|
+-------+

