In [0]:
from pyspark.sql import Row
from pyspark.sql.types import *
from datetime import datetime, date, timedelta

# Schema of the synthetic play-fact table. Every field is nullable: the flat
# columns are declared as (name, type) pairs and the nested output-route
# struct is appended last.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in [
            ("offer_country_code", StringType()),
            ("identity_id", StringType()),
            ("device_platform", StringType()),
            ("session_id", StringType()),
            ("play_start_est_timestamp", TimestampType()),
            ("play_end_est_timestamp", TimestampType()),
            ("play_end_est_date", DateType()),
            ("total_play_time_ms", LongType()),
            ("item_length_ms", LongType()),
            ("is_test", BooleanType()),
            ("source", StringType()),
        ]
    ]
    + [
        StructField(
            "output_route_array",
            StructType(
                [StructField("output_route_type", ArrayType(StringType()), True)]
            ),
            True,
        )
    ]
)

start_day = date(2024, 1, 1)

identities = ["id_1", "id_2", "id_3"]
platforms = ["ios", "android", "web"]
countries = ["US", "CA"]

# Session length (minutes) and reported output route for each platform.
# Anything that is not ios/android falls back to the "web" values.
minutes_by_platform = {"ios": 15, "android": 10}
route_by_platform = {"ios": "airplay", "android": "bluetooth"}

rows = []
session = 1  # running counter used to mint unique session ids

# 40 consecutive days x 2 countries x 3 identities x 3 platforms x 2 sessions.
for d in range(40):
    day = start_day + timedelta(days=d)
    midnight = datetime(day.year, day.month, day.day)

    for country in countries:
        for identity in identities:
            for platform in platforms:
                minutes = minutes_by_platform.get(platform, 5)
                route = route_by_platform.get(platform, "hdmi")

                for s in range(2):  # 2 sessions per day
                    # Sessions start on the hour at 09:00 and 10:00.
                    play_start = midnight + timedelta(hours=9 + s)
                    play_end = play_start + timedelta(minutes=minutes)

                    rows.append(
                        Row(
                            offer_country_code=country,
                            identity_id=identity,
                            device_platform=platform,
                            session_id=f"sess_{session}",
                            play_start_est_timestamp=play_start,
                            play_end_est_timestamp=play_end,
                            play_end_est_date=day,
                            total_play_time_ms=minutes * 60000,
                            item_length_ms=3600000,
                            is_test=False,
                            source=None,
                            output_route_array={"output_route_type": [route]},
                        )
                    )
                    session += 1

play_fact_df = spark.createDataFrame(rows, schema)


In [0]:

from pyspark.sql import functions as F

# Inspect the raw synthetic rows for identity "id_1" in the US only.
display(
    play_fact_df
    .where(F.col("identity_id") == "id_1")
    .where(F.col("offer_country_code") == "US")
)


In [0]:
from pyspark.sql import Window

# Ranking window for the aggregated rows: within one identity and one day,
# rows are numbered by play duration, longest first.
# NOTE(review): the cell previously referenced an undefined `window_spec`;
# this definition mirrors `window_spec_rank` used by the 7-day aggregate
# (partition by identity + date, order by play_duration_minutes desc).
# Confirm it matches the intended ranking. Ties are ordered arbitrarily.
window_spec = Window.partitionBy(
    F.col("identity_id"), F.col("play_end_est_date")
).orderBy(F.desc("play_duration_minutes"))

