In [66]:
from pubg_util import mysql, load_schema, notifier
from sphynx import sphynx, NODE_SMALL, NODE_MEDIUM, NODE_LARGE
from pyspark.sql.functions import *
import pandas as pd

spark = sphynx.get_spark(executor_count=8, app_name='pdu_syp', node_spec=NODE_LARGE)

Spark cluster not assigned. creating a new one...
Node spec: 8 executors with 48G RAM each
Job Port 4049 is assigned for requested cluster
Waiting for Spark master to be available...
Spark master launched!
Creating new Spark session, name: pdu_syp...
Waiting for all executors ready...
All executors connected!
Complete! elapsed time: 00:00:25


In [150]:
sphynx.stop()

Stopping Spark session...
Destroying Spark cluster...
Done!


In [53]:
all_economy_daily = mysql.read_table(spark, "pubg_gi", "all_economy_gcoin_used_by_product_daily")

In [54]:
all_economy_daily.createOrReplaceTempView("all_economy_daily")

In [55]:
item_meta_sales = mysql.read_table(spark, "metainfo", "item_meta_sales")
item_meta_sales.createOrReplaceTempView("item_meta_sales")

In [25]:
q = """
select
	date,
	sum(au) as au
from
	(
	select
		date,
		device,
		platform,
		business_region,
		pubg_region,
		pu_seg_type,
		pu_type,
		user_type,
		au
	from
		all_economy_daily
	where
		date >= '2021-07-26'
		and date <= '2021-08-15'
		and device = 'PC'
	group by
		1,
		2,
		3,
		4,
		5,
		6,
		7,
		8,
		9
	order by
		1) t
group by
	1
order by
	1
"""
daily_au = spark.sql(q)

In [26]:
daily_au_df = daily_au.toPandas()

In [20]:
q = """
select
    date,
    sum(pu) as pu,
    (sum(free_gcoin_used) + sum(paid_gcoin_used))/100 as revenue
from all_economy_daily
where date >= "2021-07-26"
    and date <= "2021-08-15"
    and device = "PC"
group by 1
order by 1
"""
daily_summary = spark.sql(q)
daily_summary_df = daily_summary.toPandas()

In [27]:
daily = pd.merge(daily_au_df, daily_summary_df, on="date")

In [28]:
daily["pur"] = daily["pu"]/daily["au"]
daily["arpu"] = daily["revenue"]/daily["au"]
daily["arppu"] = daily["revenue"]/daily["pu"]

In [31]:
daily.to_csv("./data/daily.csv", index=False)

In [156]:
daily = pd.read_csv("./data/daily.csv")

In [32]:
daily

Unnamed: 0,date,au,pu,revenue,pur,arpu,arppu
0,2021-07-26,1542625,73187,1244850.8,0.047443,0.806969,17.009179
1,2021-07-27,1519156,66310,1119295.0,0.043649,0.736787,16.879732
2,2021-07-28,1574783,435823,4223527.0,0.276751,2.681974,9.690923
3,2021-07-29,1551455,199913,2158629.2,0.128855,1.391358,10.797843
4,2021-07-30,1578927,154796,1772224.0,0.098039,1.122423,11.448771
5,2021-07-31,1657376,151180,1839270.0,0.091216,1.109748,12.166093
6,2021-08-01,1679359,134404,1672603.6,0.080033,0.995977,12.444597
7,2021-08-02,1520773,101832,1285075.2,0.066961,0.845014,12.619562
8,2021-08-03,1498145,90475,1102918.0,0.060391,0.736189,12.190307
9,2021-08-04,1207607,210611,7428346.6,0.174404,6.151295,35.270459


