In [1]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.appName("Task4").getOrCreate()

In [4]:
schema = StructType([
    StructField("transaction_unique_identifier", StringType(), True),
    StructField("price", DecimalType(10, 2), True),
    StructField("date_of_transfer", DateType(), True),
    StructField("postcode", StringType(), True),
    StructField("property_type", StringType(), True),
    StructField("old_new", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("paon", StringType(), True),
    StructField("saon", StringType(), True),
    StructField("street", StringType(), True),
    StructField("locality", StringType(), True),
    StructField("town_city", StringType(), True),
    StructField("district", StringType(), True),
    StructField("county", StringType(), True),
    StructField("ppd_category_type", StringType(), True),
    StructField("record_status", StringType(), True)
])


In [None]:
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("../data/*.csv") 
)

In [None]:

df = df.withColumns({
    "year": F.year(F.col("date_of_transfer")),
    "postcode_area": F.split(F.col("postcode"), " ").getItem(0)
})
df_cb = df.filter(
    (F.col("year") >= 1995) & (F.col("year") <= 2021) &
    (F.col("postcode_area").startswith("CB"))
)


medians = (
    df_cb.groupBy("postcode_area", "year")
    .agg(F.expr("percentile_approx(price, 0.5)").alias("median_price"))
)

min_medians = (
    medians.groupBy("postcode_area")
    .agg(F.min("median_price").alias("min_median_price"))
    .orderBy("min_median_price")
    .limit(5)
)

top5_cb_areas = [row["postcode_area"] for row in min_medians.collect()]

means = (
    df_cb.filter(F.col("postcode_area").isin(top5_cb_areas))
    .groupBy("postcode_area", "year")
    .agg(F.mean("price").alias("mean_price"))
)

w = Window.partitionBy("postcode_area").orderBy("year")

means = means.withColumn(
    "prev_mean", F.lag("mean_price").over(w)
).withColumn(
    "mean_diff", F.abs(F.col("mean_price") - F.coalesce(F.col("prev_mean"), F.lit(0)))
)

result = (
    means.groupBy("postcode_area")
    .agg(F.sum("mean_diff").alias("cumulative_mean_diff_sum"))
    .orderBy("cumulative_mean_diff_sum")
)

result.show()

+-------------+------------------------+
|postcode_area|cumulative_mean_diff_sum|
+-------------+------------------------+
|          CB6|           397921.877916|
|          CB9|           417879.956958|
|          CB7|           525531.094890|
|          CB8|          1035394.046625|
|          CB5|          1825138.417710|
+-------------+------------------------+

