# Lab 3 - Data Exploitation Pipeline

**Notebook:** `02_exploitation_pipeline.ipynb`  
**Objective:** We implement the **second pipeline**: transform cleaned data in the Formatted Zone into aggregated and enriched outputs in the Exploitation Zone, and validate the results. This should enable subsequent analysis of KPIs across space and time.

### Group: *L3-T04*

#### Group Members: **Marvin Ernst, Oriol Gelabert, Alex Malo**  
Class: **23D020 - Big Data Management for Data Science**  
Date: *June 23, 2025*


In this notebook we include the following two parts of the project (from section A):
- A.3: Move Data to the Exploitation Zone
- A.4: Validate the Data

---
### Steps in this pipeline:
1. **Create Exploitation Zone**  
   Define the directory and prepare structures for output.

2. **Data Aggregation and Transformation**  
   - Aggregate household size, commercial premises, and tourist housing datasets  
   - Calculate shares and total counts per spatial unit and year

3. **Data Validation**  
   Check schema, nulls, uniqueness, and summary statistics of outputs to ensure readiness for analysis.
---

First, we load the necessary libraries and set up the folder structure. All file paths are relative, ensuring portable execution of the pipeline.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, isnan, when, sum as _sum, round
from pathlib import Path
import os
import re

#### Initialize Spark Session

In [2]:
spark = SparkSession.builder.appName("Exploitation Pipeline").getOrCreate()

25/06/24 02:51:30 WARN Utils: Your hostname, MacBook-Pro-40.local resolves to a loopback address: 127.0.0.1; using 192.168.1.23 instead (on interface en0)
25/06/24 02:51:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/24 02:51:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/24 02:51:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Define Folder Paths

In [3]:
project_root = Path().resolve().parent
formatted = project_root / "formatted_zone"
exploitation = project_root / "exploitation_zone"
exploitation.mkdir(parents=True, exist_ok=True)

## **A.3 - Move Data to the Exploitation Zone**

This section generates pre-aggregated, spatially-aware indicators from each dataset:

- **Tourist Housing**: counts by district and neighborhood per quarter  
- **Household Size** : shares of households by size and level (AEB, neighborhood, district)  
- **Commercial Premises**: percentages of selected economic indicators per zone
 
These tables will be saved as `.parquet` files in the exploitation zone.

#### **Household Size**

In [4]:
# First, we define a pattern to identify household parquet files by year:
pattern = re.compile(r"household_(20\d{2})_\.parquet", re.IGNORECASE)

# We initialize dictionaries to store results by aggregation level:
df_households = {}
df_households_aeb = {}
df_households_barri = {}
df_households_districte = {}
df_totals_dict = {}