In [56]:
q = """
select
	a.*,
	b.product_name,
	b.event_name
from
	(
	select
		date,
		product_id,
		sum(unit_sold) as unit_sold,
		ROW_NUMBER() over (partition by date
	order by
		sum(unit_sold) desc) as row_num
	from
		all_economy_daily
	where
		date >= "2021-07-26"
		and date <= "2021-08-15"
		and device = "PC"
	group by
		1,
		2) a
join (
	select
		product_id,
		product_name,
		event_name
	from
		item_meta_sales
	group by
		1,
		2,
		3) b on
	a.product_id = b.product_id
order by
	date,
	row_num asc
"""
df = spark.sql(q)

In [57]:
df = df.toPandas()

In [58]:
df.to_csv("./data/daily_product_rank.csv", index=False)

In [59]:
df

Unnamed: 0,date,product_id,unit_sold,row_num,product_name,event_name
0,2021-07-26,itemdesc.14100007,32903,2,CATACLYSMIC MAGMA - CONTRABAND CRATE,202107_wsus_apocalypse
1,2021-07-26,battlepasslevelup.202107,21649,3,TAEGO Level UP,202107_taego
2,2021-07-26,itemdesc.14100006,11831,4,GEAR HEAD I - CONTRABAND CRATE,202106_wsus_progressive_apocalypse
3,2021-07-26,itemdesc.13000312,2922,5,VICTORY DANCE - 2 PHUT HON (KAIZ REMIX),202107_phao_emote
4,2021-07-26,itemdesc.13000302,1845,6,SURVIVOR PASS: TAEGO WEAPON PACK,202107_taego
5,2021-07-26,itemdesc.12010445,1189,7,Son Heung-min - M416,202107_son
6,2021-07-26,itemdesc.13000311,1178,8,SON HEUNG-MIN'S WEAPON BUNDLE,202107_son
7,2021-07-26,itemdesc.45000003,1125,9,Nickname Change,202111_pgc
8,2021-07-26,itemdesc.45000003,1125,9,Nickname Change,undefined
9,2021-07-26,itemdesc.45000003,1125,9,Nickname Change,pcs4_reward


In [44]:
df[df["row_num"] <= 10]

Unnamed: 0,date,product_id,unit_sold,row_num,product_name
0,2021-07-26,itemdesc.14100007,32903,2,CATACLYSMIC MAGMA - CONTRABAND CRATE
1,2021-07-26,battlepasslevelup.202107,21649,3,TAEGO Level UP
2,2021-07-26,itemdesc.14100006,11831,4,GEAR HEAD I - CONTRABAND CRATE
3,2021-07-26,itemdesc.13000312,2922,5,VICTORY DANCE - 2 PHUT HON (KAIZ REMIX)
4,2021-07-26,itemdesc.13000302,1845,6,SURVIVOR PASS: TAEGO WEAPON PACK
5,2021-07-26,itemdesc.12010445,1189,7,Son Heung-min - M416
6,2021-07-26,itemdesc.13000311,1178,8,SON HEUNG-MIN'S WEAPON BUNDLE
7,2021-07-26,itemdesc.45000003,1125,9,Nickname Change
8,2021-07-26,itemdesc.12010419,813,10,MOZZ's M24
257,2021-07-27,itemdesc.14100007,29844,2,CATACLYSMIC MAGMA - CONTRABAND CRATE


## HEIMDAL LOG

In [None]:
store_explore_items

In [68]:
log = load_schema.heimdall(spark, 'prod', 'pc-prod', start_date="2021-11-10", end_date="2021-11-13")

In [72]:
item_log = log.where(col("category") == "store_explore_items")

In [73]:
item_click_log = item_log.where((col("category") == "store_explore_items") & (col("action") == "click_store_detail"))

In [76]:
item_click_log.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- action: string (nullable = true)
 |-- label: string (nullable = true)
 |-- value: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tracking_id: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- hr: string (nullable = true)



In [78]:
item_click_log.where(col("dt") == "2021-11-10").select("label").distinct().show()

