We will work with the following 3 datasets:

- Unemployment (JSON);
- Income (CSV);
- Incidences (CSV);

## Libraries

In [15]:
import pandas as pd 
import numpy as np
import os
from pathlib import Path
import json

## **A.1  Explore the data and choose the KPI’s**

Study and explore the provided datasets. Choose three datasets to be uses as data for the Landing Zone. Decide which analysis you will perform (i.e.,
fix the KPIs).

In [16]:
# Path to 'datasets' folder
datasets_path = Path.cwd() / "datasets"

# Get all subfolders inside 'datasets'
folders = sorted([f for f in datasets_path.iterdir() if f.is_dir()])

# Initialize a list to store the final dataframes
dfs = []

# Loop through the first 3 folders (or all if you want)
for folder in folders[:3]:
    print(f"\n Processing folder: {folder.name}")
    dataframes = []

    for file in folder.iterdir():
        if not file.is_file():
            continue

        try:
            if file.suffix == ".csv":
                df = pd.read_csv(file)
                print(f"  ✔ Read CSV: {file.name} | Shape: {df.shape}")
                dataframes.append(df)

            elif file.suffix == ".json":
                with open(file, 'r', encoding='utf-8') as f:
                    raw_json = json.load(f)

                if raw_json.get("success") is False:
                    print(f"  ⚠ Skipped invalid JSON (API error): {file.name}")
                    continue

                # Try to extract records from structured JSON (if available)
                if "result" in raw_json and "records" in raw_json["result"]:
                    df = pd.json_normalize(raw_json["result"]["records"])
                else:
                    # Assume it’s flat or list of dicts
                    df = pd.json_normalize(raw_json)

                print(f"  ✔ Read JSON: {file.name} | Shape: {df.shape}")
                dataframes.append(df)

        except Exception as e:
            print(f" Failed to load {file.name}: {e}")

    # Concatenate all files in this folder
    if dataframes:
        combined_df = pd.concat(dataframes, ignore_index=True)
        dfs.append(combined_df)
        print(f" Combined DataFrame for {folder.name}: {combined_df.shape}")
    else:
        dfs.append(None)
        print(f" No valid data files found in {folder.name}")

# Unpack into df1, df2, df3
df1, df2, df3 = dfs[:3]

# Show sample outputs
if df1 is not None:
    print("\n DF1 Sample:\n", df1.head())
if df2 is not None:
    print("\n DF2 Sample:\n", df2.head())
if df3 is not None:
    print("\n DF3 Sample:\n", df3.head())



 Processing folder: incidences
  ✔ Read CSV: 2016_Peticions_ciutadanes.csv | Shape: (100, 26)
  ✔ Read CSV: 2022_IRIS_Peticions_Ciutadanes_OpenData.csv | Shape: (100, 26)
  ✔ Read CSV: 2021_IRIS_Peticions_Ciutadanes_OpenData.csv | Shape: (100, 26)
  ✔ Read CSV: 2017_Peticions_ciutadanes.csv | Shape: (100, 26)
  ✔ Read CSV: 2019_Peticions_ciutadanes.csv | Shape: (100, 26)
  ✔ Read CSV: 2015_Peticions_ciutadanes.csv | Shape: (100, 26)
  ✔ Read CSV: 2020_IRIS_Peticions_Ciutadanes_OpenData.csv | Shape: (100, 26)
  ✔ Read CSV: 2018_Peticions_ciutadanes.csv | Shape: (100, 26)
  ✔ Read CSV: 2014_Peticions_ciutadanes.csv | Shape: (100, 26)
 Combined DataFrame for incidences: (900, 26)

 Processing folder: income
  ✔ Read CSV: 2011_Distribucio_territorial_renda_familiar.csv | Shape: (74, 7)
  ✔ Read CSV: 2013_Distribucio_territorial_renda_familiar.csv | Shape: (74, 7)
  ✔ Read CSV: 2010_Distribucio_territorial_renda_familiar.csv | Shape: (74, 7)
  ✔ Read CSV: 2007_Distribucio_territorial_renda

In [17]:
incident_df = df1.copy()
incident_df.head()