# 1-day aggregate: one row per country / identity / platform / day.
play_device_platform_identity_1day_df = (
    play_fact_df
    # Keep plays longer than 1999 ms that are not test traffic and have no
    # `source` value.
    .filter(
        (F.col("total_play_time_ms") > 1999)
        & (~F.col("is_test"))
        & (F.col("source").isNull())
    )
    # Constant columns tagging every row as a single-day bucket.
    .withColumn("day_count", F.lit(1))
    .withColumn("day_bucket", F.lit("1 day"))
    .groupBy(
        F.col("offer_country_code"),
        F.col("identity_id"),
        F.col("device_platform"),
        F.col("day_count"),
        F.col("day_bucket"),
        # Day of the play: end timestamp when present, otherwise the start.
        F.to_date(
            F.coalesce(
                F.col("play_end_est_timestamp"),
                F.col("play_start_est_timestamp"),
            )
        ).alias("play_end_est_date"),
    )
    .agg(
        # Per-play time is capped at the item length before summing, then
        # converted from milliseconds to minutes.
        (
            F.sum(F.least(F.col("total_play_time_ms"), F.col("item_length_ms")))
            / 60000
        ).alias("play_duration_minutes"),
        F.countDistinct(F.col("session_id")).alias("session_count"),
        F.max(F.col("play_start_est_timestamp")).alias(
            "last_play_start_est_timestamp"
        ),
        # Uncapped total; not part of the final projection below.
        F.sum("total_play_time_ms").alias("total_play_time_sum"),
        # Array of arrays here; flattened and de-duplicated in the select.
        F.collect_list(
            F.col("output_route_array.output_route_type")
        ).alias("output_route_type_array_list"),
    )
    .withColumn("rank", F.row_number().over(window_spec))
    .select(
        F.col("offer_country_code").alias("country_code"),
        F.col("identity_id"),
        F.col("device_platform"),
        F.col("day_count").cast("long"),
        F.col("play_duration_minutes"),
        F.col("day_bucket"),
        F.col("play_end_est_date").alias("snapshot_est_date"),
        F.col("session_count").cast("long"),
        F.col("last_play_start_est_timestamp"),
        F.array_distinct(
            F.flatten(F.col("output_route_type_array_list"))
        ).alias("output_route_type_array"),
        F.col("rank").cast("long"),
    )
)


In [0]:

# Narrow the 1-day aggregate to a single identity so the later cells are
# easy to inspect by eye.
df_id1 = play_device_platform_identity_1day_df.where(
    F.col("identity_id") == "id_1"
)
display(df_id1)

In [0]:
def get_date_sequence_df(min_snapshot_est_date, end_datetime):
    """
    Build a DataFrame with one row per calendar day in an inclusive range.
    Args:
        min_snapshot_est_date(datetime.date): first day of the range
        end_datetime(datetime.datetime): timestamp whose date is the last
            day of the range (stored as TimestampType)
    Returns:
        DataFrame with a single ``calendar_date`` column
    """
    # Kept local so the notebook-level `schema` variable is not overwritten.
    date_schema = StructType(
        [
            StructField("min_snapshot_est_date", DateType()),
            StructField("end_datetime", TimestampType()),
        ]
    )

    # Single-row frame holding both bounds; uses the notebook's `spark` session.
    date_df = spark.createDataFrame(
        [(min_snapshot_est_date, end_datetime)], schema=date_schema
    )

    # sequence() yields an array of dates stepping one day at a time;
    # explode() turns that array into one row per date.
    date_sequence_df = date_df.select(
        F.explode(
            F.sequence(
                F.to_date(F.col("min_snapshot_est_date")),
                F.to_date(F.col("end_datetime")),
                F.expr("interval 1 DAY"),
            )
        ).alias("calendar_date")
    )
    return date_sequence_df

In [0]:
# Latest `last_play_start_est_timestamp` across df_id1, kept as a one-row
# DataFrame. F.max is the Spark aggregate; the Python builtin max() would try
# to iterate the Column and raise a TypeError.
two_days_back = df_id1.select(F.max(F.col('last_play_start_est_timestamp')))
# Printing a DataFrame shows its column/type summary, not the rows.
print(two_days_back)

In [0]:
# Same aggregate as a plain Python value: take the only row of the global
# aggregate and read its single field.
two_days_back = df_id1.agg(
    F.max("last_play_start_est_timestamp")
).first()[0]
display(two_days_back)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, TimestampType
from datetime import datetime, date

# Example inclusive date range for the calendar table.
min_snapshot_est_date = date(2024, 2, 7)  # example start date
end_datetime = date(2024, 2, 9)           # example end date
spark = SparkSession.builder.getOrCreate()

# Both bounds are plain dates here (DateType used for simplicity).
schema = (
    StructType()
    .add("min_snapshot_est_date", DateType())
    .add("end_datetime", DateType())
)