+------------------+
|             label|
+------------------+
| salesitemdesc.427|
|salesitemdesc.1429|
|salesitemdesc.1647|
| salesitemdesc.583|
|salesitemdesc.1216|
|salesitemdesc.1627|
|salesitemdesc.1234|
|salesitemdesc.1719|
|salesitemdesc.1638|
|salesitemdesc.1448|
|salesitemdesc.1357|
|salesitemdesc.1746|
|salesitemdesc.1632|
|salesitemdesc.1753|
|salesitemdesc.1695|
|salesitemdesc.1768|
|salesitemdesc.1726|
|salesitemdesc.1346|
|salesitemdesc.1712|
|salesitemdesc.1463|
+------------------+
only showing top 20 rows



In [86]:
item_click_log = item_click_log.withColumn("sales_id", split("label", "\.")[1])

In [79]:
item_meta_sales = mysql.read_table(spark, "metainfo", "item_meta_sales")

In [92]:
item_click_log = item_click_log.join(item_meta_sales, "sales_id")
item_click_log.where(col("dt") == "2021-11-10").groupBy("product_name").agg(count("*").alias("click_cnt")).orderBy("click_cnt", ascending=False).count()

341

In [95]:
click_log_pd = item_click_log.groupBy("dt", "product_id", "product_name").agg(count("*").alias("click_cnt")).toPandas()

In [97]:
click_log_pd.to_csv("./data/click_log_pd.csv", index=False)

In [100]:
item_purchase_log = log.where((col("category") == "store_purchase_items") & (col("action") == "click_price_detail"))

In [101]:
item_purchase_log = item_purchase_log.withColumn("sales_id", split("label", "\.")[1])

In [102]:
item_purchase_log = item_purchase_log.join(item_meta_sales, "sales_id")

In [103]:
purchase_log_pd = item_purchase_log.groupBy("dt", "product_id", "product_name").agg(count("*").alias("purchase_cnt")).toPandas()

In [104]:
purchase_log_pd.to_csv("./data/purchase_log_pd.csv", index=False)

In [107]:
purchase_log_pd.columns

Index(['dt', 'product_id', 'product_name', 'purchase_cnt'], dtype='object')

In [111]:
item_tmp = pd.merge(purchase_log_pd, click_log_pd, on=["dt", "product_id", "product_name"])

In [113]:
item_tmp.head()

Unnamed: 0,dt,product_id,product_name,purchase_cnt,click_cnt
0,2021-11-13,itemdesc.18100004,Left,46,569
1,2021-11-12,itemdesc.12032065,Vivid Pages Backpack (Level 1),1,204
2,2021-11-10,itemdesc.11020273,Purrple Periwinkle Pants,92,13692
3,2021-11-10,itemdesc.13000230,GRAFFITI BACKPACK BUNDLE,39,944
4,2021-11-12,itemdesc.11030194,Pathfinder's Shoes,2,269


In [114]:
item_tmp["ctr"] = item_tmp["purchase_cnt"]/item_tmp["click_cnt"]

상품상세에서 구매버튼 클릭 , 상품상세 클릭 , 상품상세에서 구매버튼 클릭 / 상품상세 클릭

In [121]:
item_tmp.groupby("dt").mean()

Unnamed: 0_level_0,purchase_cnt,click_cnt,ctr
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-11-10,373.002985,6238.98806,0.025378
2021-11-11,317.430678,5558.262537,0.029679
2021-11-12,524.169591,10167.122807,0.030599
2021-11-13,407.5,7049.032164,0.033092


In [122]:
bundle = ["itemdesc.13000386", "itemdesc.13000318"]

bundle_tmp = item_tmp[item_tmp.product_id.isin(bundle)]
not_bundle_tmp = item_tmp[~item_tmp.product_id.isin(bundle)]

In [123]:
bundle_tmp.groupby("dt").mean()

Unnamed: 0_level_0,purchase_cnt,click_cnt,ctr
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-11-10,701.0,16734.0,0.041891
2021-11-11,886.0,23692.0,0.037397
2021-11-12,17621.5,241596.0,0.051738
2021-11-13,6380.0,97880.0,0.051928


In [124]:
not_bundle_tmp.groupby("dt").mean()