# We loop through each file in the formatted zone:
for file in os.listdir(formatted):
    match = pattern.match(file)
    if match:
        year = match.group(1)
        filepath = os.path.join(formatted, file)

        try:
            # Load each household file from the formatted zone:
            df = spark.read.parquet(filepath)

            # We compute a new column multiplying persons per household size:
            df = df.withColumn("TOTAL_PERSONES_AGG", col("NUM_PERSONES_AGG") * col("NUM_VALOR"))

            # We define grouping levels of interest:
            group_cols = ["COD_BARRI", "COD_AEB", "COD_DISTRICTE", "COD_SECCIO_CENSAL"]

            # For each spatial level, compute total persons and dwellings:
            for col_name in group_cols:
                sum_per_col = f"SUM_TOTAL_PERSONES_{'_'.join(col_name.split('_')[1:])}"
                pct_per_col = f"PCT_PERSONES_{'_'.join(col_name.split('_')[1:])}"
                sum_viv_col = f"SUM_VIVENDES_{'_'.join(col_name.split('_')[1:])}"
                pct_viv_col = f"PCT_VIVENDES_{'_'.join(col_name.split('_')[1:])}"

                # Aggregate total persons and dwellings per group:
                df_totals = df.groupBy(col_name).agg(
                    _sum("TOTAL_PERSONES_AGG").alias(sum_per_col),
                    _sum("NUM_VALOR").alias(sum_viv_col)
                )

                # Save aggregated totals in case we want to reuse them:
                df_totals_dict[f"{'_'.join(col_name.split('_')[1:])}_{year}"] = df_totals

                # Join back with original dataframe to calculate percentages:
                df = df.join(df_totals, on=col_name, how="left")
                df = df.withColumn(pct_per_col, round(100 * col("TOTAL_PERSONES_AGG") / col(sum_per_col), 3))
                df = df.withColumn(pct_viv_col, round(100 * col("NUM_VALOR") / col(sum_viv_col), 3))

            # Aggregate household data by AEB level:
            df_households_aeb[year] = df.groupBy("COD_AEB", "NUM_PERSONES_AGG").agg(
                _sum("TOTAL_PERSONES_AGG").alias("TOTAL_PERSONES_AGG"),
                _sum("NUM_VALOR").alias("NUM_VALOR"),
                round(_sum("PCT_PERSONES_AEB"), 3).alias("PCT_PERSONES_AEB"),
                round(_sum("PCT_VIVENDES_AEB"), 3).alias("PCT_VIVENDES_AEB")
            )

            # Aggregate household data by neighborhood level:
            df_households_barri[year] = df.groupBy("COD_BARRI", "DES_BARRI", "NUM_PERSONES_AGG").agg(
                _sum("TOTAL_PERSONES_AGG").alias("TOTAL_PERSONES_AGG"),
                _sum("NUM_VALOR").alias("NUM_VALOR"),
                round(_sum("PCT_PERSONES_BARRI"), 3).alias("PCT_PERSONES_BARRI"),
                round(_sum("PCT_VIVENDES_BARRI"), 3).alias("PCT_VIVENDES_BARRI")
            )

            # Aggregate household data by district level:
            df_households_districte[year] = df.groupBy("COD_DISTRICTE", "DES_DISTRICTE", "NUM_PERSONES_AGG").agg(
                _sum("TOTAL_PERSONES_AGG").alias("TOTAL_PERSONES_AGG"),
                _sum("NUM_VALOR").alias("NUM_VALOR"),
                round(_sum("PCT_PERSONES_DISTRICTE"), 3).alias("PCT_PERSONES_DISTRICTE"),
                round(_sum("PCT_VIVENDES_DISTRICTE"), 3).alias("PCT_VIVENDES_DISTRICTE")
            )

            # Drop intermediate columns used for joins to clean up the final DataFrame:
            df_households[year] = df.drop(
                'SUM_TOTAL_PERSONES_BARRI', 'SUM_VIVENDES_BARRI', 'PCT_PERSONES_BARRI', 'PCT_VIVENDES_BARRI',
                'SUM_TOTAL_PERSONES_AEB', 'SUM_VIVENDES_AEB', 'PCT_PERSONES_AEB', 'PCT_VIVENDES_AEB',
                'SUM_TOTAL_PERSONES_DISTRICTE', 'SUM_VIVENDES_DISTRICTE', 'PCT_PERSONES_DISTRICTE',
                'PCT_VIVENDES_DISTRICTE', 'SUM_TOTAL_PERSONES_SECCIO_CENSAL', 'SUM_VIVENDES_SECCIO_CENSAL'
            )

            # Finally, we save all three aggregation levels into the exploitation zone:
            df_households_aeb[year].write.parquet(f"{exploitation}/households_aeb_{year}.parquet", mode='overwrite')
            df_households_barri[year].write.parquet(f"{exploitation}/households_barri_{year}.parquet", mode='overwrite')
            df_households_districte[year].write.parquet(f"{exploitation}/households_districte_{year}.parquet", mode='overwrite')

        except Exception as e:
            print(f"Error processing household file {file}: {e}")

                                                                                

#### **Commercial Premises**

In [5]:
# Again, we define a pattern to match formatted commercial files by year:
pattern = re.compile(r"comercial_(20\d{2})\.parquet", re.IGNORECASE)

# We initialize two dictionaries: one for raw commercial data and one for aggregated indicators:
df_comercial = {}
df_indicators = {}

