In [7]:
import pandas as pd
# Sample "Activity" table: column names live under "headers", row values under "rows".
d = {
    "headers": {
        "Activity": ["player_id", "device_id", "event_date", "games_played"],
    },
    "rows": {
        "Activity": [
            [1, 2, "2016-03-01", 5],
            [1, 2, "2016-03-02", 6],
            [2, 3, "2017-06-25", 1],
            [3, 1, "2016-03-01", 0],
            [3, 4, "2018-07-03", 5],
        ],
    },
}

# Write the table as a CSV with a header row and no index column.
activity_pdf = pd.DataFrame(data=d["rows"]["Activity"], columns=d["headers"]["Activity"])
activity_pdf.to_csv("./Activity.txt", index=False)

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook.
# The shuffle-partition count is controlled by "spark.sql.shuffle.partitions"
# ("spark." prefix, plural); a misspelled key is stored but never read, so the
# setting would silently stay at its default.
spark = (
    SparkSession.builder
    .appName("game play analysis v")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

In [8]:

# Read the CSV with its header row; every column comes in as a string.
activity_raw = spark.read.option("header", True).csv("./Activity.txt")

# Target Spark type for each column, applied in this order.
column_types = {
    "player_id": 'int',
    "device_id": 'int',
    "games_played": 'int',
    "event_date": 'date',
}

# Replace each column in place with its typed version (column order is kept).
a = activity_raw
for column_name, spark_type in column_types.items():
    a = a.withColumn(column_name, activity_raw[column_name].cast(spark_type))

a.printSchema()
a.show()

[Stage 7:>                                                          (0 + 1) / 1]                                                                                

root
 |-- player_id: integer (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- event_date: date (nullable = true)
 |-- games_played: integer (nullable = true)

+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-03-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-01|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



The install date of a player is the first login day of that player.

We define the day-one retention of a date x as the number of players whose install date is x and who logged back in on the day right after x, divided by the number of players whose install date is x, rounded to 2 decimal places.

Write an SQL query to report for each install date, the number of players that installed the game on that day, and the day one retention.

Return the result table in any order.

The query result format is in the following example.

In [31]:
import pyspark.sql.functions as F

def addAlias(df, n):
    """Return ``df`` with ``_{n}`` appended to the name of every column.

    Args:
        df: object exposing ``columns`` and ``withColumnRenamed(old, new)``.
        n: suffix value; it is formatted into each new name after an underscore.

    Returns:
        The result of applying one ``withColumnRenamed`` call per column of ``df``.
    """
    # Decide all (old, new) name pairs up front, then apply them one by one.
    rename_pairs = [(name, f'{name}_{n}') for name in df.columns]
    renamed = df
    for old_name, new_name in rename_pairs:
        renamed = renamed.withColumnRenamed(old_name, new_name)
    return renamed

# Copy of the activity table with every column suffixed "_1", so its columns
# can be told apart from the originals in the self-join below.
a1 = addAlias(a, 1)

# Earliest event_date per player. Aliasing inside agg() avoids depending on
# Spark's auto-generated "min(event_date)" column name.
first_login = a.groupBy('player_id')\
            .agg(F.min('event_date').alias("first_login"))

# Activity rows that fall on each player's first-login date. The filter on
# a.event_date rejects unmatched rows, so the join is declared as inner.
installs = first_login.join(a, "player_id", "inner")\
            .where(a.event_date == first_login.first_login)

# Per install date, count install rows whose player also has an activity row
# exactly one day later. Dates with no such rows are absent from this frame.
retention = installs.join(a1, a1.player_id_1 == F.col('player_id'), "inner")\
            .where(F.datediff(a1.event_date_1, F.col('event_date')) == 1)\
            .groupBy("first_login").count()

# Number of install rows per install date.
install_counts = installs.groupBy("first_login").count().withColumnRenamed("count", "installs")

# Left join keeps install dates that have no day-one returners; their null
# ratio is turned into 0 by na.fill. The ratio is rounded to 2 decimal places,
# as the task statement requires.
day1_retention = (
    install_counts
    .join(retention, "first_login", "leftouter")
    .withColumnRenamed("first_login", "install dt")
    .select(
        "install dt",
        "installs",
        F.round(F.col("count") / F.col("installs"), 2).alias("Day1_retention"),
    )
    .na.fill(0)
)
day1_retention.show()


+----------+--------+--------------+
|install dt|installs|Day1_retention|
+----------+--------+--------------+
|2016-03-01|       2|           0.5|
|2017-06-25|       1|           0.0|
+----------+--------+--------------+