Unnamed: 0_level_0,purchase_cnt,click_cnt,ctr
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-11-10,372.020958,6207.565868,0.025329
2021-11-11,315.748521,5504.612426,0.029656
2021-11-12,423.597059,8805.776471,0.030475
2021-11-13,372.367647,6514.732353,0.032981


In [118]:
item_tmp.to_csv("./data/item_tmp.csv", index=False)

2021-08-02 ~ 2021-08-05 for blackpink bundle

In [144]:
aug_log = load_schema.heimdall(spark, 'prod', 'pc-prod', start_date="2021-08-02", end_date="2021-08-10")

In [145]:
aug_item_click_log = aug_log.where((col("category") == "store_explore_items") & (col("action") == "click_store_detail"))

In [146]:
aug_item_click_log = aug_item_click_log.withColumn("sales_id", split("label", "\.")[1])

In [147]:
aug_item_click_log = aug_item_click_log.join(item_meta_sales, "sales_id")

In [148]:
aug_click_log_pd = aug_item_click_log.groupBy("dt", "product_id", "product_name").agg(count("*").alias("click_cnt")).toPandas()

In [149]:
aug_click_log_pd.to_csv("./data/aug_click_log_pd.csv", index=False)

In [130]:
aug_item_purchase_log = aug_log.where((col("category") == "store_purchase_items") & (col("action") == "click_price_detail"))

In [131]:
aug_item_purchase_log = aug_item_purchase_log.withColumn("sales_id", split("label", "\.")[1])

In [132]:
aug_item_purchase_log = aug_item_purchase_log.join(item_meta_sales, "sales_id")

In [133]:
aug_purchase_log_pd = aug_item_purchase_log.groupBy("dt", "product_id", "product_name").agg(count("*").alias("purchase_cnt")).toPandas()

In [135]:
aug_purchase_log_pd.to_csv("./data/aug_purchase_log_pd.csv", index=False)

In [136]:
ctr = pd.merge(aug_purchase_log_pd, aug_click_log_pd, on=["dt", "product_id", "product_name"])

In [137]:
ctr["ctr"] = ctr["purchase_cnt"]/ctr["click_cnt"]

In [139]:
aug_bundle_tmp = ctr[ctr.product_id.isin(bundle)]
aug_not_bundle_tmp = ctr[~ctr.product_id.isin(bundle)]

In [140]:
aug_bundle_tmp.groupby("dt").mean()

Unnamed: 0_level_0,purchase_cnt,click_cnt,ctr
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-08-04,39011,616936,0.063233
2021-08-05,16795,270066,0.062189


In [141]:
aug_not_bundle_tmp.groupby("dt").mean()

Unnamed: 0_level_0,purchase_cnt,click_cnt,ctr
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-08-02,426.989209,5295.94964,0.030677
2021-08-03,411.34058,5112.898551,0.029611
2021-08-04,880.007042,21923.577465,0.029035
2021-08-05,657.064286,14151.878571,0.030503


In [143]:
ctr[ctr['product_id'] == "itemdesc.13000318"]

Unnamed: 0,dt,product_id,product_name,purchase_cnt,click_cnt,ctr
187,2021-08-05,itemdesc.13000318,BLACKPINK MEGA BUNDLE,16795,270066,0.062189
369,2021-08-04,itemdesc.13000318,BLACKPINK MEGA BUNDLE,39011,616936,0.063233


## PU, PUR, ARPPU 등

In [152]:
from pubg_util import mysql, load_schema, notifier
from sphynx import sphynx, NODE_SMALL, NODE_MEDIUM, NODE_LARGE
from pyspark.sql.functions import *
import pandas as pd

spark = sphynx.get_spark(executor_count=8, app_name='pdu_syp', node_spec=NODE_LARGE)

Spark cluster not assigned. creating a new one...
Node spec: 8 executors with 48G RAM each
Job Port 4049 is assigned for requested cluster
Waiting for Spark master to be available...
Spark master launched!
Creating new Spark session, name: pdu_syp...
Waiting for all executors ready...
All executors connected!
Complete! elapsed time: 00:00:26