# We loop through each commercial parquet file found in the formatted zone:
for file in os.listdir(formatted):
    match = pattern.match(file)
    if match:
        year = match.group(1)
        filepath = os.path.join(formatted, file)

        try:
            # Load the cleaned commercial data from the formatted zone:
            df = spark.read.parquet(filepath)

            # We define the list of indicator columns to summarize:
            indicator_cols = [
                "IND_OCI_NOCTURN", "IND_COWORKING", "IND_SERVEI_DEGUSTACIO", "IND_OBERT24H",
                "IND_MIXT", "IND_PEU_CARRER", "IND_MERCAT", "IND_GALERIA",
                "IND_CENTRE_COMERCIAL", "IND_EIX_COMERCIAL"
            ]

            # First, we aggregate the indicators at the district level:
            agg_df_dis = df.groupBy("DES_ACTIVITAT_PRINCIPAL", "DES_GRUP", "COD_DISTRICTE").agg(
                count("*").alias("TOTAL"),
                *[_sum(c).alias(f"TOTAL_{c}") for c in indicator_cols]
            )

            # Next, we do a similar aggregation at the neighborhood level:
            agg_df_barri = df.groupBy("DES_ACTIVITAT_PRINCIPAL", "DES_GRUP", "COD_DISTRICTE", "COD_BARRI").agg(
                count("*").alias("TOTAL"),
                *[_sum(c).alias(f"TOTAL_{c}") for c in indicator_cols]
            )

            # Finally, we repeat the aggregation at the census section level:
            agg_df_sec = df.groupBy("DES_ACTIVITAT_PRINCIPAL", "DES_GRUP", "COD_DISTRICTE", "COD_BARRI", "COD_SECCIO_CENSAL").agg(
                count("*").alias("TOTAL"),
                *[_sum(c).alias(f"TOTAL_{c}") for c in indicator_cols]
            )

            # For each aggregation level, we compute the percentage of each indicator relative to the total:
            for df_agg in [agg_df_dis, agg_df_barri, agg_df_sec]:
                for c in indicator_cols:
                    df_agg = df_agg.withColumn(f"PCT_{c}", round(100 * col(f"TOTAL_{c}") / col("TOTAL"), 3))

            # Here, unlike previous datasets, we write **three levels of aggregation** directly to the exploitation zone:
            agg_df_dis.write.parquet(f"{exploitation}/comercial_indicators_district_{year}.parquet", mode="overwrite")
            agg_df_barri.write.parquet(f"{exploitation}/comercial_indicators_barri_{year}.parquet", mode="overwrite")
            agg_df_sec.write.parquet(f"{exploitation}/comercial_indicators_section_{year}.parquet", mode="overwrite")

            # We also store the processed DataFrames in dictionaries for potential reuse:
            df_indicators[year] = agg_df_dis
            df_comercial[year] = df

            print(f"Saved commercial indicators for {year}")

        except Exception as e:
            print(f"Error processing commercial file {file}: {e}")

                                                                                

Saved commercial indicators for 2022


                                                                                

Saved commercial indicators for 2024




Saved commercial indicators for 2019


                                                                                

### **Tourist Housing**

In [6]:
# We define a pattern to match tourist housing files in the formatted zone, e.g., "hut_2022_1T.parquet":
pattern = re.compile(r"hut_(20\d{2})_(\dT)\.parquet", re.IGNORECASE)

# We initialize dictionaries to store the full, district-level, and neighborhood-level data:
df_hut = {}
df_hut_dis = {}
df_hut_barri = {}

# We iterate through each file in the formatted directory that matches our naming pattern:
for file in os.listdir(formatted):
    match = pattern.match(file)
    if match:
        year = match.group(1)
        term = match.group(2)
        filepath = os.path.join(formatted, file)

        try:
            # We load the cleaned tourist housing file:
            df = spark.read.parquet(filepath)

            # We compute the number of licenses per district:
            agg_df_dis = df.groupBy('COD_DISTRICTE').agg(count("*").alias("TOTAL"))

            # We also compute the number of licenses per district and neighborhood:
            agg_df_barri = df.groupBy('COD_DISTRICTE', 'DES_BARRI').agg(count("*").alias("TOTAL"))

            # We store the full and aggregated DataFrames in our dictionaries for further use:
            df_hut[f"{year}_{term}"] = df
            df_hut_dis[f"{year}_{term}"] = agg_df_dis
            df_hut_barri[f"{year}_{term}"] = agg_df_barri

            # We save the aggregated outputs into the exploitation zone for later analysis:
            agg_df_dis.write.parquet(f"{exploitation}/hut_district_{year}_{term}.parquet", mode='overwrite')
            agg_df_barri.write.parquet(f"{exploitation}/hut_barri_{year}_{term}.parquet", mode='overwrite')

        except Exception as e:
            print(f"Error processing tourist housing file {file}: {e}")

## **A.4 - Validate the Data**

In this section, we **validate the integrity and usability** of the datasets in the **Exploitation Zone**:

- Inspect schemas and row counts
- Check for nulls and missing values
- Validate key columns (e.g. IDs, zones, aggregates)
- Ensure data is ready for KPI analysis

In [7]:
exploitation = project_root / "exploitation_zone"
print(f"Exploitation path: {exploitation}")

Exploitation path: /Users/Admin/Documents/Git/BSE/BigData/Big-Data-Lab-3/exploitation_zone


#### **Household Size** (Barri Level)

In [8]:
df_households_barri = spark.read.parquet(f"{exploitation}/households_barri_2022.parquet")

# Print schema
df_households_barri.printSchema()

# Show sample rows
df_households_barri.show(5)

# Count total records
print("Total rows:", df_households_barri.count())

# Check for null values
df_households_barri.select([count(when(col(c).isNull(), c)).alias(c) for c in df_households_barri.columns]).show()

