## Big Data Management Lab 3

## Exploitation Zone

In this notebook we load the data from the formatted zone, and create the KPIs that we require for our final dashboard processes.


In [2]:
# Install and setup PySpark
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import json
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import files
import os

In [4]:
# Build (or reuse) the local Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Lab3-Exploitation-Zone")
    .master("local[*]")  # run locally, one worker thread per available core
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

print("Spark Session Created Successfully!")
print(f"Spark Version: {spark.version}")

# Limit Spark's own logging to errors so cell outputs stay readable.
spark.sparkContext.setLogLevel("ERROR")

Spark Session Created Successfully!
Spark Version: 3.5.1


In [5]:
# Report the Python and Spark versions seen by the driver, then dump every
# (key, value) pair of the active Spark configuration.
spark_context = spark.sparkContext
print(f"Python version = {spark_context.pythonVer}")
print(f"Spark version = {spark_context.version}")
print(spark_context.getConf().getAll())

Python version = 3.11
Spark version = 3.5.1
[('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.app.submitTime', '1750700476549'), ('spark.executor.i

In [6]:
# With master "local[*]" Spark uses one worker thread per core, so the default
# parallelism printed here equals the number of cores available to the runtime.
spark_context = spark.sparkContext
print("Master: ", spark.conf.get("spark.master"))
print("Parallelism: ", spark_context.defaultParallelism)
print("Minimum number of partitions: ", spark_context.defaultMinPartitions)

Master:  local[*]
Parallelism:  2
Minimum number of partitions:  2


In [7]:
from google.colab import drive
import os
from datetime import datetime

# Mount Google Drive at /content/drive (Colab only); the project folders
# defined further down in this cell all live under this mount point.
print("Mounting Google Drive...")
drive.mount('/content/drive')

# Auto-generate project name or let user customize
def setup_project():
    """Ask for the project name, falling back to a date-stamped default.

    Prints a banner and the default name (``data-pipeline-YYYYMMDD`` built
    from today's date), then reads one line from standard input.

    Returns:
        str: the stripped user input, or the default name when the input is
        empty or whitespace only.
    """
    banner_rule = "=" * 60
    print("\n" + banner_rule)
    print("DATA PIPELINE PROJECT SETUP")
    print(banner_rule)

    # Default name carries today's date so separate runs get separate folders.
    default_name = f"data-pipeline-{datetime.now().strftime('%Y%m%d')}"
    print(f"\nDefault project name: {default_name}")

    typed_name = input("Press Enter to use default, or type a custom project name: ").strip()
    if typed_name:
        return typed_name
    return default_name

# Get project name (interactive: waits for keyboard input, see setup_project)
PROJECT_NAME = setup_project()

# Set up all paths: one project folder under the Drive mount, with one
# sub-folder per zone of the data pipeline.
BASE_DRIVE_PATH = "/content/drive/MyDrive"
DRIVE_PATH = f"{BASE_DRIVE_PATH}/{PROJECT_NAME}"
LANDING_ZONE = f"{DRIVE_PATH}/landing_zone"
FORMATTED_ZONE = f"{DRIVE_PATH}/formatted_zone"  # read by load_formatted_data()
EXPLOITATION_ZONE = f"{DRIVE_PATH}/exploitation_zone"

# Create directories
def create_project_structure():
    """Ensure the project root and the three zone folders exist.

    Checks DRIVE_PATH, LANDING_ZONE, FORMATTED_ZONE and EXPLOITATION_ZONE in
    that order, creates each one that is missing (including missing parent
    folders) and prints one status line per folder.
    """
    print(f"\nCreating project structure in: {DRIVE_PATH}")
    print("-" * 40)

    labelled_paths = (
        ("Project Root", DRIVE_PATH),
        ("Landing Zone", LANDING_ZONE),
        ("Formatted Zone", FORMATTED_ZONE),
        ("Exploitation Zone", EXPLOITATION_ZONE),
    )

    for label, folder in labelled_paths:
        if os.path.exists(folder):
            print(f"✓ {label} exists: {folder}")
            continue
        os.makedirs(folder)
        print(f"✓ Created {label}: {folder}")

# Run the setup: creates whichever project folders are missing and reports
# the ones that already exist.
create_project_structure()

Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

DATA PIPELINE PROJECT SETUP

Default project name: data-pipeline-20250623
Press Enter to use default, or type a custom project name: dbm-lab3

Creating project structure in: /content/drive/MyDrive/dbm-lab3
----------------------------------------
✓ Project Root exists: /content/drive/MyDrive/dbm-lab3
✓ Landing Zone exists: /content/drive/MyDrive/dbm-lab3/landing_zone
✓ Formatted Zone exists: /content/drive/MyDrive/dbm-lab3/formatted_zone
✓ Exploitation Zone exists: /content/drive/MyDrive/dbm-lab3/exploitation_zone


## Loading the data from the formatted Zone

In [8]:
def load_formatted_data():
    """
    Read the three formatted-zone Parquet datasets into Spark DataFrames.

    Reads ``income``, ``prices`` and ``cultural-sites`` from FORMATTED_ZONE,
    printing each row count right after the dataset is read and, once all
    three are loaded, each schema.

    Returns:
        tuple: ``(income_df, prices_df, cultural_df)`` on success, or
        ``(None, None, None)`` if any step raised; in that case the error is
        printed and not re-raised.
    """
    print("LOADING DATA FROM FORMATTED ZONE...")

    try:
        income_df = spark.read.parquet(f"{FORMATTED_ZONE}/income")
        print(f"Loaded income data: {income_df.count()} rows")

        prices_df = spark.read.parquet(f"{FORMATTED_ZONE}/prices")
        print(f"Loaded prices data: {prices_df.count()} rows")

        cultural_df = spark.read.parquet(f"{FORMATTED_ZONE}/cultural-sites")
        print(f"Loaded cultural sites data: {cultural_df.count()} rows")

        loaded = (income_df, prices_df, cultural_df)
        headings = (
            "\nIncome data schema:",
            "\nPrices data schema:",
            "\nCultural sites schema:",
        )
        for heading, frame in zip(headings, loaded):
            print(heading)
            frame.printSchema()
    except Exception as load_error:
        # Callers receive three Nones instead of an exception.
        print(f"Error loading formatted data: {load_error}")
        return None, None, None

    return loaded

# Load the three formatted datasets. If loading failed, all three names are
# None (load_formatted_data prints the error instead of raising).
income_formatted, prices_formatted, cultural_formatted = load_formatted_data()

LOADING DATA FROM FORMATTED ZONE...
Loaded income data: 365 rows
Loaded prices data: 359 rows
Loaded cultural sites data: 871 rows

Income data schema:
root
 |-- district_code: integer (nullable = true)
 |-- district_name: string (nullable = true)
 |-- neighborhood_code: integer (nullable = true)
 |-- neighborhood_name: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- rfd_index: double (nullable = true)
 |-- year: integer (nullable = true)


Prices data schema:
root
 |-- neighborhood_name: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- used_amount: double (nullable = true)
 |-- per_meter: double (nullable = true)
 |-- used_per_meter: double (nullable = true)
 |-- year: integer (nullable = true)


Cultural sites schema:
root
 |-- district_name: string (nullable = true)
 |-- neighborhood_name: string (nullable = true)
 |-- cultural_site_name: string (nullable = true)
 |-- cultural_site_latitude: dou

In [9]:
# Preview the formatted prices data (show() prints the first 20 rows by default).
prices_formatted.show()

+--------------------+--------------+------+-----------+---------+--------------+----+
|   neighborhood_name| district_name|amount|used_amount|per_meter|used_per_meter|year|
+--------------------+--------------+------+-----------+---------+--------------+----+
|            el Raval|  Ciutat Vella| 240.3|      240.0|   3469.9|        3468.9|2017|
|      el Barri Gòtic|  Ciutat Vella| 472.7|      424.0|   4565.8|        4441.9|2017|
|      la Barceloneta|  Ciutat Vella| 197.7|      198.2|   4501.1|        4551.3|2017|
|Sant Pere, Santa ...|  Ciutat Vella| 314.3|      310.4|   4593.7|        4506.9|2017|
|       el Fort Pienc|      Eixample| 400.4|      383.2|   4296.6|        4217.6|2017|
|  la Sagrada Família|      Eixample| 255.8|      255.8|   3494.5|        3494.5|2017|
|la Dreta de l'Eix...|      Eixample| 519.8|      535.3|   4628.0|        4731.0|2017|
|l'Antiga Esquerra...|      Eixample| 372.2|      363.4|   4218.9|        4178.8|2017|
|la Nova Esquerra ...|      Eixample| 339.6

### Step 1: Loading data from the formatted zone

In [10]:
def _describe_frame(heading, frame):
    """Print a heading, the frame's row and column counts, its column names and its first 3 rows."""
    print(heading)
    print(f"  Shape: {frame.count()} rows × {len(frame.columns)} columns")
    print(f"  Columns: {frame.columns}")
    frame.show(3)


# Same summary for each formatted dataset: income, then prices, then cultural sites.
_describe_frame("INCOME DATA:", income_formatted)
_describe_frame("\nPRICES DATA:", prices_formatted)
_describe_frame("\nCULTURAL DATA:", cultural_formatted)

INCOME DATA:
  Shape: 365 rows × 7 columns
  Columns: ['district_code', 'district_name', 'neighborhood_code', 'neighborhood_name', 'population', 'rfd_index', 'year']
+-------------+-------------+-----------------+-----------------+----------+---------+----+
|district_code|district_name|neighborhood_code|neighborhood_name|population|rfd_index|year|
+-------------+-------------+-----------------+-----------------+----------+---------+----+
|            1| Ciutat Vella|                1|         el Raval|     47617|     75.8|2015|
|            1| Ciutat Vella|                2|   el Barri Gòtic|     15269|    108.5|2015|
|            1| Ciutat Vella|                3|   la Barceloneta|     15036|     76.6|2015|
+-------------+-------------+-----------------+-----------------+----------+---------+----+
only showing top 3 rows


PRICES DATA:
  Shape: 359 rows × 7 columns
  Columns: ['neighborhood_name', 'district_name', 'amount', 'used_amount', 'per_meter', 'used_per_meter', 'year']
+------

In [11]:
# Print the schema of each formatted dataset under its own heading.
for schema_heading, schema_frame in (
    ("Income data schema:", income_formatted),
    ("\nPrices data schema:", prices_formatted),
    ("\nCultural data schema:", cultural_formatted),
):
    print(schema_heading)
    schema_frame.printSchema()

Income data schema:
root
 |-- district_code: integer (nullable = true)
 |-- district_name: string (nullable = true)
 |-- neighborhood_code: integer (nullable = true)
 |-- neighborhood_name: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- rfd_index: double (nullable = true)
 |-- year: integer (nullable = true)


Prices data schema:
root
 |-- neighborhood_name: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- used_amount: double (nullable = true)
 |-- per_meter: double (nullable = true)
 |-- used_per_meter: double (nullable = true)
 |-- year: integer (nullable = true)


Cultural data schema:
root
 |-- district_name: string (nullable = true)
 |-- neighborhood_name: string (nullable = true)
 |-- cultural_site_name: string (nullable = true)
 |-- cultural_site_latitude: double (nullable = true)
 |-- cultural_site_longitude: double (nullable = true)
 |-- creation_date: date (nullable = true)



#### Missing Data

In [12]:
# Size of the prices dataset, as a reference for the null counts below
total_rows = prices_formatted.count()
print(f"Total records: {total_rows}")

# Null count for each of the four price columns
print("\nMissing data count:")
for price_column in ("amount", "used_amount", "per_meter", "used_per_meter"):
    null_count = prices_formatted.filter(f"{price_column} IS NULL").count()
    print(f"{price_column}: {null_count}")

# Rows in which both "used" columns are null at the same time
both_missing = prices_formatted.filter("used_amount IS NULL AND used_per_meter IS NULL").count()
print(f"\nBoth used columns missing: {both_missing}")

# List the affected rows with their identifying columns
print("\nRecords with missing used_amount:")
(
    prices_formatted
    .filter("used_amount IS NULL")
    .select(
        "neighborhood_name", "district_name", "year",
        "amount", "used_amount", "per_meter", "used_per_meter",
    )
    .show()
)

Total records: 359

Missing data count:
amount: 0
used_amount: 2
per_meter: 0
used_per_meter: 2

Both used columns missing: 2

Records with missing used_amount:
+-----------------+--------------+----+------+-----------+---------+--------------+
|neighborhood_name| district_name|year|amount|used_amount|per_meter|used_per_meter|
+-----------------+--------------+----+------+-----------+---------+--------------+
|         la Clota|Horta-Guinardó|2016| 160.0|       NULL|    342.6|          NULL|
|         la Clota|Horta-Guinardó|2013| 108.0|       NULL|   1723.6|          NULL|
+-----------------+--------------+----+------+-----------+---------+--------------+



In [13]:
# Replace the missing "used" prices with 0: the affected rows seem to be
# years without any second-hand sale rather than lost measurements.
prices_formatted = prices_formatted.na.fill(0, subset=["used_amount", "used_per_meter"])

#### District and Neighborhood Consistency

In [14]:
# Distinct (district, neighborhood) pairs that occur in each dataset
income_districts = income_formatted.select("district_name", "neighborhood_name").dropDuplicates()
prices_districts = prices_formatted.select("district_name", "neighborhood_name").dropDuplicates()
cultural_districts = cultural_formatted.select("district_name", "neighborhood_name").dropDuplicates()

# Equal counts are a necessary (not sufficient) sign that the datasets cover the same areas
print(f"  Income data: {income_districts.count()} unique district-neighborhood combinations")
print(f"  Prices data: {prices_districts.count()} unique district-neighborhood combinations")
print(f"  Cultural data: {cultural_districts.count()} unique district-neighborhood combinations")

  Income data: 73 unique district-neighborhood combinations
  Prices data: 73 unique district-neighborhood combinations
  Cultural data: 68 unique district-neighborhood combinations


We don't have perfect coherence among the datasets, but that is acceptable: the datasets do not need to match up exactly, and here the difference depends entirely on whether cultural sites exist in a given neighborhood. Since it seems that 5 district-neighborhood combinations (73 vs. 68) have no cultural sites, we just need to be aware of this when joining the datasets, so that we don't lose information.

In [15]:
# Distinct (district, neighborhood) pairs of the two yearly datasets
income_mapping = income_formatted.select("district_name", "neighborhood_name").distinct()
prices_mapping = prices_formatted.select("district_name", "neighborhood_name").distinct()

# Pair the rows by neighborhood name and keep only the pairs whose district
# names disagree, i.e. a neighborhood assigned to different districts.
same_neighborhood = income_mapping["neighborhood_name"] == prices_mapping["neighborhood_name"]
different_district = income_mapping["district_name"] != prices_mapping["district_name"]
mismatched = income_mapping.join(prices_mapping, same_neighborhood, "inner").where(different_district)

mismatch_count = mismatched.count()
if mismatch_count == 0:
    print("  ✅ District-neighborhood mappings consistent between income and prices")
else:
    print(f"  ❌ Found {mismatch_count} neighborhood-district mismatches between income and prices data")
    mismatched.show()

  ✅ District-neighborhood mappings consistent between income and prices


#### Population Validation

In [16]:
print("Population Data:")

# Min / max / mean population over all income rows, in a single aggregation.
# min, max and avg are the Spark column functions here, not the Python builtins.
pop_stats = income_formatted.agg(
    min("population").alias("min_pop"),
    max("population").alias("max_pop"),
    avg("population").alias("avg_pop"),
).first()

print(f"  Population range: {pop_stats['min_pop']:,} to {pop_stats['max_pop']:,}")
print(f"  Average population: {pop_stats['avg_pop']:,.0f}")

# A population of zero or less cannot be valid
invalid_pop = income_formatted.where(col("population") <= 0).count()
if invalid_pop == 0:
    print("  ✅ All populations are positive")
else:
    print(f"  ❌ {invalid_pop} neighborhoods with zero or negative population")

Population Data:
  Population range: 504 to 58,315
  Average population: 22,116
  ✅ All populations are positive


### RFD Index Validation (Relative Family Disposable Income Index)

In [17]:
# Min / max / mean of the RFD index over all income rows.
# min, max and avg are the Spark column functions here, not the Python builtins.
rfd_stats = income_formatted.select(
    min("rfd_index").alias("min_rfd"),
    max("rfd_index").alias("max_rfd"),
    avg("rfd_index").alias("avg_rfd")
).collect()[0]

print(f"  RFD Index range: {rfd_stats['min_rfd']:.1f} to {rfd_stats['max_rfd']:.1f}")
print(f"  Average RFD Index: {rfd_stats['avg_rfd']:.1f}")

# Plausibility bounds for the RFD index. They are named once and used both in
# the filter and in the warning text, so the message always reports the
# thresholds that were actually applied.
RFD_LOWER_BOUND = 30
RFD_UPPER_BOUND = 255

extreme_rfd = income_formatted.filter(
    (col("rfd_index") < RFD_LOWER_BOUND) | (col("rfd_index") > RFD_UPPER_BOUND)
)

extreme_count = extreme_rfd.count()
if extreme_count > 0:
    print(f"  ⚠️  {extreme_count} neighborhoods with extreme RFD values (< {RFD_LOWER_BOUND} or > {RFD_UPPER_BOUND})")
    print("  Extreme RFD values:")
    extreme_rfd.select("neighborhood_name", "district_name", "rfd_index").show()
else:
    print("  ✅ All RFD indices within reasonable range")

  RFD Index range: 34.3 to 251.7
  Average RFD Index: 92.4
  ✅ All RFD indices within reasonable range


#### Price Data Validation

In [18]:
print("Price Data Validation:")

# A record is invalid if any of its four price columns is below zero
any_negative = (
    (col("amount") < 0) | (col("used_amount") < 0) |
    (col("per_meter") < 0) | (col("used_per_meter") < 0)
)
negative_prices = prices_formatted.where(any_negative).count()

if negative_prices == 0:
    print("  ✅ No negative prices found")
else:
    print(f"  ❌ {negative_prices} records with negative prices")

# Min / max / mean of the price per m² over all records, in a single aggregation
price_stats = prices_formatted.agg(
    min("per_meter").alias("min_per_m2"),
    max("per_meter").alias("max_per_m2"),
    avg("per_meter").alias("avg_per_m2"),
).first()

print(f"  Price per m²: €{price_stats['min_per_m2']:.0f} to €{price_stats['max_per_m2']:.0f}")
print(f"  Average price per m²: €{price_stats['avg_per_m2']:.0f}")

# Flag prices per m² outside the 1,000-15,000 band that this check treats as
# plausible for Barcelona
unrealistic_prices = prices_formatted.where(
    (col("per_meter") < 1000) | (col("per_meter") > 15000)
)

unrealistic_count = unrealistic_prices.count()
if unrealistic_count > 0:
    print(f"  ⚠️  {unrealistic_count} records with potentially unrealistic prices per m²")
    print("  Sample unrealistic prices:")
    unrealistic_prices.select("neighborhood_name", "per_meter", "year").show(5)

Price Data Validation:
  ✅ No negative prices found
  Price per m²: €343 to €6951
  Average price per m²: €2732
  ⚠️  7 records with potentially unrealistic prices per m²
  Sample unrealistic prices:
+-----------------+---------+----+
|neighborhood_name|per_meter|year|
+-----------------+---------+----+
|         la Clota|    342.6|2016|
|      Can Peguera|    613.9|2016|
|      Can Peguera|    612.6|2013|
| la Trinitat Nova|    945.1|2013|
| la Trinitat Nova|    968.2|2014|
+-----------------+---------+----+
only showing top 5 rows



Even though these "outliers" look somewhat unrealistic, such prices are still possible, and I don't believe they are actual outliers in the data.

#### Temporal Consistency Checks

In [19]:
# Distinct years covered by each yearly dataset, in ascending order
income_years = income_formatted.select("year").distinct().orderBy("year")
prices_years = prices_formatted.select("year").distinct().orderBy("year")

print("Year coverage:")
print(f"  Income data: {[row['year'] for row in income_years.collect()]}")
print(f"  Prices data: {[row['year'] for row in prices_years.collect()]}")

# Earliest and latest creation date among the cultural sites
cultural_date_range = cultural_formatted.agg(
    min("creation_date").alias("earliest"),
    max("creation_date").alias("latest"),
).first()

print(f"  Cultural sites creation: {cultural_date_range['earliest']} to {cultural_date_range['latest']}")

# A creation date later than today (driver clock) indicates a data error
from datetime import date
today = date.today()

future_dates = cultural_formatted.where(col("creation_date") > today).count()
if future_dates == 0:
    print("  ✅ No future creation dates found")
else:
    print(f"  ❌ {future_dates} cultural sites with future creation dates")

Year coverage:
  Income data: [2013, 2014, 2015, 2016, 2017]
  Prices data: [2013, 2014, 2015, 2016, 2017]
  Cultural sites creation: 1984-05-30 to 2022-12-16
  ✅ No future creation dates found


## Step 2: Define new KPIs and final data structure for dashboarding.  Basic descriptive Analysis

KPIs considered

1. Cultural Sites Per District: How many cultural sites per district?
2. Housing Affordability Trend: RFD_INDEX / per_m2_price
3. City Price Evolution: By how much are prices increasing every year?
4. Income Inequality Progression: (Max(rfd) - Min(rfd)) per year
5. City Price Index (Average price across the whole city -> can calculate relative values)
6. Neighborhood Population Growth Rate: (current_year_population - previous_year_population) / previous_year_population × 100

In [20]:
from pyspark.sql.functions import count, desc, year, col

def calculate_cultural_sites_per_district_per_year():
    """
    Build KPI 1: cumulative number of cultural sites per district and year.

    For every distinct ``year`` found in ``income_formatted``, counts per
    district the cultural sites whose ``creation_date`` falls in that year or
    earlier. Prints the per-district table and a per-year summary.

    Returns:
        DataFrame with the columns ``district_name``,
        ``cultural_sites_available``, ``year`` and ``sites_per_district``
        (``cultural_sites_available`` divided by the number of distinct
        districts in ``cultural_formatted``, rounded to 2 decimals).
        A district with no site created up to a given year has no row for
        that year.
    """
    from functools import reduce

    print("KPI 1: CULTURAL SITES PER DISTRICT PER YEAR")
    print("=" * 50)

    sites_with_year = cultural_formatted.withColumn("creation_year", year(col("creation_date")))

    def _sites_available_in(target_year):
        """Per-district count of the sites created in or before ``target_year``."""
        return (
            sites_with_year
            .filter(col("creation_year") <= target_year)
            .groupBy("district_name")
            .agg(count("cultural_site_name").alias("cultural_sites_available"))
            .withColumn("year", lit(target_year))
        )

    # The years to report on are the ones covered by the income data.
    year_rows = income_formatted.select("year").distinct().collect()
    yearly_frames = [_sites_available_in(row["year"]) for row in year_rows]

    # Stack the per-year frames into one long table.
    cultural_sites_by_district_year = reduce(
        lambda stacked, nxt: stacked.union(nxt), yearly_frames
    )

    district_total = cultural_formatted.select("district_name").distinct().count()
    final_kpi = cultural_sites_by_district_year.withColumn(
        "sites_per_district",
        round(col("cultural_sites_available") / district_total, 2),
    )

    print("Cultural sites per district by year:")
    final_kpi.orderBy("year", desc("cultural_sites_available")).show()

    print("\nCity-wide summary by year:")
    yearly_summary = (
        final_kpi
        .groupBy("year")
        .agg(
            sum("cultural_sites_available").alias("total_sites_citywide"),
            round(avg("cultural_sites_available"), 1).alias("avg_sites_per_district"),
        )
        .orderBy("year")
    )
    yearly_summary.show()

    return final_kpi



In [21]:
# Compute KPI 1 (also prints its tables). The bare `kpi_1` on the last line
# makes the notebook echo the DataFrame's repr, i.e. its column names and types.
kpi_1 = calculate_cultural_sites_per_district_per_year()
kpi_1

KPI 1: CULTURAL SITES PER DISTRICT PER YEAR
Cultural sites per district by year:
+-------------------+------------------------+----+------------------+
|      district_name|cultural_sites_available|year|sites_per_district|
+-------------------+------------------------+----+------------------+
|       Ciutat Vella|                      84|2013|               8.4|
|Sarrià-Sant Gervasi|                      77|2013|               7.7|
|     Sants-Montjuïc|                      66|2013|               6.6|
|           Eixample|                      57|2013|               5.7|
|             Gràcia|                      50|2013|               5.0|
|         Sant Martí|                      41|2013|               4.1|
|          Les Corts|                      37|2013|               3.7|
|     Horta-Guinardó|                      36|2013|               3.6|
|        Sant Andreu|                      36|2013|               3.6|
|         Nou Barris|                      16|2013|               1

DataFrame[district_name: string, cultural_sites_available: bigint, year: int, sites_per_district: double]



KPI 1:
- per district and year

Since we have many neighborhoods, we select the metric to be cultural sites per district. The motivation is that cultural sites are likely a driver of higher rental prices, and perhaps income and vice versa.

#### KPI 2: Affordability Ratio

KPI 2: Housing Affordability Trend

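RFD_INDEX / per_m2_price

Note: the cell below does not compute this ratio; it computes the average price per m² by district and year, together with its year-over-year change.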

In [22]:
# KPI 2: Price Trend by District (Simple & Clear)

from pyspark.sql.functions import col, avg, round as spark_round, lag
from pyspark.sql.window import Window

def calculate_price_trend_by_district():
    """
    Build KPI 2: average price per m² by district and year with its yearly change.

    Averages ``per_meter`` per (district, year), rounded to whole units, and
    compares each year with the previous year of the same district. Prints
    the trend table and the mean yearly growth per district.

    Returns:
        DataFrame with the columns ``district_name``, ``year``,
        ``avg_price_per_m2``, ``neighborhoods_count``,
        ``previous_year_price``, ``price_change_percent`` and
        ``price_change_euros``. The three change columns are null for the
        first year of each district.
    """
    print("KPI 2: PRICE TREND BY DISTRICT")
    print("=" * 40)

    price_by_year = (
        prices_formatted
        .groupBy("district_name", "year")
        .agg(
            spark_round(avg("per_meter"), 0).alias("avg_price_per_m2"),
            count("*").alias("neighborhoods_count"),
        )
        .orderBy("district_name", "year")
    )

    # lag() looks one row back within each district, ordered by year.
    by_district_over_time = Window.partitionBy("district_name").orderBy("year")
    current_price = col("avg_price_per_m2")
    previous_price = col("previous_year_price")
    absolute_change = current_price - previous_price

    price_trend = (
        price_by_year
        .withColumn("previous_year_price", lag("avg_price_per_m2").over(by_district_over_time))
        .withColumn(
            "price_change_percent",
            # when() without otherwise() yields null where the condition does not hold
            spark_round(
                when(previous_price.isNotNull(), (absolute_change / previous_price) * 100),
                1,
            ),
        )
        .withColumn("price_change_euros", spark_round(absolute_change, 0))
    )

    print("Price trend with percentage change by district:")
    price_trend.select(
        "district_name", "year", "avg_price_per_m2",
        "price_change_euros", "price_change_percent",
    ).show()

    # Rank districts by their mean yearly growth (first-year nulls excluded).
    print("\nDistricts with highest price growth (average across years):")
    avg_growth_by_district = (
        price_trend
        .filter(col("price_change_percent").isNotNull())
        .groupBy("district_name")
        .agg(spark_round(avg("price_change_percent"), 1).alias("avg_annual_growth_percent"))
        .orderBy(col("avg_annual_growth_percent").desc())
    )
    avg_growth_by_district.show()

    return price_trend

# Run KPI 2 - Simple price tracking.
# Prints the trend tables and keeps the per-district trend DataFrame in kpi_2.
kpi_2 = calculate_price_trend_by_district()

KPI 2: PRICE TREND BY DISTRICT
Price trend with percentage change by district:
+--------------+----+----------------+------------------+--------------------+
| district_name|year|avg_price_per_m2|price_change_euros|price_change_percent|
+--------------+----+----------------+------------------+--------------------+
|  Ciutat Vella|2013|          2624.0|              NULL|                NULL|
|  Ciutat Vella|2014|          2821.0|             197.0|                 7.5|
|  Ciutat Vella|2015|          3137.0|             316.0|                11.2|
|  Ciutat Vella|2016|          3688.0|             551.0|                17.6|
|  Ciutat Vella|2017|          4283.0|             595.0|                16.1|
|      Eixample|2013|          2836.0|              NULL|                NULL|
|      Eixample|2014|          3025.0|             189.0|                 6.7|
|      Eixample|2015|          3296.0|             271.0|                 9.0|
|      Eixample|2016|          3382.0|              

In [23]:
# Show every column of the KPI 2 result, including the helper columns
# (neighborhoods_count, previous_year_price) left out of the table printed above.
kpi_2.show()

+--------------+----+----------------+-------------------+-------------------+--------------------+------------------+
| district_name|year|avg_price_per_m2|neighborhoods_count|previous_year_price|price_change_percent|price_change_euros|
+--------------+----+----------------+-------------------+-------------------+--------------------+------------------+
|  Ciutat Vella|2013|          2624.0|                  4|               NULL|                NULL|              NULL|
|  Ciutat Vella|2014|          2821.0|                  4|             2624.0|                 7.5|             197.0|
|  Ciutat Vella|2015|          3137.0|                  4|             2821.0|                11.2|             316.0|
|  Ciutat Vella|2016|          3688.0|                  4|             3137.0|                17.6|             551.0|
|  Ciutat Vella|2017|          4283.0|                  4|             3688.0|                16.1|             595.0|
|      Eixample|2013|          2836.0|          

#### KPI-3: Year-Price Index

In [24]:
# KPI 3: Income Evolution
# How is income (RFD index) changing over time?

from pyspark.sql.functions import col, avg, round as spark_round, lag
from pyspark.sql.window import Window

def calculate_income_evolution():
    """
    Build KPI 3: year-over-year evolution of the RFD income index.

    Prints three tables: the city-wide yearly average with its change, the
    per-district yearly average with its percentage growth, and the mean
    growth per district. Only the per-district table is returned.

    Returns:
        DataFrame with the columns ``district_name``, ``year``,
        ``avg_rfd_index``, ``previous_year_rfd`` and ``rfd_growth_percent``;
        the last two are null for the first year of each district.
    """
    print("KPI 3: INCOME EVOLUTION")
    print("=" * 30)

    current_rfd = col("avg_rfd_index")
    previous_rfd = col("previous_year_rfd")
    # Percentage growth against the previous year; null where there is no
    # previous year (when() without otherwise() yields null).
    growth_percent = spark_round(
        when(previous_rfd.isNotNull(), ((current_rfd - previous_rfd) / previous_rfd) * 100),
        1,
    )

    # ---- City-wide evolution ----
    yearly_income = (
        income_formatted
        .groupBy("year")
        .agg(
            spark_round(avg("rfd_index"), 1).alias("avg_rfd_index"),
            count("*").alias("neighborhoods_count"),
        )
        .orderBy("year")
    )

    citywide_window = Window.orderBy("year")
    income_evolution = (
        yearly_income
        .withColumn("previous_year_rfd", lag("avg_rfd_index").over(citywide_window))
        .withColumn("rfd_change", spark_round(current_rfd - previous_rfd, 1))
        .withColumn("rfd_growth_percent", growth_percent)
    )

    print("City-wide income evolution:")
    income_evolution.select(
        "year", "avg_rfd_index", "rfd_change", "rfd_growth_percent"
    ).show()

    # ---- Evolution per district ----
    print("\nIncome evolution by district:")
    district_income = (
        income_formatted
        .groupBy("district_name", "year")
        .agg(spark_round(avg("rfd_index"), 1).alias("avg_rfd_index"))
        .orderBy("district_name", "year")
    )

    per_district_window = Window.partitionBy("district_name").orderBy("year")
    district_income_growth = (
        district_income
        .withColumn("previous_year_rfd", lag("avg_rfd_index").over(per_district_window))
        .withColumn("rfd_growth_percent", growth_percent)
    )

    district_income_growth.select(
        "district_name", "year", "avg_rfd_index", "rfd_growth_percent"
    ).show()

    # ---- Districts ranked by mean yearly growth (first-year nulls excluded) ----
    print("\nAverage income growth by district:")
    avg_income_growth = (
        district_income_growth
        .filter(col("rfd_growth_percent").isNotNull())
        .groupBy("district_name")
        .agg(spark_round(avg("rfd_growth_percent"), 1).alias("avg_annual_rfd_growth"))
        .orderBy(col("avg_annual_rfd_growth").desc())
    )
    avg_income_growth.show()

    return district_income_growth

# Run KPI 3 - Income Evolution.
# Prints the city-wide and per-district tables; kpi_3 holds the per-district one.
kpi_3 = calculate_income_evolution()

KPI 3: INCOME EVOLUTION
City-wide income evolution:
+----+-------------+----------+------------------+
|year|avg_rfd_index|rfd_change|rfd_growth_percent|
+----+-------------+----------+------------------+
|2013|         91.5|      NULL|              NULL|
|2014|         91.3|      -0.2|              -0.2|
|2015|         92.3|       1.0|               1.1|
|2016|         93.0|       0.7|               0.8|
|2017|         93.7|       0.7|               0.8|
+----+-------------+----------+------------------+


Income evolution by district:
+--------------+----+-------------+------------------+
| district_name|year|avg_rfd_index|rfd_growth_percent|
+--------------+----+-------------+------------------+
|  Ciutat Vella|2013|         84.3|              NULL|
|  Ciutat Vella|2014|         85.4|               1.3|
|  Ciutat Vella|2015|         89.3|               4.6|
|  Ciutat Vella|2016|         91.9|               2.9|
|  Ciutat Vella|2017|         89.1|              -3.0|
|      Eixample|2

In [25]:
# Show every column of the KPI 3 result, including previous_year_rfd, which
# is left out of the per-district table printed above.
kpi_3.show()

+--------------+----+-------------+-----------------+------------------+
| district_name|year|avg_rfd_index|previous_year_rfd|rfd_growth_percent|
+--------------+----+-------------+-----------------+------------------+
|  Ciutat Vella|2013|         84.3|             NULL|              NULL|
|  Ciutat Vella|2014|         85.4|             84.3|               1.3|
|  Ciutat Vella|2015|         89.3|             85.4|               4.6|
|  Ciutat Vella|2016|         91.9|             89.3|               2.9|
|  Ciutat Vella|2017|         89.1|             91.9|              -3.0|
|      Eixample|2013|        116.4|             NULL|              NULL|
|      Eixample|2014|        116.1|            116.4|              -0.3|
|      Eixample|2015|        116.3|            116.1|               0.2|
|      Eixample|2016|        119.9|            116.3|               3.1|
|      Eixample|2017|        122.6|            119.9|               2.3|
|        Gràcia|2013|        102.2|             NUL

#### KPI 4: Income Inequality Progression

In [26]:
# KPI 4: Income Inequality Progression
# (Max(rfd) - Min(rfd)) / year

from pyspark.sql.functions import col, max, min, round as spark_round, lag
from pyspark.sql.window import Window

def calculate_income_inequality_progression():
    """
    Calculate income inequality progression: Max(rfd_index) - Min(rfd_index) by year.

    Reads the notebook-level ``income_formatted`` DataFrame and prints, in order:
    the yearly gap table, the year-over-year gap changes, the rows holding the
    yearly maximum and minimum ``rfd_index``, and a first-year vs last-year summary.

    Returns:
        DataFrame with one row per year, ordered by year, holding
        max_rfd_index, min_rfd_index, avg_rfd_index, neighborhoods_count
        (row count for that year), inequality_gap, previous_year_gap,
        gap_change and gap_change_percent. The lag-derived columns are NULL
        for the first year.
    """

    print("KPI 4: INCOME INEQUALITY PROGRESSION")
    print("=" * 50)

    # Calculate inequality (gap between richest and poorest) by year
    yearly_inequality = income_formatted.groupBy("year").agg(
        max("rfd_index").alias("max_rfd_index"),
        min("rfd_index").alias("min_rfd_index"),
        spark_round(avg("rfd_index"), 2).alias("avg_rfd_index"),
        count("*").alias("neighborhoods_count")
    ).withColumn(
        "inequality_gap",
        spark_round(col("max_rfd_index") - col("min_rfd_index"), 2)
    ).orderBy("year")

    print("Income inequality by year:")
    yearly_inequality.show()

    # Calculate year-over-year inequality changes.
    # The window has no partitionBy: the whole (one row per year) frame is a
    # single ordered sequence, which is what lag() needs here.
    window_spec = Window.orderBy("year")

    inequality_changes = yearly_inequality.withColumn(
        "previous_year_gap",
        lag("inequality_gap").over(window_spec)
    ).withColumn(
        "gap_change",
        spark_round(col("inequality_gap") - col("previous_year_gap"), 2)
    ).withColumn(
        "gap_change_percent",
        spark_round(
            ((col("inequality_gap") - col("previous_year_gap")) / col("previous_year_gap")) * 100,
            2
        )
    ).orderBy("year")  # explicit sort: the summary below indexes first/last row

    print("\nYear-over-year inequality changes:")
    inequality_changes.select(
        "year",
        "inequality_gap",
        "previous_year_gap",
        "gap_change",
        "gap_change_percent"
    ).show()

    # Show which neighborhoods are the extremes each year.
    # Ties are kept: every row equal to the yearly max/min is listed.
    print("\nRichest neighborhoods by year:")
    richest_by_year = income_formatted.join(
        yearly_inequality.select("year", "max_rfd_index"),
        on="year"
    ).filter(col("rfd_index") == col("max_rfd_index")).select(
        "year", "neighborhood_name", "district_name", "rfd_index"
    ).orderBy("year")
    richest_by_year.show()

    print("\nPoorest neighborhoods by year:")
    poorest_by_year = income_formatted.join(
        yearly_inequality.select("year", "min_rfd_index"),
        on="year"
    ).filter(col("rfd_index") == col("min_rfd_index")).select(
        "year", "neighborhood_name", "district_name", "rfd_index"
    ).orderBy("year")
    poorest_by_year.show()

    # Calculate total inequality trend (needs at least two years to compare)
    inequality_data = inequality_changes.collect()
    if len(inequality_data) > 1:
        first_year_gap = inequality_data[0]["inequality_gap"]
        last_year_gap = inequality_data[-1]["inequality_gap"]
        total_gap_change = last_year_gap - first_year_gap

        # Three-way label: a zero change must not be reported as a decrease.
        if total_gap_change > 0:
            trend = "Inequality INCREASING"
        elif total_gap_change < 0:
            trend = "Inequality DECREASING"
        else:
            trend = "Inequality UNCHANGED"

        print("\nSummary:")
        print(f"Period: {inequality_data[0]['year']} to {inequality_data[-1]['year']}")
        print(f"Starting inequality gap: {first_year_gap:.1f} points")
        print(f"Ending inequality gap: {last_year_gap:.1f} points")
        print(f"Total gap change: {total_gap_change:+.1f} points")
        print(f"Trend: {trend}")

    return inequality_changes

# Run KPI 4: prints the intermediate tables and keeps the returned per-year DataFrame as kpi_4
kpi_4 = calculate_income_inequality_progression()

KPI 4: INCOME INEQUALITY PROGRESSION
Income inequality by year:
+----+-------------+-------------+-------------+-------------------+--------------+
|year|max_rfd_index|min_rfd_index|avg_rfd_index|neighborhoods_count|inequality_gap|
+----+-------------+-------------+-------------+-------------------+--------------+
|2013|        243.9|         38.5|        91.55|                 73|         205.4|
|2014|        251.7|         34.7|        91.33|                 73|         217.0|
|2015|        250.5|         34.5|         92.3|                 73|         216.0|
|2016|        242.4|         34.3|        93.01|                 73|         208.1|
|2017|        248.8|         38.6|        93.67|                 73|         210.2|
+----+-------------+-------------+-------------+-------------------+--------------+


Year-over-year inequality changes:
+----+--------------+-----------------+----------+------------------+
|year|inequality_gap|previous_year_gap|gap_change|gap_change_percent|
+--

In [27]:
# Preview every column of the KPI 4 result, including the lag-derived change columns.
kpi_4.show()

+----+-------------+-------------+-------------+-------------------+--------------+-----------------+----------+------------------+
|year|max_rfd_index|min_rfd_index|avg_rfd_index|neighborhoods_count|inequality_gap|previous_year_gap|gap_change|gap_change_percent|
+----+-------------+-------------+-------------+-------------------+--------------+-----------------+----------+------------------+
|2013|        243.9|         38.5|        91.55|                 73|         205.4|             NULL|      NULL|              NULL|
|2014|        251.7|         34.7|        91.33|                 73|         217.0|            205.4|      11.6|              5.65|
|2015|        250.5|         34.5|         92.3|                 73|         216.0|            217.0|      -1.0|             -0.46|
|2016|        242.4|         34.3|        93.01|                 73|         208.1|            216.0|      -7.9|             -3.66|
|2017|        248.8|         38.6|        93.67|                 73|        

In [28]:
#### KPI 5: District Price Index

In [29]:
# KPI 5: District Price Index (Simple)
# Average price by district and year, relative to the city average over all years (index = 100)

from pyspark.sql.functions import col, avg, round as spark_round

def calculate_district_price_index():
    """
    Build the district price index, with the city-wide mean price set to 100.

    The baseline is the mean of ``per_meter`` over every row of the
    notebook-level ``prices_formatted`` DataFrame (no grouping, so all
    districts and all years contribute). For each district and year the
    function derives the rounded mean price, its index against the baseline
    and its absolute distance from the baseline. It prints that table and a
    second view with a four-level price category.

    Returns:
        DataFrame ordered by district_name, year with the columns
        avg_price_per_m2, price_index and price_vs_city_avg. The category
        column is only printed, not returned.
    """

    print("KPI 5: DISTRICT PRICE INDEX")
    print("=" * 35)

    # Baseline: a global aggregate yields exactly one row with one value.
    baseline_row = prices_formatted.agg(avg("per_meter")).collect()[0]
    baseline_price = baseline_row[0]
    print(f"City average price: €{baseline_price:.0f}/m² (Index = 100)")

    # Mean price per district and year, then the two baseline comparisons.
    district_avg = col("avg_price_per_m2")
    per_district_year = prices_formatted.groupBy("district_name", "year").agg(
        spark_round(avg("per_meter"), 0).alias("avg_price_per_m2")
    )
    with_index = per_district_year.withColumn(
        "price_index", spark_round((district_avg / baseline_price) * 100, 1)
    )
    with_gap = with_index.withColumn(
        "price_vs_city_avg", spark_round(district_avg - baseline_price, 0)
    )
    district_prices = with_gap.orderBy("district_name", "year")

    print("\nPrice index by district and year:")
    district_prices.show()

    # Category thresholds are strict: an index of exactly 120/100/80 falls
    # into the lower band.
    print("\nDistrict price categories by year:")
    index = col("price_index")
    category = (
        when(index > 120, "Expensive")
        .when(index > 100, "Above Average")
        .when(index > 80, "Below Average")
        .otherwise("Affordable")
    )
    categorized = district_prices.withColumn("category", category)
    categorized.select(
        "district_name", "year", "avg_price_per_m2", "price_index", "category"
    ).show()

    return district_prices

# Run KPI 5 - Simple district version (the returned DataFrame omits the printed "category" column)
kpi_5 = calculate_district_price_index()

KPI 5: DISTRICT PRICE INDEX
City average price: €2732/m² (Index = 100)

Price index by district and year:
+--------------+----+----------------+-----------+-----------------+
| district_name|year|avg_price_per_m2|price_index|price_vs_city_avg|
+--------------+----+----------------+-----------+-----------------+
|  Ciutat Vella|2013|          2624.0|       96.0|           -108.0|
|  Ciutat Vella|2014|          2821.0|      103.3|             89.0|
|  Ciutat Vella|2015|          3137.0|      114.8|            405.0|
|  Ciutat Vella|2016|          3688.0|      135.0|            956.0|
|  Ciutat Vella|2017|          4283.0|      156.8|           1551.0|
|      Eixample|2013|          2836.0|      103.8|            104.0|
|      Eixample|2014|          3025.0|      110.7|            293.0|
|      Eixample|2015|          3296.0|      120.6|            564.0|
|      Eixample|2016|          3382.0|      123.8|            650.0|
|      Eixample|2017|          4080.0|      149.3|           1348.

In [30]:
# Preview the KPI 5 table (price index per district and year).
kpi_5.show()

+--------------+----+----------------+-----------+-----------------+
| district_name|year|avg_price_per_m2|price_index|price_vs_city_avg|
+--------------+----+----------------+-----------+-----------------+
|  Ciutat Vella|2013|          2624.0|       96.0|           -108.0|
|  Ciutat Vella|2014|          2821.0|      103.3|             89.0|
|  Ciutat Vella|2015|          3137.0|      114.8|            405.0|
|  Ciutat Vella|2016|          3688.0|      135.0|            956.0|
|  Ciutat Vella|2017|          4283.0|      156.8|           1551.0|
|      Eixample|2013|          2836.0|      103.8|            104.0|
|      Eixample|2014|          3025.0|      110.7|            293.0|
|      Eixample|2015|          3296.0|      120.6|            564.0|
|      Eixample|2016|          3382.0|      123.8|            650.0|
|      Eixample|2017|          4080.0|      149.3|           1348.0|
|        Gràcia|2013|          2647.0|       96.9|            -85.0|
|        Gràcia|2014|          261

In [31]:
# Inspect the neighbourhood-level income table; its population column is what KPI 6 aggregates.
income_formatted.show()

+-------------+--------------+-----------------+--------------------+----------+---------+----+
|district_code| district_name|neighborhood_code|   neighborhood_name|population|rfd_index|year|
+-------------+--------------+-----------------+--------------------+----------+---------+----+
|            1|  Ciutat Vella|                1|            el Raval|     47617|     75.8|2015|
|            1|  Ciutat Vella|                2|      el Barri Gòtic|     15269|    108.5|2015|
|            1|  Ciutat Vella|                3|      la Barceloneta|     15036|     76.6|2015|
|            1|  Ciutat Vella|                4|Sant Pere, Santa ...|     22305|     96.4|2015|
|            2|      Eixample|                5|       el Fort Pienc|     31645|    104.8|2015|
|            2|      Eixample|                6|  la Sagrada Família|     51347|     95.8|2015|
|            2|      Eixample|                7|la Dreta de l'Eix...|     43344|    165.8|2015|
|            2|      Eixample|          

In [32]:
# KPI 6: District Population Growth Rate
# Aggregate neighborhoods to district level first, then calculate growth

from pyspark.sql.functions import *
from pyspark.sql.window import Window

def district_population_growth():
    """
    Compute year-over-year population growth per district.

    Sums ``population`` from the notebook-level ``income_formatted``
    DataFrame per district and year, compares each year with the previous
    year of the same district, and prints four tables: district totals,
    district growth rates, the per-year mean of the district growth rates
    with the summed population change, and the mean growth rate per district.

    Returns:
        DataFrame with district_name, year, total_district_population,
        prev_population, population_change and growth_rate_percent. Rows
        whose growth rate is NULL (each district's first year) are removed.
    """

    print("KPI 6: DISTRICT POPULATION GROWTH RATE")
    print("=" * 45)

    # Step 1: roll neighbourhood rows up to one total per district and year
    yearly_totals = (
        income_formatted
        .groupBy("district_name", "year")
        .agg(sum("population").alias("total_district_population"))
        .orderBy("district_name", "year")
    )

    print("District population by year:")
    yearly_totals.show()

    # Step 2: compare each year with the previous year of the same district
    by_district = Window.partitionBy("district_name").orderBy("year")
    current = col("total_district_population")
    previous = col("prev_population")

    with_previous = yearly_totals.withColumn(
        "prev_population", lag("total_district_population").over(by_district)
    )
    with_change = with_previous.withColumn("population_change", current - previous)
    with_rate = with_change.withColumn(
        "growth_rate_percent",
        round(((current - previous) / previous) * 100, 1),
    )
    # Rows without a computable rate carry no growth information; drop them.
    district_growth = with_rate.where(col("growth_rate_percent").isNotNull())

    print("\nDistrict population growth rates:")
    growth_columns = [
        "district_name", "year", "total_district_population",
        "prev_population", "population_change", "growth_rate_percent",
    ]
    district_growth.select(*growth_columns).show()

    # Per year: unweighted mean of the district rates, plus the summed change
    print("\nCity-wide average population growth by year:")
    city_growth = (
        district_growth
        .groupBy("year")
        .agg(
            round(avg("growth_rate_percent"), 1).alias("avg_growth_rate"),
            sum("population_change").alias("total_city_population_change"),
        )
        .orderBy("year")
    )
    city_growth.show()

    # Per district: mean rate across all available years, highest first
    print("\nAverage growth rate by district:")
    avg_district_growth = (
        district_growth
        .groupBy("district_name")
        .agg(round(avg("growth_rate_percent"), 1).alias("avg_annual_growth_rate"))
        .orderBy(col("avg_annual_growth_rate").desc())
    )
    avg_district_growth.show()

    return district_growth

# Run KPI 6 - District level (rows with a NULL growth rate, i.e. each district's first year, are filtered out)
kpi_6 = district_population_growth()

KPI 6: DISTRICT POPULATION GROWTH RATE
District population by year:
+--------------+----+-------------------------+
| district_name|year|total_district_population|
+--------------+----+-------------------------+
|  Ciutat Vella|2013|                   103944|
|  Ciutat Vella|2014|                   102237|
|  Ciutat Vella|2015|                   100227|
|  Ciutat Vella|2016|                   100451|
|  Ciutat Vella|2017|                   102250|
|      Eixample|2013|                   264888|
|      Eixample|2014|                   265303|
|      Eixample|2015|                   263991|
|      Eixample|2016|                   264487|
|      Eixample|2017|                   267184|
|        Gràcia|2013|                   121031|
|        Gràcia|2014|                   120843|
|        Gràcia|2015|                   120676|
|        Gràcia|2016|                   120907|
|        Gràcia|2017|                   121566|
|Horta-Guinardó|2013|                   167944|
|Horta-Guinardó|2014

#### Combining all KPIs and saving in the exploitation zone

Year-District KPIs
- KPI1, KPI2, KPI3, KPI5, KPI6

Year-City KPIs
- KPI4


The KPI DataFrames are stored in the variables
- `kpi_1`, `kpi_2`, ..., `kpi_6`

In [33]:
# Peek at the first two rows of KPI 1 to check its columns before combining.
kpi_1.show(2)

+-------------+------------------------+----+------------------+
|district_name|cultural_sites_available|year|sites_per_district|
+-------------+------------------------+----+------------------+
|       Gràcia|                      64|2015|               6.4|
|   Sant Martí|                      53|2015|               5.3|
+-------------+------------------------+----+------------------+
only showing top 2 rows



In [34]:
# Peek at the first two rows of KPI 2 to check its columns before combining.
kpi_2.show(2)

+-------------+----+----------------+-------------------+-------------------+--------------------+------------------+
|district_name|year|avg_price_per_m2|neighborhoods_count|previous_year_price|price_change_percent|price_change_euros|
+-------------+----+----------------+-------------------+-------------------+--------------------+------------------+
| Ciutat Vella|2013|          2624.0|                  4|               NULL|                NULL|              NULL|
| Ciutat Vella|2014|          2821.0|                  4|             2624.0|                 7.5|             197.0|
+-------------+----+----------------+-------------------+-------------------+--------------------+------------------+
only showing top 2 rows



In [35]:
# Peek at the first two rows of KPI 3 to check its columns before combining.
kpi_3.show(2)

+-------------+----+-------------+-----------------+------------------+
|district_name|year|avg_rfd_index|previous_year_rfd|rfd_growth_percent|
+-------------+----+-------------+-----------------+------------------+
| Ciutat Vella|2013|         84.3|             NULL|              NULL|
| Ciutat Vella|2014|         85.4|             84.3|               1.3|
+-------------+----+-------------+-----------------+------------------+
only showing top 2 rows



In [36]:
# KPI 4 has one row per year for the whole city (no district column),
# so it belongs in the city-year table rather than the district-year one.
kpi_4.show(2)

+----+-------------+-------------+-------------+-------------------+--------------+-----------------+----------+------------------+
|year|max_rfd_index|min_rfd_index|avg_rfd_index|neighborhoods_count|inequality_gap|previous_year_gap|gap_change|gap_change_percent|
+----+-------------+-------------+-------------+-------------------+--------------+-----------------+----------+------------------+
|2013|        243.9|         38.5|        91.55|                 73|         205.4|             NULL|      NULL|              NULL|
|2014|        251.7|         34.7|        91.33|                 73|         217.0|            205.4|      11.6|              5.65|
+----+-------------+-------------+-------------+-------------------+--------------+-----------------+----------+------------------+
only showing top 2 rows



In [37]:
# Peek at the first two rows of KPI 5 to check its columns before combining.
kpi_5.show(2)

+-------------+----+----------------+-----------+-----------------+
|district_name|year|avg_price_per_m2|price_index|price_vs_city_avg|
+-------------+----+----------------+-----------+-----------------+
| Ciutat Vella|2013|          2624.0|       96.0|           -108.0|
| Ciutat Vella|2014|          2821.0|      103.3|             89.0|
+-------------+----+----------------+-----------+-----------------+
only showing top 2 rows



In [38]:
# Peek at the first two rows of KPI 6 to check its columns before combining.
kpi_6.show(2)

+-------------+----+-------------------------+---------------+-----------------+-------------------+
|district_name|year|total_district_population|prev_population|population_change|growth_rate_percent|
+-------------+----+-------------------------+---------------+-----------------+-------------------+
| Ciutat Vella|2014|                   102237|         103944|            -1707|               -1.6|
| Ciutat Vella|2015|                   100227|         102237|            -2010|               -2.0|
+-------------+----+-------------------------+---------------+-----------------+-------------------+
only showing top 2 rows



In [39]:
## Step 3: Save the data into the exploitation zone

In [40]:
from pyspark.sql.functions import *

def _with_yoy_growth_percent(df, value_col, growth_col):
    """
    Append a per-district year-over-year percentage growth column.

    Args:
        df: DataFrame containing ``district_name``, ``year`` and ``value_col``.
        value_col: name of the column whose growth is measured.
        growth_col: name of the column to add (percent, rounded to 1 decimal).

    Returns:
        ``df`` with ``growth_col`` appended. The value is NULL where the
        district has no previous row (its first year).
    """
    by_district = Window.partitionBy("district_name").orderBy("year")
    prev_col = f"_prev_{value_col}"  # temporary helper column, dropped below
    return (
        df.withColumn(prev_col, lag(value_col).over(by_district))
        .withColumn(
            growth_col,
            # A NULL previous value propagates through the arithmetic, so no
            # explicit when/otherwise guard is needed for the first year.
            round(((col(value_col) - col(prev_col)) / col(prev_col)) * 100, 1),
        )
        .drop(prev_col)
    )


def create_final_kpi_tables():
    """
    Create the final two KPI summary tables for the exploitation zone.

    All KPIs are recomputed from the notebook-level ``income_formatted``,
    ``prices_formatted`` and ``cultural_formatted`` DataFrames; the ``kpi_N``
    variables from earlier cells are not used. Both tables are previewed
    with ``show()``.

    Returns:
        tuple ``(district_year_kpis, city_year_kpis)``:
        - district_year_kpis: one row per (district_name, year) found in
          ``income_formatted``, left-joined with KPI 1, 2, 3, 5 and 6
          columns, ordered by district_name, year.
        - city_year_kpis: one row per year found in ``income_formatted`` with
          the KPI 4 inequality gap, its change, and city-wide context
          columns, ordered by year. ``city_avg_price`` is NULL for a year
          that has no price rows.
    """

    print("📊 CREATING FINAL KPI SUMMARY TABLES")
    print("=" * 50)

    # ========================================
    # TABLE 1: DISTRICT-YEAR KPIs
    # ========================================

    print("Creating District-Year KPI table...")

    # Get all district-year combinations from income data as base
    district_year_base = income_formatted.select("district_name", "year").distinct()

    # KPI 1: Cultural sites by district. The count covers all rows of
    # cultural_formatted (no year filter is applied), so the same value is
    # repeated on every year row of a district after the join.
    kpi_1_district = cultural_formatted.groupBy("district_name").agg(
        count("cultural_site_name").alias("kpi_1_cultural_sites_count")
    )

    # KPI 2: Price trend by district-year, plus year-over-year growth
    kpi_2_price_trend = prices_formatted.groupBy("district_name", "year").agg(
        round(avg("per_meter"), 0).alias("kpi_2_avg_price_per_m2")
    )
    kpi_2_with_growth = _with_yoy_growth_percent(
        kpi_2_price_trend, "kpi_2_avg_price_per_m2", "kpi_2_price_growth_percent"
    )

    # KPI 3: Income evolution by district-year, plus year-over-year growth
    kpi_3_income = income_formatted.groupBy("district_name", "year").agg(
        round(avg("rfd_index"), 1).alias("kpi_3_avg_rfd_index")
    )
    kpi_3_with_growth = _with_yoy_growth_percent(
        kpi_3_income, "kpi_3_avg_rfd_index", "kpi_3_income_growth_percent"
    )

    # KPI 5: District price index. The baseline is the mean over every price
    # row (all districts and years), collected once as a Python float.
    city_avg_price = prices_formatted.agg(avg("per_meter")).collect()[0][0]
    kpi_5_price_index = prices_formatted.groupBy("district_name", "year").agg(
        round(avg("per_meter"), 0).alias("avg_price_per_m2")
    ).withColumn(
        "kpi_5_price_index",
        round((col("avg_price_per_m2") / city_avg_price) * 100, 1)
    ).select("district_name", "year", "kpi_5_price_index")

    # KPI 6: District population growth (neighbourhood rows summed first)
    district_population = income_formatted.groupBy("district_name", "year").agg(
        sum("population").alias("total_population")
    )
    kpi_6_population = _with_yoy_growth_percent(
        district_population, "total_population", "kpi_6_population_growth_percent"
    )

    # Join all district-year KPIs. Left joins keep every base row even when
    # a KPI has no value for that district/year.
    district_year_kpis = district_year_base \
        .join(kpi_1_district, "district_name", "left") \
        .join(kpi_2_with_growth, ["district_name", "year"], "left") \
        .join(kpi_3_with_growth, ["district_name", "year"], "left") \
        .join(kpi_5_price_index, ["district_name", "year"], "left") \
        .join(kpi_6_population, ["district_name", "year"], "left") \
        .orderBy("district_name", "year")

    print("District-Year KPI table preview:")
    district_year_kpis.show()

    # ========================================
    # TABLE 2: CITY-YEAR KPIs
    # ========================================

    print("\nCreating City-Year KPI table...")

    # KPI 4: Income inequality by year (city-wide)
    kpi_4_inequality = income_formatted.groupBy("year").agg(
        max("rfd_index").alias("max_rfd"),
        min("rfd_index").alias("min_rfd")
    ).withColumn(
        "kpi_4_inequality_gap",
        round(col("max_rfd") - col("min_rfd"), 1)
    )

    # Add the year-over-year change of the gap (absolute, not percent).
    # No partitionBy: the frame has one row per year and forms one sequence.
    window_ineq = Window.orderBy("year")
    kpi_4_with_growth = kpi_4_inequality.withColumn(
        "prev_gap",
        lag("kpi_4_inequality_gap").over(window_ineq)
    ).withColumn(
        "kpi_4_inequality_change",
        round(col("kpi_4_inequality_gap") - col("prev_gap"), 1)
    ).select("year", "kpi_4_inequality_gap", "kpi_4_inequality_change")

    # Add city-wide summary metrics for context
    city_price_by_year = prices_formatted.groupBy("year").agg(
        round(avg("per_meter"), 0).alias("city_avg_price")
    )
    # Left join (matching the district table): a year without price rows
    # keeps its income/inequality figures instead of being dropped.
    city_year_context = income_formatted.groupBy("year").agg(
        round(avg("rfd_index"), 1).alias("city_avg_rfd"),
        sum("population").alias("city_total_population")
    ).join(city_price_by_year, "year", "left")

    # Final city-year table
    city_year_kpis = kpi_4_with_growth \
        .join(city_year_context, "year", "left") \
        .orderBy("year")

    print("City-Year KPI table preview:")
    city_year_kpis.show()

    return district_year_kpis, city_year_kpis

def save_final_kpi_tables(district_kpis, city_kpis, exploitation_zone_path):
    """
    Write both KPI tables as Parquet under the exploitation zone.

    Args:
        district_kpis: district-year table; written to
            ``<exploitation_zone_path>/barcelona_district_year_kpis``.
        city_kpis: city-year table; written to
            ``<exploitation_zone_path>/barcelona_city_year_kpis``.
        exploitation_zone_path: base location, interpolated into both targets.

    Both writes use mode "overwrite", so existing output at a target is
    replaced. A confirmation line is printed after each write. Returns None.
    """

    banner = f"💾 SAVING FINAL KPI TABLES TO: {exploitation_zone_path}"
    print(banner)
    print("=" * 60)

    # Resolve both targets up front; the writes below keep the original order.
    district_target = f"{exploitation_zone_path}/barcelona_district_year_kpis"
    city_target = f"{exploitation_zone_path}/barcelona_city_year_kpis"

    # District-year table first
    district_writer = district_kpis.write.mode("overwrite")
    district_writer.parquet(district_target)
    print(f"✅ District-Year KPIs saved: {district_target}")

    # Then the city-year table
    city_writer = city_kpis.write.mode("overwrite")
    city_writer.parquet(city_target)
    print(f"✅ City-Year KPIs saved: {city_target}")

In [41]:
# Build both summary tables; the previews printed below come from show() calls inside the function.
district_kpis, city_kpis = create_final_kpi_tables()

📊 CREATING FINAL KPI SUMMARY TABLES
Creating District-Year KPI table...
District-Year KPI table preview:
+--------------+----+--------------------------+----------------------+--------------------------+-------------------+---------------------------+-----------------+----------------+-------------------------------+
| district_name|year|kpi_1_cultural_sites_count|kpi_2_avg_price_per_m2|kpi_2_price_growth_percent|kpi_3_avg_rfd_index|kpi_3_income_growth_percent|kpi_5_price_index|total_population|kpi_6_population_growth_percent|
+--------------+----+--------------------------+----------------------+--------------------------+-------------------+---------------------------+-----------------+----------------+-------------------------------+
|  Ciutat Vella|2013|                       124|                2624.0|                      NULL|               84.3|                       NULL|             96.0|          103944|                           NULL|
|  Ciutat Vella|2014|                  

In [42]:
# Persist both tables as Parquet. EXPLOITATION_ZONE is passed directly: the
# function interpolates it into the target paths itself, so wrapping it in an
# f-string first is redundant.
save_final_kpi_tables(district_kpis, city_kpis, EXPLOITATION_ZONE)

💾 SAVING FINAL KPI TABLES TO: /content/drive/MyDrive/dbm-lab3/exploitation_zone
✅ District-Year KPIs saved: /content/drive/MyDrive/dbm-lab3/exploitation_zone/barcelona_district_year_kpis
✅ City-Year KPIs saved: /content/drive/MyDrive/dbm-lab3/exploitation_zone/barcelona_city_year_kpis


In [43]:
# Stop the Spark session now that both KPI tables have been written.
spark.stop()

## END OF DOCUMENT