In [153]:
from datetime import datetime, timedelta

def load_data_mart(device, start_date, end_date, table_name="cash_mtx", join_with_meta=True):
    """
        load economy data_mart with its corresponding meta
    """
    from datetime import datetime, timedelta
    from pyspark.sql import Window
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType
    from pubg_util import mysql
    import boto3
    from pubg_util.loader.utils import validate_s3_path
    client = boto3.client('s3')
    bucket = {
        "cash_mtx" : "s3a://pubg-log-labs/data_mart/economy_v2/cash_mtx/{device}/{target_date}",
        "pu_master" : "s3a://pubg-log-labs/data_mart/economy_v2/pu_master/{device}/{target_date}",
        "gcoin_use" : "s3a://pubg-log-labs/data_mart/economy_v3/gcoin_use/{device}/{target_date}",
        "gcoin_topup" : "s3a://pubg-log-labs/data_mart/economy_v3/gcoin_topup/{device}/{target_date}",
        "gcoin_master" : "s3a://pubg-log-labs/data_mart/economy_v3/gcoin_master/{device}/{target_date}",
        "user_master" : "s3a://pubg-log-labs/data_mart/user_master_ver2/{target_date}/{device}"
    }
    meta_name = {
        "cash_mtx" : "meta_cash_mtx",
        "gcoin_use" : "meta_vc_sales_items"
    }
    start_date = datetime.strptime(start_date, '%Y-%m-%d')
    end_date = datetime.strptime(end_date, '%Y-%m-%d')
    date_list = [(start_date + timedelta(d)).strftime("%Y-%m-%d") for d in range((end_date-start_date).days+1)]
    device_str = device.upper() if table_name=='user_master' else device
    path_list = [bucket[table_name].format(device=device_str, target_date=target_date) for target_date in date_list]
    # load_only valid paths
    valid_path_list = list(filter(lambda path: validate_s3_path(client, path), path_list))
    if len(valid_path_list) == 0:
        message = "no {} data in given period {}-{}".format(table_name, start_date, end_date)
        print(message)
        return None
    if device == 'pc' and table_name == 'cash_mtx':
        df_raw = spark.read.option("mergeSchema", "true").parquet(*valid_path_list)
    elif table_name == 'gcoin_use':
        df_raw = spark.read.option("mergeSchema", "true").parquet(*valid_path_list)
        df_raw_no_salesid = df_raw.where('date >= "2021-07-28" and sales_id is null').toPandas()
        if not df_raw_no_salesid.empty:
            message = 'GCOIN USE data exist: sales_id is null\n{}'.format(df_raw_no_salesid.to_string(index=False))
            print(message)
    else:
        df_raw = spark.read.parquet(*valid_path_list)
 
    missing_dates = list(set(path_list) - set(valid_path_list))
    if len(missing_dates) > 0:
        message = "no data exists in\n" + "\n".join(missing_dates)
        print(message)
    if join_with_meta is False or table_name not in ('cash_mtx', 'gcoin_use'):
        # if it's not cash_mtx or gcoin_use, return raw_df as there are no meta to join with
        return df_raw
    else:
        # load meta_cash_mtx
        meta_raw = (
            mysql.read_table(spark, 'metainfo', meta_name[table_name])
            .withColumnRenamed("platform", "platform_")
            .withColumnRenamed("product_id", "product_id_")
            .withColumnRenamed("price", "price_")
          )
        if table_name == 'gcoin_use':
            meta_raw = meta_raw.where("currency = 'gcoin'")
            df_raw = df_raw \
                .withColumn('is_salesid_exist',
                            F.when(F.col('sales_id').isNull(), F.lit(0)).otherwise(F.lit(1))) \
                .withColumn('sales_id_',
                            F.when(F.col('sales_id').startswith('salesitemdesc'),
                            F.split('sales_id', 'desc.')[1]).otherwise(F.col('sales_id'))) \
                .drop('sales_id')
        if device == 'console' and table_name == 'cash_mtx':
            # join condition
            condition = [
                df_raw.platform == meta_raw.platform_,
                df_raw.product_id == meta_raw.product_id_
            ]
            # join with meta
            # is_paid is hard coded for products that were provided to Stadia Pro users for free during promo period
            df = (
                df_raw
                .join(meta_raw, condition, 'left')
                .withColumn("transaction_id", lit(None).cast(StringType()))
                .withColumn("status", F.lit('Succeeded').cast(StringType()))
                .withColumnRenamed("time", "time_")
                .withColumn("time", F.coalesce(F.col("time_"), F.concat(F.col("date"), F.lit("T00:00:00"))))
                .drop('platform_', 'product_id_', 'time_')
                .withColumn('is_paid',
                    ~(
                        (F.col('product_id').isin(['PUBGPIONEREDITION', 'PUBGBDLDSLEISKIN']))
                    )
                ).where('is_paid')
            )
        else:
            meta_duplicate_count = (
                meta_raw
                .groupBy('product_id_', 'platform_')
                .agg(F.count(F.lit(1)).alias('duplications'))
            )
            window_spec = (
                Window
                .partitionBy('platform_', 'product_id_')
                .orderBy('start_time')
            )
            meta = (
                meta_raw.alias("meta_raw")
                .join(
                    meta_duplicate_count.alias("meta_duplicate_count"),
                    on=['product_id_', 'platform_'],
                    how='left')
                .select("meta_raw.*", "meta_duplicate_count.duplications")
                .withColumn(
                    "next_start_time",
                    F.coalesce(F.lead("start_time").over(window_spec), F.lit('2038-01-19 00:00:00'))
                )
            )
            if table_name == 'gcoin_use':
                condition = (
                (
                    (df_raw.is_salesid_exist == 1)
                    & (
                        (df_raw.platform == meta.platform_)
                        & (df_raw.sales_id_ == meta.sales_id)
                    )
                )
                | ( (df_raw.is_salesid_exist == 0)
                    & (df_raw.platform == meta.platform_)
                    & (df_raw.product_id == meta.product_id_)
                    & (
                        (
                            (df_raw.time >= meta.start_time)
                            & (df_raw.time < meta.next_start_time)
                            & (meta.duplications >= 2)
                        )
                            | (meta.duplications == 1)
                    )
                ))
                # join with meta
                df = (
                    df_raw
                    .join(meta, condition, 'left')
                    .drop('platform_', 'product_id_', 'next_start_time', "duplications")
                )
            else:
                condition = (
                    (df_raw.platform == meta.platform_)
                    & (df_raw.product_id == meta.product_id_)
                    & (
                        (
                            (df_raw.time >= meta.start_time)
                            & (df_raw.time < meta.next_start_time)
                            & (meta.duplications >= 2)
                        )
                            | (meta.duplications == 1)
                    )
                )
                # join with meta
                df = (
                    df_raw.drop('sales_id')
                    .join(meta, condition, 'left')
                    .drop('platform_', 'product_id_', 'next_start_time', "duplications")
                )
        if table_name == 'cash_mtx':
            df = df.withColumn("ingame_revenue", F.col("unit_sold") * F.col("ingame_price"))
 
        return df