# Check uniqueness of (COD_BARRI, NUM_PERSONES_AGG)
print("Unique group combinations (COD_BARRI, household size):",
      df_households_barri.select("COD_BARRI", "NUM_PERSONES_AGG").distinct().count())

root
 |-- COD_BARRI: integer (nullable = true)
 |-- DES_BARRI: string (nullable = true)
 |-- NUM_PERSONES_AGG: integer (nullable = true)
 |-- TOTAL_PERSONES_AGG: long (nullable = true)
 |-- NUM_VALOR: long (nullable = true)
 |-- PCT_PERSONES_BARRI: double (nullable = true)
 |-- PCT_VIVENDES_BARRI: double (nullable = true)

+---------+--------------------+----------------+------------------+---------+------------------+------------------+
|COD_BARRI|           DES_BARRI|NUM_PERSONES_AGG|TOTAL_PERSONES_AGG|NUM_VALOR|PCT_PERSONES_BARRI|PCT_VIVENDES_BARRI|
+---------+--------------------+----------------+------------------+---------+------------------+------------------+
|        4|Sant Pere, Santa ...|               6|               906|      151|             4.101|             1.473|
|       33|    el Baix Guinardó|               4|              5716|     1429|            22.574|            13.295|
|       13|   la Marina de Port|               6|              1458|      243|            

#### **Commercial Premises Indicators** (District Level)

In [9]:
df_comercial_dis = spark.read.parquet(f"{exploitation}/comercial_indicators_district_2022.parquet")

# Print schema
df_comercial_dis.printSchema()

# Show sample rows
df_comercial_dis.show(5)

# Count total records
print("Total rows:", df_comercial_dis.count())

# Check null values
df_comercial_dis.select([count(when(col(c).isNull(), c)).alias(c) for c in df_comercial_dis.columns]).show()

# Check number of unique activities
print("Unique activities:", df_comercial_dis.select("DES_ACTIVITAT_PRINCIPAL").distinct().count())

root
 |-- DES_ACTIVITAT_PRINCIPAL: string (nullable = true)
 |-- DES_GRUP: string (nullable = true)
 |-- COD_DISTRICTE: integer (nullable = true)
 |-- TOTAL: long (nullable = true)
 |-- TOTAL_IND_OCI_NOCTURN: long (nullable = true)
 |-- TOTAL_IND_COWORKING: long (nullable = true)
 |-- TOTAL_IND_SERVEI_DEGUSTACIO: long (nullable = true)
 |-- TOTAL_IND_OBERT24H: long (nullable = true)
 |-- TOTAL_IND_MIXT: long (nullable = true)
 |-- TOTAL_IND_PEU_CARRER: long (nullable = true)
 |-- TOTAL_IND_MERCAT: long (nullable = true)
 |-- TOTAL_IND_GALERIA: long (nullable = true)
 |-- TOTAL_IND_CENTRE_COMERCIAL: long (nullable = true)
 |-- TOTAL_IND_EIX_COMERCIAL: long (nullable = true)

+-----------------------+--------------------+-------------+-----+---------------------+-------------------+---------------------------+------------------+--------------+--------------------+----------------+-----------------+--------------------------+-----------------------+
|DES_ACTIVITAT_PRINCIPAL|            DE

### **Tourist Housing Licenses** (District Level)

In [None]:
df_hut_dis = spark.read.parquet(f"{exploitation}/hut_district_2022_1T.parquet")

# Print schema
df_hut_dis.printSchema()

# Show sample rows
df_hut_dis.show(5)

# Count total rows
print("Total rows:", df_hut_dis.count())

# Check nulls
df_hut_dis.select([count(when(col(c).isNull(), c)).alias(c) for c in df_hut_dis.columns]).show()

# Check unique districts
print("Unique districts:", df_hut_dis.select("COD_DISTRICTE").distinct().count())

root
 |-- COD_DISTRICTE: string (nullable = true)
 |-- TOTAL: long (nullable = true)

+-------------+-----+
|COD_DISTRICTE|TOTAL|
+-------------+-----+
|            7|  218|
|            3| 1134|
|            8|   29|
|            5|  491|
|            6| 1055|
+-------------+-----+
only showing top 5 rows

Total rows: 10
+-------------+-----+
|COD_DISTRICTE|TOTAL|
+-------------+-----+
|            0|    0|
+-------------+-----+

Unique districts: 10


25/06/24 08:53:41 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2156750 ms exceeds timeout 120000 ms
25/06/24 08:53:42 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/24 08:53:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

Bassed on those SPARK query outputs we can see that the data has been transformed correctly and is ready for consumption.