# In [0]:
from pyspark.sql.types import *

def _event_props(platform, app_version, country):
    """Return the compact JSON string stored in the event_props column."""
    return (
        f'{{"platform":"{platform}","app_version":"{app_version}",'
        f'"country":"{country}"}}'
    )


# Every event of a given user carries the same client properties.
_U1_PROPS = _event_props("android", "1.0.0", "USA")
_U2_PROPS = _event_props("ios", "2.1.0", "India")
_U3_PROPS = _event_props("web", "3.0.0", "UK")
_U4_PROPS = _event_props("android", "1.2.0", "USA")

# Rows are (event_id, user_id, event_time, event_type, event_value, event_props).
events_data = [
    ("E1", "U1", "2024-04-01 09:15:00", "login", 1.0, _U1_PROPS),
    ("E2", "U1", "2024-04-01 09:30:00", "purchase", 500.0, _U1_PROPS),
    ("E3", "U1", "2024-04-01 09:45:00", "logout", 1.0, _U1_PROPS),
    ("E4", "U2", "2024-04-02 10:00:00", "login", 1.0, _U2_PROPS),
    ("E5", "U2", "2024-04-02 10:20:00", "purchase", 1200.0, _U2_PROPS),
    ("E6", "U2", "2024-04-02 10:25:00", "purchase", -50.0, _U2_PROPS),
    ("E7", "U3", "2024-04-03 11:00:00", "login", 1.0, _U3_PROPS),
    # User not present in users table
    ("E8", "U4", "2024-04-04 12:00:00", "purchase", 300.0, _U4_PROPS),
]

# Explicit schema for the raw events: the two id columns are non-nullable,
# event_time is declared as a plain string and event_props as a JSON string.
events_schema = (
    StructType()
    .add("event_id", StringType(), nullable=False)
    .add("user_id", StringType(), nullable=False)
    .add("event_time", StringType(), nullable=True)
    .add("event_type", StringType(), nullable=True)
    .add("event_value", DoubleType(), nullable=True)
    .add("event_props", StringType(), nullable=True)
)

# Materialise the sample rows and print both the content and the schema.
events_df = spark.createDataFrame(data=events_data, schema=events_schema)
events_df.show(truncate=False)
events_df.printSchema()

# Rows are (user_id, user_name, signup_date, user_tier).
users_data = [
    ("U1", "Alice", "2024-01-15", "premium"),
    ("U2", "Bob", "2024-02-01", "free"),
    ("U3", "Charlie", "2024-02-20", "premium"),
]

# Only user_id is non-nullable; signup_date is declared as a plain string.
users_schema = (
    StructType()
    .add("user_id", StringType(), nullable=False)
    .add("user_name", StringType(), nullable=True)
    .add("signup_date", StringType(), nullable=True)
    .add("user_tier", StringType(), nullable=True)
)

# Materialise the sample rows and print both the content and the schema.
users_df = spark.createDataFrame(data=users_data, schema=users_schema)
users_df.show(truncate=False)
users_df.printSchema()


# Task 1: parse the string timestamp/date columns and flatten the JSON
# event properties into regular columns.
# NOTE: the wildcard import is kept because the rest of the script uses the
# bare names (col, when, count, avg, ...); be aware that it also replaces
# Python builtins such as `sum` with the Spark column functions.
from pyspark.sql.functions import *

# signup_date becomes a DateType column.
user_df1 = users_df.withColumn("signup_date", to_date("signup_date", "yyyy-MM-dd"))

# A single json_tuple call extracts all three fields in one pass over the JSON
# string, instead of running one generator (and one parse) per field.
event_df1 = (
    events_df
    .withColumn("event_time", to_timestamp("event_time", "yyyy-MM-dd HH:mm:ss"))
    .select(
        "*",
        json_tuple(col("event_props"), "platform", "app_version", "country")
        .alias("platform", "app_version", "country"),
    )
    # The raw JSON string is no longer needed once its fields are columns.
    .drop("event_props")
)
event_df1.show(truncate=False)

# Task 2: keep only login/purchase events with a positive value, drop repeated
# event ids, then attach the user attributes.
_is_tracked_type = col("event_type").isin("login", "purchase")
_has_positive_value = col("event_value") > 0

event_df2 = (
    event_df1
    .where(_is_tracked_type & _has_positive_value)
    .dropDuplicates(["event_id"])
)
event_df2.show(truncate=False)

# Left join: events whose user_id has no row in user_df1 are kept, with null
# user columns.
user_event = event_df2.join(user_df1, on="user_id", how="left")
user_event.show(truncate=False)


# Per-user metrics. Non-purchase events contribute 0 to the purchase total but
# still count towards total_events and avg_event_value.
_purchase_value = when(
    col("event_type") == "purchase", col("event_value")
).otherwise(0)

user_metr = user_event.groupBy("user_id").agg(
    sum(_purchase_value).alias("total_purchase_value"),
    count("event_id").alias("total_events"),
    avg("event_value").alias("avg_event_value"),
)

user_metr.show(truncate=False)
# Task 5: flag each user's most recent event and keep it as a
# one-row-per-user summary (last_event_time, last_event_type, platform).
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Rank a user's events newest-first. event_id (already de-duplicated upstream)
# is a tie-breaker so that two events sharing the same timestamp always get the
# same rank order instead of an arbitrary one.
windowSpec = Window.partitionBy("user_id").orderBy(
    col("event_time").desc(),
    col("event_id").desc(),
)

user_event_ranked = user_event.withColumn(
    "row_num",
    row_number().over(windowSpec)
)

# The comparison already yields a boolean column, so no when/otherwise wrapper
# is needed.
user_event_flagged = user_event_ranked.withColumn(
    "is_latest_event",
    col("row_num") == 1
)

user_event_flagged.show(truncate=False)

column_needed = ['user_id', 'event_time', 'event_type', 'platform']

# Filter on the boolean column directly rather than comparing it with the
# string "true", which only worked through an implicit cast.
user_event_flagged_2 = (
    user_event_flagged
    .filter(col("is_latest_event"))
    .select(*column_needed)
    .withColumnRenamed("event_time", "last_event_time")
    .withColumnRenamed("event_type", "last_event_type")
)

# Final report: per-user metrics + latest-event summary + user attributes.
user_final = user_metr.join(user_event_flagged_2, on="user_id", how="left")
user_final.show(truncate=False)

# One row per distinct (user_id, user_name, user_tier, country) combination.
# NOTE(review): country comes from the events, so a user with events from more
# than one country yields several rows here and therefore several rows in
# user_final after the join below -- confirm whether that is intended.
user_event_2 = user_event.select("user_id", "user_name", "user_tier", "country")
user_event_2 = user_event_2.dropDuplicates()
user_event_2.show(truncate=False)

user_final = user_final.join(user_event_2, on="user_id", how="left")

# DataFrames are immutable: select() returns a new DataFrame, so the result
# must be assigned back for the column order below to take effect.
user_final = user_final.select(
    "user_id",
    "user_name",
    "user_tier",
    "country",
    "total_events",
    "total_purchase_value",
    "avg_event_value",
    "last_event_time",
    "last_event_type",
    "platform",
)
user_final.show(truncate=False)