In [154]:
start_date = "2021-07-28"
end_date = "2021-08-10"
device="pc"

In [155]:
revenue = load_data_mart(start_date=start_date, end_date=end_date, device=device, table_name="gcoin_use")

In [161]:
user = load_data_mart(start_date=start_date, end_date=end_date, device=device, table_name="user_master").where(col("lastlogindate")>=start_date)

In [163]:
au = user.groupBy("date").agg(countDistinct("accountid").alias("au"))
au = au.orderBy("date").toPandas()

In [None]:
au

In [173]:
bundle_buyer = revenue.where(col("product_id") == "itemdesc.13000318").select("account_id").distinct().toPandas()

In [176]:
buyer_list = bundle_buyer["account_id"].tolist()

In [183]:
len(buyer_list)

53493

In [178]:
before = revenue.where((col("date") <= "2021-08-03") & (col("account_id").isin(buyer_list)))

In [189]:
before_buyer = before.select("account_id").distinct().toPandas()["account_id"].tolist()

In [190]:
after_small = revenue.where((col("date") >= "2021-08-04") & (col("account_id").isin(before_buyer)))
after_small_tmp = after_small.groupBy("date", "account_id").agg(((sum(col("free_use")) + sum(col("paid_use")))/100).alias("spend")).toPandas()