Unnamed: 0,CODI_DISTRICTE,DISTRICTE,ANY_DATA_TANCAMENT,DETALL,BARRI,FITXA_ID,LATITUD,AREA,COORDENADA_Y,COORDENADA_X,...,MES_DATA_TANCAMENT,TIPUS_VIA,ANY_DATA_ALTA,MES_DATA_ALTA,SUPORT,CARRER,CODI_BARRI,_id,TIPUS,DIA_DATA_TANCAMENT
0,3.0,Sants-Montjuïc,2016,Disconforme servei de neteja carrers / places,la Marina de Port,249606,41.362124,Recollida i neteja de l'espai urbà,4579312.661,428231.1953,...,1,Carrer,2015,12,TELÈFON,Foneria,13.0,1,QUEIXA,1
1,5.0,Sarrià-Sant Gervasi,2016,Disconforme servei de neteja carrers / places,les Tres Torres,249604,41.40179,Recollida i neteja de l'espai urbà,4583727.945,427120.2991,...,1,Carrer,2015,12,WEB,Calatrava,24.0,2,QUEIXA,1
2,2.0,Eixample,2016,"Orins, excrements animals",Sant Antoni,249615,41.375867,Recollida i neteja de l'espai urbà,4580821.582,429969.0695,...,1,Carrer,2015,12,TELÈFON,Viladomat,10.0,3,INCIDENCIA,1
3,2.0,Eixample,2016,Disconforme servei de neteja carrers / places,l'Antiga Esquerra de l'Eixample,249616,41.390159,Recollida i neteja de l'espai urbà,4582412.364,429564.1159,...,1,Carrer,2015,12,TELÈFON,Mallorca,8.0,4,QUEIXA,1
4,7.0,Horta-Guinardó,2016,Disconforme servei de neteja carrers / places,el Guinardó,249617,41.41961,Recollida i neteja de l'espai urbà,4585666.666,431179.5351,...,1,Ronda,2015,12,WEB,Guinardó,35.0,5,QUEIXA,1


In [18]:
income_df = df2.copy()
income_df.head()

Unnamed: 0,Any,Codi_Districte,Nom_Districte,Codi_Barri,Nom_Barri,Població,Índex RFD Barcelona = 100
0,2011,1,Ciutat Vella,1,el Raval,48485,65.0
1,2011,1,Ciutat Vella,2,el Barri Gòtic,17257,98.8
2,2011,1,Ciutat Vella,3,la Barceloneta,15674,73.1
3,2011,1,Ciutat Vella,4,"Sant Pere, Santa Caterina i la Ribera",22632,86.1
4,2011,2,Eixample,5,el Fort Pienc,32348,97.9


In [19]:
unemployment_df = df3.copy()
unemployment_df.head()

Unnamed: 0,Codi_Districte,Sexe,Nom_Districte,Demanda_ocupacio,Nom_Barri,Mes,Codi_Barri,Nombre,_id,Any,Demanda_ocupació
0,1,Homes,Ciutat Vella,Atur registrat,el Raval,1,1,1900,1,2019,
1,1,Homes,Ciutat Vella,Atur registrat,el Barri Gòtic,1,2,485,2,2019,
2,1,Homes,Ciutat Vella,Atur registrat,la Barceloneta,1,3,496,3,2019,
3,1,Homes,Ciutat Vella,Atur registrat,"Sant Pere, Santa Caterina i la Ribera",1,4,686,4,2019,
4,2,Homes,Eixample,Atur registrat,el Fort Pienc,1,5,539,5,2019,


In [20]:

def quick_eda(df, name="DataFrame"):
    print(f"\n EDA Report for: {name}")
    print("-" * 50)
    
    # Shape
    print(f" Shape: {df.shape[0]} rows × {df.shape[1]} columns")
    
    # Column types
    print("\n Column Types:")
    print(df.dtypes)
    
    # Missing values
    print("\n Missing Values (%):")
    missing = df.isnull().mean() * 100
    print(missing[missing > 0].sort_values(ascending=False))

    # Unique values
    print("\n Unique Values (Top 10):")
    for col in df.columns[:10]:  # only first 10 columns to keep it readable
        print(f"{col}: {df[col].nunique()} unique values")

    # Summary stats for numeric columns
    print("\n Summary Statistics:")
    print(df.describe(include='number').T)

    # Sample values
    print("\n Sample Rows:")
    print(df.sample(min(5, len(df))))  # 5 random rows or less if df is small

    print("-" * 50)