# One-row frame carrying the range bounds.
date_df = spark.createDataFrame(
    [(min_snapshot_est_date, end_datetime)],
    schema=schema,
)
display(date_df)

In [0]:
# Expand the single (start, end) row into one row per calendar day: build
# the array of dates with a one-day step, then explode it into rows.
calendar_dates = F.sequence(
    F.to_date(F.col("min_snapshot_est_date")),
    F.to_date(F.col("end_datetime")),
    F.expr("interval 1 DAY"),
)
date_sequence_df = date_df.select(
    F.explode(calendar_dates).alias("calendar_date")
)
display(date_sequence_df)

In [0]:
def get_7day_join_condition(
    snapshot_est_date_col: str, calendar_date_col: str
):
    """
    Join predicate that places a snapshot date into a 7 day bucket.

    A row matches when its snapshot date lies between six days before the
    calendar date and the calendar date itself, both ends included.
    Args:
        snapshot_est_date_col(str): name of the snapshot_est_date column
        calendar_date_col(str): name of the calendar date column
    Returns:
        Column expression usable as a join condition
    """
    window_end = F.col(calendar_date_col)
    # Six days back plus the calendar date itself gives a 7 day span.
    window_start = F.date_add(window_end, -6)
    return F.col(snapshot_est_date_col).between(window_start, window_end)

In [0]:
# Preview the raw 7-day join: each df_id1 row is paired with every calendar
# date whose trailing 7 day range contains its snapshot date.
display(
    df_id1.alias("p").join(
        date_sequence_df.alias("dt"),
        on=get_7day_join_condition("p.snapshot_est_date", "dt.calendar_date"),
    )
)

In [0]:
# `Window` was never imported anywhere in the notebook, so this cell raised
# NameError on a fresh kernel.
from pyspark.sql import Window

# Rank rows within one identity and calendar date, longest play duration
# first. Rows with equal duration are ordered arbitrarily.
window_spec_rank = Window.partitionBy(
    F.col("p.identity_id"), F.col("dt.calendar_date")
).orderBy(F.desc("play_duration_minutes"))

# 7-day aggregate: every calendar date collects the 1-day rows whose snapshot
# date falls in the 7 days ending on that date.
play_device_platform_identity_7day_df = (
    df_id1.alias("p")
    .join(
        date_sequence_df.alias("dt"),
        get_7day_join_condition("p.snapshot_est_date", "dt.calendar_date"),
    )
    # Only 1-day rows feed the rollup; the bucket label is then overwritten.
    .where(F.col("p.day_bucket") == "1 day")
    .withColumn("day_bucket", F.lit("7 day"))
    .groupBy(
        F.col("p.country_code"),
        F.col("p.identity_id"),
        F.col("p.device_platform"),
        F.col("day_bucket"),
        F.col("dt.calendar_date"),
    )
    .agg(
        # day_count is 1 per daily row, so the sum is the number of daily
        # rows that landed in the bucket.
        F.sum("p.day_count").alias("day_count"),
        F.sum("p.play_duration_minutes").alias("play_duration_minutes"),
        F.sum("session_count").alias("session_count"),
        F.max(F.col("last_play_start_est_timestamp")).alias(
            "last_play_start_est_timestamp"
        ),
        # Array of per-day arrays; flattened and de-duplicated in the select.
        F.collect_list(F.col("output_route_type_array")).alias(
            "output_route_type_array_list"
        ),
    )
    .withColumn("rank", F.row_number().over(window_spec_rank))
    .select(
        F.col("p.country_code"),
        F.col("p.identity_id"),
        F.col("p.device_platform"),
        F.col("day_count").cast("long"),
        F.col("play_duration_minutes"),
        F.col("day_bucket"),
        # The bucket's end date becomes the snapshot date of the 7-day row.
        F.col("dt.calendar_date").alias("snapshot_est_date"),
        F.col("session_count").cast("long"),
        F.col("last_play_start_est_timestamp"),
        F.array_distinct(F.flatten(F.col("output_route_type_array_list"))).alias(
            "output_route_type_array"
        ),
        F.col("rank").cast("long"),
    )
)

In [0]:
display(play_device_platform_identity_7day_df)