In [0]:
# ==========================================
# 1. IMPORTS & PROFILER SETUP
# ==========================================
import sys
import os
import time
import pandas as pd
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_timestamp, year, lit, broadcast

class PipelineProfiler:
    """Minimal stage timer that prints start/finish messages for pipeline stages.

    Attributes:
        stats: Maps stage name -> wall-clock start time (``time.time()``)
            recorded by the most recent ``start_timer`` call for that stage.
        durations: Maps stage name -> elapsed seconds measured by the most
            recent ``end_timer`` call for that stage.
    """

    def __init__(self):
        self.stats = {}
        self.durations = {}
        # Monotonic start marks; elapsed time is computed from these so that
        # system clock adjustments cannot skew (or negate) a duration.
        self._marks = {}

    def start_timer(self, stage_name):
        """Record the start of ``stage_name`` and print a start message."""
        self.stats[stage_name] = time.time()
        self._marks[stage_name] = time.perf_counter()
        print(f"Starting {stage_name}...")

    def end_timer(self, stage_name):
        """Print and return the seconds elapsed since ``start_timer(stage_name)``.

        Raises:
            KeyError: If ``start_timer`` was never called for ``stage_name``.
        """
        if stage_name not in self._marks:
            raise KeyError(f"Timer for stage {stage_name!r} was never started")
        duration = time.perf_counter() - self._marks[stage_name]
        self.durations[stage_name] = duration
        print(f"{stage_name} completed in {duration:.2f} seconds.")
        return duration

# Module-level profiler; the "Feature Engineering" timer started here is
# stopped by the end_timer("Feature Engineering") call made after the
# engineered data has been written out.
profiler = PipelineProfiler()
profiler.start_timer("Feature Engineering")

# ==========================================
# 2. CUSTOM TRANSFORMER (Requirement 2a)
# ==========================================
class PriceSegmenter(Transformer, DefaultParamsWritable, DefaultParamsReadable):
    """Adds domain-specific feature engineering (Market Segmentation).

    Appends a string column ``Market_Segment`` derived from ``Price``:

    * ``Price < 150000``            -> "Budget"
    * ``150000 <= Price < 450000``  -> "Standard"
    * ``Price >= 450000``           -> "Premium"
    * ``Price`` is null             -> null (no segment assigned)
    """

    def _transform(self, dataset):
        """Return ``dataset`` with the ``Market_Segment`` column added."""
        price = F.col("Price")
        segment = (
            F.when(price < 150000, "Budget")
            .when(price < 450000, "Standard")
            # Explicit final condition instead of otherwise(): a null Price
            # makes every comparison null, so the row matches no branch and
            # stays null rather than being mislabelled "Premium".
            .when(price >= 450000, "Premium")
        )
        return dataset.withColumn("Market_Segment", segment)

# ==========================================
# 3. DATA INGESTION & LINEAGE (Requirement 1b)
# ==========================================
# Load the bronze parquet, tag each row with lineage columns, cast/parse the
# typed fields, and drop rows that have a null in any selected column.
try:
    df = spark.read.parquet("/Volumes/workspace/default/uk_land_registry/bronze_parquet")

    silver_df = (
        df.withColumn("source_file", lit("uk_property_full.csv"))
        .withColumn("ingestion_layer", lit("Bronze"))
        .select(
            col("Price").cast("double"),
            to_timestamp(col("Date"), "yyyy-MM-dd HH:mm").alias("Sale_Date"),
            "Property_Type", "Old_New", "Town_City",
            # Both lineage columns must be selected explicitly; previously
            # "ingestion_layer" was created and then dropped by this select.
            "source_file", "ingestion_layer",
        )
        .dropna()
    )

    silver_df = silver_df.withColumn("Sale_Year", year(col("Sale_Date")))
    print("Bronze data loaded with lineage.")

except Exception as e:
    print(f"PIPELINE ERROR: {str(e)}")
    # Bare raise re-raises the active exception with its original traceback.
    raise

# ==========================================
# 4. DISTRIBUTED PROCESSING (Requirement 1b)
# ==========================================

