# Problems on badminton court queries

In [0]:
# Evaluate the bare `spark` name so the notebook echoes the session object.
# NOTE(review): presumably the notebook runtime pre-defines `spark`; the file
# only creates it explicitly in a later cell — confirm this cell cannot raise
# NameError when run first.
spark

In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, col, sum as f_sum, lead, min as f_min, datediff, to_date, row_number, collect_list
from pyspark.sql.types import IntegerType, BooleanType, DateType

# Obtain (or reuse) the Spark session used by every cell below.
session_builder = SparkSession.builder.appName('thebigdatashow.me')
spark = session_builder.getOrCreate()

# Sample rows: (user_id, kit_id, login_date, sessions_count).
badminton_court_data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 2),
    (3, 4, "2016-03-02", 3),
    (3, 2, "2018-07-03", 5),
]
print(badminton_court_data)

# Column names, in the same order as the tuple fields above.
coloumns = ["user_id", "kit_id", "login_date", "sessions_count"]

# login_date is supplied as a string literal here; no date conversion happens
# in this cell.
court_input_df = spark.createDataFrame(badminton_court_data, schema=coloumns)

court_input_df.show()
court_input_df.printSchema()

In [0]:
# Turn login_date into a real date column. The sample values are already in
# yyyy-MM-dd form, so a direct cast is used; to_date(col, 'yyyy-MM-dd') would be
# the explicit-format alternative.
login_as_date = col('login_date').cast(DateType())
renamed_court_input_df = court_input_df.withColumn("login_date", login_as_date)
renamed_court_input_df.show()

In [0]:
# Print the schema to check the type of login_date after the cast.
renamed_court_input_df.printSchema()

In [0]:

# Earliest login_date per user. The aggregate is left un-aliased, so the result
# column carries Spark's generated name.
logins_by_user = renamed_court_input_df.groupBy('user_id')
first_day = logins_by_user.agg(f_min(col('login_date')))
first_day.show()

In [0]:
# Earliest login_date per user, exposed under the readable name first_login.
earliest_login = f_min(col('login_date')).alias('first_login')
first_day = renamed_court_input_df.groupBy('user_id').agg(earliest_login)
first_day.show()

In [0]:
# Register the date-typed DataFrame as a temp view so it can be queried by name
# through spark.sql.
renamed_court_input_df.createOrReplaceTempView("renamed_court_input_df")

In [0]:
# SQL form of the first-login query: MIN(login_date) per user_id, read from the
# renamed_court_input_df temp view.
query_to_find_first_day = """
SELECT user_id, MIN(login_date) as first_login FROM renamed_court_input_df GROUP BY user_id
"""
first_day_sql = spark.sql(query_to_find_first_day)
first_day_sql.show()

In [0]:
# First login per user using a window function (PySpark).
# Rows are numbered per user in login_date order; row 1 is the earliest login.
window_spec = Window.partitionBy('user_id').orderBy('login_date')

# Start from the date-typed frame, like the groupBy and SQL versions of this
# query, so rows are ordered chronologically and login_date is returned as a
# date rather than a string.
# Note: despite its name, the 'first_login' column holds the row number here.
renamed_court_df = renamed_court_input_df.withColumn(
    'first_login', row_number().over(window_spec)
)

# Filter while 'first_login' is still part of the frame, then project; the
# original filtered after the select had already dropped that column.
renamed_court_input_df1 = (
    renamed_court_df
    .filter(col('first_login') == 1)
    .select('user_id', 'login_date')
)
renamed_court_input_df1.show()


In [0]:
# Which kit(s) did each user use on their first day? (PySpark)
window_spec = Window.partitionBy('user_id').orderBy('login_date')

# rank() assigns the same rank to rows that tie on login_date, so a user with
# several kits on the earliest date keeps all of them.
# Note: despite its name, the 'first_login' column holds the rank here.
renamed_court_df = court_input_df.withColumn('first_login', rank().over(window_spec))

# Filter while 'first_login' is still part of the frame, then project; the
# original filtered after the select had already dropped that column.
renamed_court_input_df1 = (
    renamed_court_df
    .filter(col('first_login') == 1)
    .select('user_id', 'kit_id')
)
renamed_court_input_df1.show()

In [0]:
# Which kit(s) did each user use on their first day? (SQL version)
# The query ranks each user's rows by login_date in a CTE and keeps the rows
# with rnk = 1.

# Register the DataFrame as a temp view so the query can refer to it by name.
court_input_df.createOrReplaceTempView("court_input_df")

query_for_first_day_kit = """
                        with first_login as(
                        select *, rank()  over(partition by user_id order by login_date) as rnk
                        from court_input_df
                        )

                        select user_id, kit_id from first_login where rnk = 1
"""

spark.sql(query_for_first_day_kit).show()


In [0]:
# First-day kit(s) per user, kept in first_kit_df for the aggregation cells.
window_spec = Window.partitionBy('user_id').orderBy('login_date')

# Rank each user's rows by login_date; ties on the earliest date all get rank 1.
ranked_kit_df = court_input_df.withColumn('first_login', rank().over(window_spec))

# Filter while 'first_login' is still part of the frame, then project; the
# original filtered after the select had already dropped that column.
first_kit_df = (
    ranked_kit_df
    .filter(col('first_login') == 1)
    .select('user_id', 'kit_id')
)
first_kit_df.show()

In [0]:
# One row per user, with that user's first-day kit ids gathered into an array.
# collect_list keeps duplicates; collect_set would return distinct values only.
kits_per_user = first_kit_df.groupBy('user_id')
flatend_first_kit_df = kits_per_user.agg(collect_list('kit_id').alias('kit_id'))

flatend_first_kit_df.show()

In [0]:
# Kit ids using SQL: register first_kit_df as a temp view so spark.sql can
# query it by name.

first_kit_df.createOrReplaceTempView('first_kit_df')

In [0]:
# SQL form of the kit-list aggregation: collect each user's kit_id values from
# the first_kit_df temp view into one array column.
query_for_kit_list = """
                        select user_id, collect_list(kit_id) as kit_id
                        from first_kit_df
                        group by user_id
"""

spark.sql(query_for_kit_list).show()

In [0]:
# Cumulative sum of sessions_count per user, in login_date order.
window_spec = Window.partitionBy('user_id').orderBy('login_date')

# No explicit frame is set, so Spark's default frame for an ordered window applies.
running_total = f_sum(col('sessions_count')).over(window_spec)
cum_sum_court_input_df = court_input_df.withColumn('total_played', running_total)

cum_sum_court_input_df.show()


In [0]:
# Get total played day by day for each user (SQL version).
# (Re)register the view; createOrReplaceTempView overwrites any existing temp
# view with the same name.
court_input_df.createOrReplaceTempView("court_input_df")

# Running sum of sessions_count per user, ordered by login_date.
query_for_cum_sum = """
                    select *, sum(sessions_count) over(partition by user_id order by login_date)
                    as total_played
                    from court_input_df
"""
spark.sql(query_for_cum_sum).show()