In [0]:
# analyze and display monthly max transactions for categories A/B with flag Y from sample data
from pyspark.sql.functions import col, avg, max as spark_max, min as spark_min, count, when, lit, year, month, dayofmonth, rand
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
import datetime

# 1. Define schema
# Every column is declared non-nullable; the (name, type) pairs are listed in
# the same order as the tuples produced for the sample rows.
schema = StructType([
    StructField(field_name, field_type, nullable=False)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("category", StringType()),
        ("amount", DoubleType()),
        ("date", DateType()),
    )
])

# 2. Generate sample data
# 100 rows with ids 1..100. For row n:
#   - the category cycles through the list (n % 4),
#   - the amount combines a weekly-cycling thousands part with a pseudo-varying
#     remainder below 500, rounded to 2 decimals,
#   - the date is n days after the base date.
categories = ["A", "B", "C", "D"]
base_date = datetime.date(2025, 1, 1)
data = [
    (
        n,
        categories[n % 4],
        round(1000 * (n % 7) + (n * 3.14) % 500, 2),
        base_date + datetime.timedelta(days=n),
    )
    for n in range(1, 101)
]

# 3-5. Create the DataFrame and attach the derived columns in one chain.
# "flag" is "Y" when a random draw exceeds 0.5 and "N" otherwise; rand() is
# unseeded, so the flag assignment differs from run to run.
random_flag = when(rand() > 0.5, lit("Y")).otherwise(lit("N"))
txn_date = col("date")
df = (
    spark.createDataFrame(data, schema)
    .withColumn("flag", random_flag)
    # Calendar parts of the transaction date
    .withColumn("year", year(txn_date))
    .withColumn("month", month(txn_date))
    .withColumn("day", dayofmonth(txn_date))
)

# 6. Filter for category A or B and flag Y
filtered_df = df.filter((col("category").isin("A", "B")) & (col("flag") == "Y"))

# 7. Group by category and calendar month, aggregate.
# "year" is part of the key so that the same month number occurring in
# different years is never merged into a single bucket.
agg_df = filtered_df.groupBy("category", "year", "month").agg(
    count("*").alias("txn_count"),
    avg("amount").alias("avg_amount"),
    spark_max("amount").alias("max_amount"),
    spark_min("amount").alias("min_amount"),
)

# Alias DataFrames so each side of the self-join can be addressed by name
agg_df_alias = agg_df.alias("agg")
filtered_df_alias = filtered_df.alias("filtered")

# 8. Join with original to get detailed info for max transactions.
# agg_df is derived from filtered_df, so the grouping columns on both sides
# share the same lineage. Referring to them through the DataFrame objects
# (agg_df_alias.category == filtered_df_alias.category) can be rejected by
# Spark as an ambiguous self-join or resolved to the same attribute on both
# sides; alias-qualified names ("agg.<col>" / "filtered.<col>") are
# unambiguous.
# Rows tied for the maximum amount within a group are all kept.
max_txn_df = agg_df_alias.join(
    filtered_df_alias,
    (col("agg.category") == col("filtered.category"))
    & (col("agg.year") == col("filtered.year"))
    & (col("agg.month") == col("filtered.month"))
    & (col("agg.max_amount") == col("filtered.amount")),
    "inner",
).select(
    col("filtered.id"), col("filtered.category"), col("filtered.amount"),
    col("filtered.date"), col("filtered.month"), col("agg.txn_count"),
)

# 9. Sort: month ascending, then amount from largest to smallest
result_df = max_txn_df.sort(col("month").asc(), col("amount").desc())

# 10. Render the sorted result in the notebook
display(result_df)