# Broadcast Join Implementation for Property Mapping
# Codes follow the HM Land Registry Price Paid Data "Property Type" field
# (D, S, T, F, O); flats/maisonettes are coded "F" there.
mapping_data = [
    ("D", "Detached"),
    ("S", "Semi-Detached"),
    ("T", "Terraced"),
    ("F", "Flats/Maisonettes"),
    # NOTE(review): "P" is not a Price Paid Data code. It is retained only so
    # that rows coded "P" by an upstream step (if any) still resolve —
    # confirm against the bronze data and remove if unused.
    ("P", "Flats/Maisonettes"),
    ("O", "Other"),
]
type_mapping_df = spark.createDataFrame(mapping_data, ["Property_Type", "Type_Description"])

# Left join keeps every sale row; codes missing from the mapping get a null
# Type_Description. The small lookup table is marked for broadcast.
silver_df_with_labels = silver_df.join(broadcast(type_mapping_df), on="Property_Type", how="left")

# ==========================================
# 5. ML PREPARATION & SCALING (Requirement 2a)
# ==========================================

# A. Encode Property_Type as a numeric label, then add the price segment
indexer = StringIndexer(outputCol="type_label", inputCol="Property_Type")
indexer_model = indexer.fit(silver_df_with_labels)
indexed_df = indexer_model.transform(silver_df_with_labels)

segmenter = PriceSegmenter()
segmented_df = segmenter.transform(indexed_df)

# B. Pack Price into a single-element vector and standardise it
assembler = VectorAssembler(outputCol="unscaled_features", inputCols=["Price"])
assembled_df = assembler.transform(segmented_df)

scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="unscaled_features",
    outputCol="scaled_features",
)
# Fit and apply on the same DataFrame, as two explicit steps
scaler_model = scaler.fit(assembled_df)
final_engineered_df = scaler_model.transform(assembled_df)

# ==========================================
# 6. STORAGE & PERFORMANCE EVIDENCE (Requirement 1a & 1c)
# ==========================================
output_path = "/Volumes/workspace/default/uk_land_registry/silver_engineered_parquet"

# Persist the engineered dataset, replacing any previous output at this path
silver_writer = final_engineered_df.write.mode("overwrite")
silver_writer.parquet(output_path)

# The timer is stopped only after the write, so the reported duration
# includes the write itself (stats for Dashboard 4)
eng_duration = profiler.end_timer("Feature Engineering")

summary_lines = (
    "\n--- PERFORMANCE SUMMARY ---",
    f"Feature Engineering Duration: {eng_duration:.2f} seconds",
)
for summary_line in summary_lines:
    print(summary_line)

# Formatted physical plan, printed as optimization evidence
final_engineered_df.explain(mode="formatted")

# ==========================================
# 7. GITHUB SAMPLE GENERATION (Local Meta-Data)
# ==========================================

sample_path = "/Volumes/workspace/default/uk_land_registry/github_samples"
dbutils.fs.mkdirs(sample_path)

# Pull at most 1,000 rows to the driver as a pandas frame, then write them
# as a small CSV for the GitHub /data/samples folder
sample_pdf = final_engineered_df.limit(1000).toPandas()
sample_pdf.to_csv(f"{sample_path}/silver_sample.csv", index=False)
print(f"GitHub Silver sample generated in {sample_path}")

Starting Feature Engineering...
Bronze data loaded with lineage.
Feature Engineering completed in 53.86 seconds.

--- PERFORMANCE SUMMARY ---
Feature Engineering Duration: 53.86 seconds
== Physical Plan ==
AdaptiveSparkPlan (15)
+- == Initial Plan ==
   Project (14)
   +- Project (13)
      +- Project (12)
         +- ColumnarToRow (11)
            +- PhotonResultStage (10)
               +- PhotonBroadcastHashJoin LeftOuter (9)
                  :- PhotonProject (3)
                  :  +- PhotonProject (2)
                  :     +- PhotonScan parquet  (1)
                  +- PhotonShuffleExchangeSource (8)
                     +- PhotonShuffleMapStage (7)
                        +- PhotonShuffleExchangeSink (6)
                           +- PhotonRowToColumnar (5)
                              +- LocalTableScan (4)


(1) PhotonScan parquet 
Output [6]: [Price#13206, Date#13207, Property_Type#13209, Old_New#13210, Town_City#13216, County#13220]
Location: InMemoryFileIndex [dbfs:/Vol