In [191]:
after_small_tmp.groupby("date")["spend"].mean().reset_index()

Unnamed: 0,date,spend
0,2021-08-04,105.639785
1,2021-08-05,80.11308
2,2021-08-06,68.119032
3,2021-08-07,60.214369
4,2021-08-08,53.931137
5,2021-08-09,51.58828
6,2021-08-10,54.104775


In [184]:
after = revenue.where((col("date") >= "2021-08-04") & (col("account_id").isin(buyer_list)))

In [179]:
before_tmp = before.groupBy("date", "account_id").agg(((sum(col("free_use")) + sum(col("paid_use")))/100).alias("spend")).toPandas()

In [185]:
after_tmp = after.groupBy("date", "account_id").agg(((sum(col("free_use")) + sum(col("paid_use")))/100).alias("spend")).toPandas()

In [182]:
before_tmp.groupby("date")["spend"].mean().reset_index()

Unnamed: 0,date,spend
0,2021-07-28,24.249319
1,2021-07-29,25.988662
2,2021-07-30,28.537927
3,2021-07-31,32.762055
4,2021-08-01,34.071518
5,2021-08-02,32.466821
6,2021-08-03,33.272715


In [186]:
after_tmp.groupby("date")["spend"].mean().reset_index()

Unnamed: 0,date,spend
0,2021-08-04,105.483416
1,2021-08-05,84.412992
2,2021-08-06,74.286418
3,2021-08-07,69.978348
4,2021-08-08,63.354124
5,2021-08-09,62.072433
6,2021-08-10,65.497655


In [157]:
daily.head()

Unnamed: 0,date,au,pu,revenue,pur,arpu,arppu
0,2021-07-26,1542625,73187,1244850.8,0.047443,0.806969,17.009179
1,2021-07-27,1519156,66310,1119295.0,0.043649,0.736787,16.879732
2,2021-07-28,1574783,435823,4223527.0,0.276751,2.681974,9.690923
3,2021-07-29,1551455,199913,2158629.2,0.128855,1.391358,10.797843
4,2021-07-30,1578927,154796,1772224.0,0.098039,1.122423,11.448771


In [159]:
daily_revenue = revenue.groupBy("date").agg(countDistinct("account_id").alias("pu"), ((sum("free_use") + sum("paid_use"))/100).alias("revenue"))

In [164]:
daily_revenue = daily_revenue.orderBy("date").toPandas()

In [165]:
au

Unnamed: 0,date,au
0,2021-07-28,1581360
1,2021-07-29,2199607
2,2021-07-30,2667545
3,2021-07-31,3113196
4,2021-08-01,3479033
5,2021-08-02,3732852
6,2021-08-03,3953418
7,2021-08-04,4120117
8,2021-08-05,4304933
9,2021-08-06,4488953


In [166]:
daily_revenue

Unnamed: 0,date,pu,revenue
0,2021-07-28,154813,2111763.5
1,2021-07-29,75863,1079340.6
2,2021-07-30,59157,886112.0
3,2021-07-31,57430,919635.0
4,2021-08-01,51395,836303.8
5,2021-08-02,39158,642537.6
6,2021-08-03,34987,551464.0
7,2021-08-04,73488,3714174.3
8,2021-08-05,65742,2353433.9
9,2021-08-06,53868,1654689.8
