In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("date_of_transfer", DateType(), True),
    StructField("postcode", StringType(), True),
    StructField("property_type", StringType(), True),
    StructField("old_new", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("paon", StringType(), True),
    StructField("saon", StringType(), True),
    StructField("street", StringType(), True),
    StructField("locality", StringType(), True),
    StructField("town_city", StringType(), True),
    StructField("district", StringType(), True),
    StructField("county", StringType(), True),
    StructField("ppd_category", StringType(), True),
    StructField("record_status", StringType(), True)
])


In [0]:

data_path = "/Volumes/workspace/default/virtusa_data/*.csv"

df = (spark.read
    .option("header", "false")
    .schema(schema)
    .csv(data_path)
)
df.count()

30289104

In [0]:
from pyspark.sql.functions import year, month

df = df.withColumn("year", year("date_of_transfer")) \
       .withColumn("month", month("date_of_transfer"))



In [0]:
df_95_to_21=df.filter((df["year"] >= 1995) & (df["year"] <= 2021))
df_95_to_21.count()

27368592

In [0]:
from pyspark.sql import functions as F
df_2021 = df.filter(df["year"] == 2021)
df_2021.count()

1278278

In [0]:
df_sel = df_2021.select(
    "transaction_id",
    "price",
    "postcode"
)

In [0]:
df_q1 = (
    df_sel
    .filter(
        (F.col("price").isNotNull()) &
        (F.col("postcode").isNotNull())
    )
    .withColumn(
    "postcode_area", F.split(F.col("postcode"), " ").getItem(0)
    )   
)

median_prices = (
    df_q1.groupBy("postcode_area")
    .agg(F.expr("percentile_approx(price, 0.5)").alias("median_price"))
    .orderBy(F.col("median_price").asc())
    .limit(10)
)

median_prices.show()

+-------------+------------+
|postcode_area|median_price|
+-------------+------------+
|          NP2|     30000.0|
|          DL4|     56000.0|
|          TS1|     57000.0|
|          BD1|     59995.0|
|         DL17|     62950.0|
|          SR1|     65000.0|
|          SR8|     70000.0|
|         DN31|     70000.0|
|         BB11|     71000.0|
|          TS3|     72500.0|
+-------------+------------+



In [0]:
df_cb = df_sel.withColumn("postcode_area", F.split(F.col("postcode"), " ").getItem(0))

df_cb = df_cb.filter(F.col("postcode_area").startswith("CB"))

mean_prices = (
    df_cb.groupBy("postcode_area")
    .agg(F.mean("price").alias("mean_price"))
    .orderBy("mean_price")
    .limit(5)
)

mean_prices.show()

+-------------+------------------+
|postcode_area|        mean_price|
+-------------+------------------+
|          CB9|303575.12658227846|
|          CB6| 337535.5477137177|
|          CB7| 341310.2309495897|
|         CB25|418859.19946808513|
|         CB24| 435710.0737704918|
+-------------+------------------+



In [0]:
from pyspark.sql.window import Window

df_q3 = (
    df_95_to_21
    .filter(
        (F.col("price").isNotNull()) &
        (F.col("postcode").isNotNull())
    )
    .withColumn(
    "postcode_area", F.split(F.col("postcode"), " ").getItem(0)
    )   
)
medians = (
    df_q3.groupBy("postcode_area", "year")
    .agg(F.expr("percentile_approx(price, 0.5)").alias("median_price"))
)

w = Window.partitionBy("postcode_area").orderBy("year")

medians = medians.withColumn(
    "prev_median", F.lag("median_price").over(w)
).withColumn(
    "median_diff", F.abs(F.col("median_price") - F.col("prev_median"))
)

medians_filtered = medians.filter(
    (F.col("postcode_area").isNotNull()) & (F.col("median_diff").isNotNull())
)

diff_sum = (
    medians_filtered.groupBy("postcode_area")
    .agg(F.sum("median_diff").alias("cumulative_median_diff_sum"))
    .orderBy("cumulative_median_diff_sum")
    .limit(25)
)

diff_sum.display()

postcode_area,cumulative_median_diff_sum
L66,4000.0
CF7,8000.0
L43,12000.0
GL30,28000.0
RG3,86500.0
L41,94500.0
DN31,98874.0
ST1,100210.0
ST6,106292.0
LA14,107100.0


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


df_q4 = df.withColumn(
    "postcode_area", F.split(F.col("postcode"), " ").getItem(0)
)

df_cb = df_q4.filter(
    (F.col("year") >= 1995) & (F.col("year") <= 2025) & (F.col("postcode_area").startswith("CB"))
)

medians = df_cb.groupBy("postcode_area", "year").agg(
    F.expr("percentile_approx(price, 0.5)").alias("median_price")
)

min_medians = medians.groupBy("postcode_area").agg(
    F.min("median_price").alias("min_median_price")
).orderBy("min_median_price").limit(5)

top5_cb_areas = [row["postcode_area"] for row in min_medians.collect()]

means = df_cb.filter(F.col("postcode_area").isin(top5_cb_areas)).groupBy(
    "postcode_area", "year"
).agg(F.mean("price").alias("mean_price"))

w = Window.partitionBy("postcode_area").orderBy("year")

means = means.withColumn("p_mean", F.lag("mean_price").over(w)).withColumn(
    "mean_diff", F.abs(F.col("mean_price") - F.coalesce(F.col("p_mean"), F.lit(0)))
)

result = means.groupBy("postcode_area").agg(
    F.sum("mean_diff").alias("cum_mean")
).orderBy("cum_mean")

display(result)

postcode_area,cum_mean
CB9,513211.04357582383
CB6,523251.2574297156
CB7,585639.2568889805
CB8,1129971.0709551102
CB5,2072598.7562686384


In [0]:
median_prices.write.mode("overwrite").csv("/Volumes/workspace/default/virtusa_data/task1.csv", header=True)

In [0]:
mean_prices.write.mode("overwrite").csv("/Volumes/workspace/default/virtusa_data/task2.csv", header=True)

In [0]:
diff_sum.write.mode("overwrite").csv("/Volumes/workspace/default/virtusa_data/task03.csv", header=True)

In [0]:
result.write.mode("overwrite").csv("/Volumes/workspace/default/virtusa_data/task4.csv", header=True)