In [0]:

orders = spark.createDataFrame([
    (1, "A", "2024-01-01", 120.0),
    (2, "B", "2024-01-01", 80.0),
    (3, "A", "2024-01-02", 250.0),
    (4, "C", "2024-01-02", 30.0),
    (5, "B", "2024-01-03", 90.0),
], ["order_id","cust","order_dt","amount"])

customers = spark.createDataFrame([
    ("A","Delhi","Retail"),
    ("B","Mumbai","Online"),
    ("C","Pune","Retail")
], ["cust","city","segment"])

In [0]:
from pyspark.sql import functions as F, Window as W, types as T

In [0]:
orders.select(col("cust").alias("customer"), "amount").show()

In [0]:
orders.withColumn("gst", F.round(F.col("amount") * 0.18, 2)).show()

In [0]:
orders.filter(F.col("amount") >= 100).show()

In [0]:
orders.withColumn("bucket",
    F.when(F.col("amount") >= 200, "High")
     .when(F.col("amount") >= 100, "Medium")
     .otherwise("Low")
).show()

In [0]:
orders = orders.withColumn("order_dt", F.to_date("order_dt"))
orders.select("order_dt", F.month("order_dt").alias("month")).show()

In [0]:
orders.groupBy("cust").agg(F.count("*").alias("cnt"), F.sum("amount").alias("total")).show()

In [0]:
orders.groupBy("cust").agg(F.expr("percentile(amount, 0.5)").alias("p50")).show()

In [0]:
orders.rollup("cust").agg(F.sum("amount").alias("sum")).orderBy("cust").show()

In [0]:
orders.join(customers, "cust", "inner").show()

In [0]:
orders.join(F.broadcast(customers), "cust", "left").explain(True)

In [0]:
w = W.partitionBy("cust").orderBy("order_dt")
orders.withColumn("running_total", F.sum("amount").over(w)).show()

In [0]:
orders.withColumn("rank", F.dense_rank().over(w.orderBy(F.col("order_dt").desc())))\
      .filter("rank = 1").show()

In [0]:
w2 = W.partitionBy("cust").orderBy(F.col("order_dt").desc())
orders.withColumn("rn", F.row_number().over(w2))\
      .filter("rn = 1").show()

In [0]:
w2 = W.partitionBy("cust").orderBy(F.col("order_dt").desc())
orders.withColumn("rn", F.row_number().over(w2))\
      .filter("rn = 1").drop("rn").show()

In [0]:
orders.repartition("cust").explain()

In [0]:
orders.select("cust").distinct().show()
orders.dropDuplicates(["cust","order_dt"]).show()