In [21]:
quick_eda(incident_df, name="Incidences")
quick_eda(income_df, name="Income")
quick_eda(unemployment_df, name="Unemployment")



 EDA Report for: Incidences
--------------------------------------------------
 Shape: 900 rows × 26 columns

 Column Types:
CODI_DISTRICTE        float64
DISTRICTE              object
ANY_DATA_TANCAMENT      int64
DETALL                 object
BARRI                  object
FITXA_ID                int64
LATITUD               float64
AREA                   object
COORDENADA_Y          float64
COORDENADA_X          float64
LONGITUD              float64
DIA_DATA_ALTA           int64
CANALS_RESPOSTA        object
NUMERO                float64
ELEMENT                object
SECCIO_CENSAL         float64
MES_DATA_TANCAMENT      int64
TIPUS_VIA              object
ANY_DATA_ALTA           int64
MES_DATA_ALTA           int64
SUPORT                 object
CARRER                 object
CODI_BARRI            float64
_id                     int64
TIPUS                  object
DIA_DATA_TANCAMENT      int64
dtype: object

 Missing Values (%):
LATITUD           56.888889
LONGITUD          56.888889
TI

Dataset: Income

| KPI                                                         | Description                      |
| ----------------------------------------------------------- | -------------------------------- |
| Average RFD (Relative Family Income Index) per district     | Measures average income level    |
| Districts with income above/below Barcelona's average (100) | Detect inequality                |
| Population-weighted income index                            | Accounts for district population |


Dataset: Incident

| KPI                                               | Description                |
| ------------------------------------------------- | -------------------------- |
| Total number of incidences per district           | Measures complaint density |
| Most common type of incidence                     | Find frequent issues       |
| Month with more incidences                        | Normalize for comparison   |



Dataset: Unemployment

| KPI                                          | Description                                   |
| -------------------------------------------- | --------------------------------------------- |
| Unemployment rate by gender                  | Detect gender gap                             |
| Unemployment per district                    | Regional economic pressure                    |
| % change in unemployment from previous years | Trend analysis  |


## **A.2  Data Formatting Process**

Create a directory on your local file system to serve as the Formatted Zone.
Write the required Spark jobs to read the raw data from the Landing Zone
(i.e. 3 datasets from the local file system), perform necessary transformations,
and write data in Parquet or Delta format in the Formatted Zone.
Ensure the data is partitioned appropriately for efficient querying.

In [22]:
from pyspark.sql import SparkSession
from pathlib import Path

# Initialize Spark
spark = SparkSession.builder \
    .appName("FormattedZoneETL") \
    .getOrCreate()

# Define landing and formatted zones
landing_zone = Path.cwd() / "datasets"
formatted_zone = Path.cwd() / "formatted_zone"
formatted_zone.mkdir(parents=True, exist_ok=True)  

now let's read from the landing zone

In [23]:
income_path = str(landing_zone / "income")
income_df = spark.read.option("header", True).option("inferSchema", True).csv(income_path)

In [24]:
incident_path = str(landing_zone / "incidences")
incident_df = spark.read.option("header", True).option("inferSchema", True).csv(incident_path)

In [25]:
unemployment_path = str(landing_zone / "unemployment")
unemployment_df = spark.read.option("multiline", True).json(unemployment_path)

In [26]:
income_df.printSchema()
income_df.show(5, truncate=False)

root
 |-- Any: integer (nullable = true)
 |-- Codi_Districte: integer (nullable = true)
 |-- Nom_Districte: string (nullable = true)
 |-- Codi_Barri: integer (nullable = true)
 |-- Nom_Barri: string (nullable = true)
 |-- Població: integer (nullable = true)
 |-- Índex RFD Barcelona = 100: string (nullable = true)

+----+--------------+-------------+----------+-------------------------------------+--------+-------------------------+
|Any |Codi_Districte|Nom_Districte|Codi_Barri|Nom_Barri                            |Població|Índex RFD Barcelona = 100|
+----+--------------+-------------+----------+-------------------------------------+--------+-------------------------+
|2013|1             |Ciutat Vella |1         |el Raval                             |49225   |60.3                     |
|2013|1             |Ciutat Vella |2         |el Barri Gòtic                       |16327   |103.6                    |
|2013|1             |Ciutat Vella |3         |la Barceloneta                       |

In [27]:
incident_df.printSchema()
incident_df.show(5, truncate=False)

root
 |-- CODI_DISTRICTE: integer (nullable = true)
 |-- DISTRICTE: string (nullable = true)
 |-- ANY_DATA_TANCAMENT: integer (nullable = true)
 |-- DETALL: string (nullable = true)
 |-- BARRI: string (nullable = true)
 |-- FITXA_ID: integer (nullable = true)
 |-- LATITUD: double (nullable = true)
 |-- AREA: string (nullable = true)
 |-- COORDENADA_Y: double (nullable = true)
 |-- COORDENADA_X: double (nullable = true)
 |-- LONGITUD: double (nullable = true)
 |-- DIA_DATA_ALTA: integer (nullable = true)
 |-- CANALS_RESPOSTA: string (nullable = true)
 |-- NUMERO: integer (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- SECCIO_CENSAL: integer (nullable = true)
 |-- MES_DATA_TANCAMENT: integer (nullable = true)
 |-- TIPUS_VIA: string (nullable = true)
 |-- ANY_DATA_ALTA: integer (nullable = true)
 |-- MES_DATA_ALTA: integer (nullable = true)
 |-- SUPORT: string (nullable = true)
 |-- CARRER: string (nullable = true)
 |-- CODI_BARRI: integer (nullable = true)
 |-- _id: integer

In [28]:
unemployment_df.printSchema()
unemployment_df.show(5, truncate=False)

root
 |-- error: struct (nullable = true)
 |    |-- __type: string (nullable = true)
 |    |-- message: string (nullable = true)
 |-- help: string (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- _links: struct (nullable = true)
 |    |    |-- next: string (nullable = true)
 |    |    |-- start: string (nullable = true)
 |    |-- fields: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- records: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Any: string (nullable = true)
 |    |    |    |-- Codi_Barri: string (nullable = true)
 |    |    |    |-- Codi_Districte: string (nullable = true)
 |    |    |    |-- Demanda_ocupacio: string (nullable = true)
 |    |    |    |-- Demanda_ocupació: string (nullable = true)
 |    |    |    |-- Mes: string (nullable = true)
 |    |    |    |-- Nom_Ba

Let's proceed with the data transformation

The "Índex RFD Barcelona = 100 is a string, so we will convert that to an integer.

In [29]:
from pyspark.sql.functions import col, regexp_replace, when

# Clean Index_RFD and convert comma to dot + handle dashes
income_cleaned_df = income_df \
    .withColumnRenamed("Índex RFD Barcelona = 100", "Index_RFD") \
    .withColumn("Index_RFD", regexp_replace(col("Index_RFD"), ",", ".")) \
    .withColumn("Index_RFD", when(col("Index_RFD") == "-", None).otherwise(col("Index_RFD"))) \
    .withColumn("Index_RFD", col("Index_RFD").cast("float")) \
    .filter(col("Index_RFD").isNotNull())

# Rename all relevant columns to English
income_cleaned_df = income_cleaned_df \
    .withColumnRenamed("Any", "Year") \
    .withColumnRenamed("Codi_Districte", "District_Code") \
    .withColumnRenamed("Nom_Districte", "District_Name") \
    .withColumnRenamed("Codi_Barri", "Neighborhood_Code") \
    .withColumnRenamed("Nom_Barri", "Neighborhood_Name") \
    .withColumnRenamed("Població", "Population") \
    .withColumnRenamed("Index_RFD", "Income_Index")

In [30]:
income_cleaned_df.printSchema()
income_cleaned_df.show(5, truncate=False)

root
 |-- Year: integer (nullable = true)
 |-- District_Code: integer (nullable = true)
 |-- District_Name: string (nullable = true)
 |-- Neighborhood_Code: integer (nullable = true)
 |-- Neighborhood_Name: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Income_Index: float (nullable = true)

+----+-------------+-------------+-----------------+-------------------------------------+----------+------------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name                    |Population|Income_Index|
+----+-------------+-------------+-----------------+-------------------------------------+----------+------------+
|2013|1            |Ciutat Vella |1                |el Raval                             |49225     |60.3        |
|2013|1            |Ciutat Vella |2                |el Barri Gòtic                       |16327     |103.6       |
|2013|1            |Ciutat Vella |3                |la Barceloneta                       |15571     |82.1 

now let's write the transformed income table to the formatted zone

In [31]:
income_cleaned_df.write \
    .partitionBy("District_Name") \
    .mode("overwrite") \
    .parquet(str(formatted_zone / "income"))

                                                                                

lets move onto the incidence

In [32]:
incident_cleaned_df = incident_df \
    .withColumnRenamed("CODI_DISTRICTE", "District_Code") \
    .withColumnRenamed("DISTRICTE", "District_Name") \
    .withColumnRenamed("ANY_DATA_TANCAMENT", "Closure_Year") \
    .withColumnRenamed("MES_DATA_TANCAMENT", "Closure_Month") \
    .withColumnRenamed("DIA_DATA_TANCAMENT", "Closure_Day") \
    .withColumnRenamed("DETALL", "Details") \
    .withColumnRenamed("BARRI", "Neighborhood_Name") \
    .withColumnRenamed("CODI_BARRI", "Neighborhood_Code") \
    .withColumnRenamed("LATITUD", "Latitude") \
    .withColumnRenamed("LONGITUD", "Longitude") \
    .withColumnRenamed("COORDENADA_X", "Coord_X") \
    .withColumnRenamed("COORDENADA_Y", "Coord_Y") \
    .withColumnRenamed("TIPUS", "Type") \
    .withColumnRenamed("TIPUS_VIA", "Street_Type") \
    .withColumnRenamed("CARRER", "Street_Name") \
    .withColumnRenamed("FITXA_ID", "Record_ID") \
    .withColumnRenamed("CANALS_RESPOSTA", "Response_Channel") \
    .withColumnRenamed("ELEMENT", "Element") \
    .withColumnRenamed("AREA", "Area") \
    .withColumnRenamed("ANY_DATA_ALTA", "Creation_Year") \
    .withColumnRenamed("MES_DATA_ALTA", "Creation_Month") \
    .withColumnRenamed("DIA_DATA_ALTA", "Creation_Day") \
    .withColumnRenamed("SUPORT", "Support_Channel") \
    .withColumnRenamed("SECCIO_CENSAL", "Census_Section") \
    .withColumnRenamed("_id", "Internal_ID")

In [33]:
incident_cleaned_df.printSchema()
incident_cleaned_df.show(5, truncate=False)

root
 |-- District_Code: integer (nullable = true)
 |-- District_Name: string (nullable = true)
 |-- Closure_Year: integer (nullable = true)
 |-- Details: string (nullable = true)
 |-- Neighborhood_Name: string (nullable = true)
 |-- Record_ID: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Area: string (nullable = true)
 |-- Coord_Y: double (nullable = true)
 |-- Coord_X: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Creation_Day: integer (nullable = true)
 |-- Response_Channel: string (nullable = true)
 |-- NUMERO: integer (nullable = true)
 |-- Element: string (nullable = true)
 |-- Census_Section: integer (nullable = true)
 |-- Closure_Month: integer (nullable = true)
 |-- Street_Type: string (nullable = true)
 |-- Creation_Year: integer (nullable = true)
 |-- Creation_Month: integer (nullable = true)
 |-- Support_Channel: string (nullable = true)
 |-- Street_Name: string (nullable = true)
 |-- Neighborhood_Code: integer (nullable = 

In [34]:
preferred_order = [
    "Record_ID", "Internal_ID", "Type", "Details",
    "District_Code", "District_Name", "Neighborhood_Code", "Neighborhood_Name",
    "Closure_Year", "Closure_Month", "Closure_Day",
    "Creation_Year", "Creation_Month", "Creation_Day",
    "Latitude", "Longitude", "Coord_X", "Coord_Y",
    "Street_Type", "Street_Name", "Census_Section", "NUMERO",
    "Response_Channel", "Support_Channel", "Element", "Area"
]

incident_cleaned_df = incident_cleaned_df.select(preferred_order)

In [35]:
incident_cleaned_df.printSchema()
incident_cleaned_df.show(5, truncate=False)

root
 |-- Record_ID: integer (nullable = true)
 |-- Internal_ID: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Details: string (nullable = true)
 |-- District_Code: integer (nullable = true)
 |-- District_Name: string (nullable = true)
 |-- Neighborhood_Code: integer (nullable = true)
 |-- Neighborhood_Name: string (nullable = true)
 |-- Closure_Year: integer (nullable = true)
 |-- Closure_Month: integer (nullable = true)
 |-- Closure_Day: integer (nullable = true)
 |-- Creation_Year: integer (nullable = true)
 |-- Creation_Month: integer (nullable = true)
 |-- Creation_Day: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Coord_X: double (nullable = true)
 |-- Coord_Y: double (nullable = true)
 |-- Street_Type: string (nullable = true)
 |-- Street_Name: string (nullable = true)
 |-- Census_Section: integer (nullable = true)
 |-- NUMERO: integer (nullable = true)
 |-- Response_Channel: string (nullable 

In [36]:
incident_cleaned_df.write \
    .partitionBy("District_Name") \
    .mode("overwrite") \
    .parquet(str(formatted_zone / "incidences"))

lets move onto unemployment

In [37]:
unemployment_df.printSchema()
unemployment_df.show(5, truncate=False)

root
 |-- error: struct (nullable = true)
 |    |-- __type: string (nullable = true)
 |    |-- message: string (nullable = true)
 |-- help: string (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- _links: struct (nullable = true)
 |    |    |-- next: string (nullable = true)
 |    |    |-- start: string (nullable = true)
 |    |-- fields: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- records: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Any: string (nullable = true)
 |    |    |    |-- Codi_Barri: string (nullable = true)
 |    |    |    |-- Codi_Districte: string (nullable = true)
 |    |    |    |-- Demanda_ocupacio: string (nullable = true)
 |    |    |    |-- Demanda_ocupació: string (nullable = true)
 |    |    |    |-- Mes: string (nullable = true)
 |    |    |    |-- Nom_Ba

In [38]:
from pyspark.sql.functions import col
import pandas as pd

# Extract the nested `records` list from the `result` field
records = unemployment_df.select("result.records").first()[0]

# Convert to pandas for easier handling of the list of dicts
records_pd = pd.DataFrame(records)

records_pd

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
0,2016,1,1,,Atur registrat,1,el Raval,Ciutat Vella,2431,Homes,1
1,2016,2,1,,Atur registrat,1,el Barri Gòtic,Ciutat Vella,588,Homes,2
2,2016,3,1,,Atur registrat,1,la Barceloneta,Ciutat Vella,637,Homes,3
3,2016,4,1,,Atur registrat,1,"Sant Pere, Santa Caterina i la Ribera",Ciutat Vella,878,Homes,4
4,2016,5,2,,Atur registrat,1,el Fort Pienc,Eixample,693,Homes,5
...,...,...,...,...,...,...,...,...,...,...,...
95,2016,22,5,,Demanda no aturats,1,"Vallvidrera, el Tibidabo i les Planes",Sarrià-Sant Gervasi,12,Homes,96
96,2016,23,5,,Demanda no aturats,1,Sarrià,Sarrià-Sant Gervasi,72,Homes,97
97,2016,24,5,,Demanda no aturats,1,les Tres Torres,Sarrià-Sant Gervasi,46,Homes,98
98,2016,25,5,,Demanda no aturats,1,Sant Gervasi - la Bonanova,Sarrià-Sant Gervasi,78,Homes,99


In [39]:
records_pd.describe(include='all')

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
count,100.0,100.0,100.0,0.0,100,100.0,100,100,100.0,100,100.0
unique,1.0,74.0,11.0,0.0,2,1.0,74,11,95.0,1,
top,2016.0,1.0,3.0,,Atur registrat,1.0,el Raval,Sants-Montjuïc,148.0,Homes,
freq,100.0,2.0,16.0,,74,100.0,2,16,2.0,100,
mean,,,,,,,,,,,50.5
std,,,,,,,,,,,29.011492
min,,,,,,,,,,,1.0
25%,,,,,,,,,,,25.75
50%,,,,,,,,,,,50.5
75%,,,,,,,,,,,75.25


In [40]:
records_pd.columns = [
    "Year",                
    "Month",               
    "District_Code",       
    "Neighborhood_Code",   # (all None - will drop)
    "Occupation_Type",     
    "Constant_Code",       # 5 (always 1 - will drop)
    "Neighborhood_Name",   
    "District_Name",       
    "Count",               
    "Gender",              
    "Row_ID"               
]

records_pd_clean = records_pd.drop(columns=["Neighborhood_Code", "Constant_Code", "Row_ID"])

records_pd_clean

Unnamed: 0,Year,Month,District_Code,Occupation_Type,Neighborhood_Name,District_Name,Count,Gender
0,2016,1,1,Atur registrat,el Raval,Ciutat Vella,2431,Homes
1,2016,2,1,Atur registrat,el Barri Gòtic,Ciutat Vella,588,Homes
2,2016,3,1,Atur registrat,la Barceloneta,Ciutat Vella,637,Homes
3,2016,4,1,Atur registrat,"Sant Pere, Santa Caterina i la Ribera",Ciutat Vella,878,Homes
4,2016,5,2,Atur registrat,el Fort Pienc,Eixample,693,Homes
...,...,...,...,...,...,...,...,...
95,2016,22,5,Demanda no aturats,"Vallvidrera, el Tibidabo i les Planes",Sarrià-Sant Gervasi,12,Homes
96,2016,23,5,Demanda no aturats,Sarrià,Sarrià-Sant Gervasi,72,Homes
97,2016,24,5,Demanda no aturats,les Tres Torres,Sarrià-Sant Gervasi,46,Homes
98,2016,25,5,Demanda no aturats,Sant Gervasi - la Bonanova,Sarrià-Sant Gervasi,78,Homes


In [41]:
unemployment_cleaned_df = spark.createDataFrame(records_pd_clean)

In [42]:
unemployment_cleaned_df.printSchema()
unemployment_cleaned_df.show(5, truncate=False)

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- District_Code: string (nullable = true)
 |-- Occupation_Type: string (nullable = true)
 |-- Neighborhood_Name: string (nullable = true)
 |-- District_Name: string (nullable = true)
 |-- Count: string (nullable = true)
 |-- Gender: string (nullable = true)



[Stage 26:>                                                         (0 + 1) / 1]

+----+-----+-------------+---------------+-------------------------------------+-------------+-----+------+
|Year|Month|District_Code|Occupation_Type|Neighborhood_Name                    |District_Name|Count|Gender|
+----+-----+-------------+---------------+-------------------------------------+-------------+-----+------+
|2016|1    |1            |Atur registrat |el Raval                             |Ciutat Vella |2431 |Homes |
|2016|2    |1            |Atur registrat |el Barri Gòtic                       |Ciutat Vella |588  |Homes |
|2016|3    |1            |Atur registrat |la Barceloneta                       |Ciutat Vella |637  |Homes |
|2016|4    |1            |Atur registrat |Sant Pere, Santa Caterina i la Ribera|Ciutat Vella |878  |Homes |
|2016|5    |2            |Atur registrat |el Fort Pienc                        |Eixample     |693  |Homes |
+----+-----+-------------+---------------+-------------------------------------+-------------+-----+------+
only showing top 5 rows


                                                                                

In [43]:
unemployment_cleaned_df.write \
    .partitionBy("Year") \
    .mode("overwrite") \
    .parquet(str(formatted_zone / "unemployment"))

In [44]:
from pyspark.sql.functions import countDistinct

for dataset in ["income", "unemployment", "incidences"]:
    print(f"\nPreviewing formatted_zone/{dataset}")
    df = spark.read.parquet(str(formatted_zone / dataset))
    df.printSchema()
    df.show(3)
    df.select([countDistinct(c).alias(f"{c}_distinct") for c in df.columns]).show()


Previewing formatted_zone/income
root
 |-- Year: integer (nullable = true)
 |-- District_Code: integer (nullable = true)
 |-- Neighborhood_Code: integer (nullable = true)
 |-- Neighborhood_Name: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Income_Index: float (nullable = true)
 |-- District_Name: string (nullable = true)

+----+-------------+-----------------+--------------------+----------+------------+-------------+
|Year|District_Code|Neighborhood_Code|   Neighborhood_Name|Population|Income_Index|District_Name|
+----+-------------+-----------------+--------------------+----------+------------+-------------+
|2010|           10|               64|el Camp de l'Arpa...|     38399|        84.6|   Sant Martí|
|2010|           10|               65|             el Clot|     27505|        82.8|   Sant Martí|
|2010|           10|               66|el Parc i la Llac...|     13728|        97.3|   Sant Martí|
+----+-------------+-----------------+--------------------+

## **A.3 Move Data to the Exploitation Zone**


In [56]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Exploitation Zone Pipeline") \
    .getOrCreate()


In [57]:
formatted_zone = Path.cwd() / "formatted_zone"

incident_df = spark.read.option("basePath", str(formatted_zone / "incidences")) \
    .parquet(str(formatted_zone / "incidences"))

income_df = spark.read.option("basePath", str(formatted_zone / "income")) \
    .parquet(str(formatted_zone / "income"))

unemployment_df = spark.read.option("basePath", str(formatted_zone / "unemployment")) \
    .parquet(str(formatted_zone / "unemployment"))


**Transformations to support KPI analysis**

Dataset: Incidences

In [58]:
from pyspark.sql.functions import col, count, month

# Total number of incidences per district
incident_kpi1 = incident_df.groupBy("District_Name").agg(count("*").alias("total_incidences"))

# Most common type of incidence
incident_kpi2 = incident_df.groupBy("Details").agg(count("*").alias("count")).orderBy(col("count").desc())

# Month with more incidences
incident_df = incident_df.withColumn("month", col("Creation_Month").cast("int"))
incident_kpi3 = incident_df.groupBy("month").agg(count("*").alias("total")).orderBy(col("total").desc())


Dataset: Income

In [61]:
from pyspark.sql.functions import avg, sum as _sum, when

income_df = income_df.withColumn("RFD", col("Income_Index").cast("float")) \
                     .withColumn("Population", col("Population").cast("int"))

# Average RFD per district
income_kpi1 = income_df.groupBy("District_Name").agg(avg("RFD").alias("avg_RFD"))

# Above/below average RFD
income_kpi2 = income_kpi1.withColumn("above_avg", when(col("avg_RFD") > 100, "Above").otherwise("Below"))

# Population-weighted income
income_df = income_df.withColumn("weighted", col("RFD") * col("Population"))
income_kpi3 = income_df.groupBy("District_Name") \
    .agg((_sum("weighted") / _sum("Population")).alias("population_weighted_RFD"))


Dataset: Unemployment

In [63]:
unemployment_df.head()

Row(Month='11', District_Code='3', Occupation_Type='Demanda no aturats', Neighborhood_Name='el Poble Sec', District_Name='Sants-Montjuïc', Count='311', Gender='Homes', Year=2016)

In [72]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, round

unemployment_df = unemployment_df.withColumn("Unemp", col("Occupation_Type").try_cast("float"))

# Unemployment by gender
unemp_kpi1 = unemployment_df.groupBy("Gender").agg(avg("Unemp").alias("avg_unemp"))

# Unemployment by district
unemp_kpi2 = unemployment_df.groupBy("District_Code").agg(avg("Unemp").alias("avg_unemp"))

# % change in unemployment by year
window = Window.partitionBy("District_Code").orderBy("Year")
unemployment_df = unemployment_df.withColumn("prev", lag("Unemp").over(window))
unemployment_df = unemployment_df.withColumn("pct_change", round(((col("Unemp") - col("prev")) / col("prev")) * 100, 2))
unemp_kpi3 = unemployment_df.select("District_Code", "Year", "Unemp", "pct_change")


Write processed data to the Exploitation Zone

In [73]:
exploitation_zone = Path.cwd() / "exploitation_zone"


def write_df(df, name):
    df.write.mode("overwrite").parquet(str(exploitation_zone / name))

write_df(incident_kpi1, "incidents_by_district")
write_df(incident_kpi2, "top_incident_types")
write_df(incident_kpi3, "incidents_by_month")

write_df(income_kpi1, "income_avg_by_district")
write_df(income_kpi2, "income_above_below_avg")
write_df(income_kpi3, "population_weighted_income")

write_df(unemp_kpi1, "unemployment_by_gender")
write_df(unemp_kpi2, "unemployment_by_district")
write_df(unemp_kpi3, "unemployment_pct_change")


## **A.4 Validate the Data**