In [1]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Obtain the Spark session for this pipeline (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("exploitation-pipeline")
    .getOrCreate()
)

# Load the three formatted-zone Parquet datasets used throughout the notebook.
income = spark.read.parquet("formatted_zone/income")
population = spark.read.parquet("formatted_zone/population_by_geographical")
incidences = spark.read.parquet("formatted_zone/incidences")

25/06/24 17:22:27 WARN Utils: Your hostname, Mels-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.60.172.110 instead (on interface en0)
25/06/24 17:22:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/24 17:22:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/24 17:22:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/24 17:22:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Print the schema of every loaded dataset, in the order they were read.
for dataset in (income, population, incidences):
    dataset.printSchema()

root
 |-- Codi_Districte: integer (nullable = true)
 |-- Nom_Districte: string (nullable = true)
 |-- Nom_Barri: string (nullable = true)
 |-- Població: integer (nullable = true)
 |-- Índex RFD Barcelona = 100: double (nullable = true)
 |-- Codi_Barri: integer (nullable = true)
 |-- Any: integer (nullable = true)

root
 |-- Codi_Districte: long (nullable = true)
 |-- Nom_Barri: string (nullable = true)
 |-- Nom_Districte: string (nullable = true)
 |-- Valor_int: integer (nullable = true)
 |-- SEXE_desc: string (nullable = true)
 |-- NACIONALITAT_REGIO_desc: string (nullable = true)
 |-- Codi_Barri: integer (nullable = true)
 |-- Any: integer (nullable = true)

root
 |-- FITXA_ID: integer (nullable = true)
 |-- TIPUS: string (nullable = true)
 |-- AREA: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- DETALL: string (nullable = true)
 |-- DIA_DATA_ALTA: integer (nullable = true)
 |-- MES_DATA_ALTA: integer (nullable = true)
 |-- DIA_DATA_TANCAMENT: integer (nullable 

In [3]:
# Preview the first rows of the population dataset.
population.show(n=10)

+--------------+--------------------+-------------+---------+---------+-----------------------+----------+----+
|Codi_Districte|           Nom_Barri|Nom_Districte|Valor_int|SEXE_desc|NACIONALITAT_REGIO_desc|Codi_Barri| Any|
+--------------+--------------------+-------------+---------+---------+-----------------------+----------+----+
|             1|Sant Pere, Santa ...| Ciutat Vella|      240|   Female|        Northern Africa|         4|2017|
|             1|Sant Pere, Santa ...| Ciutat Vella|      562|     Male|        Northern Africa|         4|2017|
|             1|Sant Pere, Santa ...| Ciutat Vella|        6|     Male|        Southern Africa|         4|2017|
|             1|Sant Pere, Santa ...| Ciutat Vella|       13|   Female|         Western Africa|         4|2017|
|             1|Sant Pere, Santa ...| Ciutat Vella|      127|     Male|         Western Africa|         4|2017|
|             1|Sant Pere, Santa ...| Ciutat Vella|      101|   Female|              Caribbean|         

### Multicultural index

We are going to create the **Shannon Diversity Index**, a measure of biodiversity that considers both the number of species (richness) and their relative abundance (evenness) within a community. A higher Shannon index generally indicates a more diverse and balanced community, while a lower index suggests a less diverse community, potentially dominated by one or a few species.

In this project, we apply this concept to human populations, using the diversity of nationalities present in a given region. While the Shannon Index is typically used to measure species evenness in ecological communities, we found it an interesting approach to use it for analyzing human diversity — since, biologically, humans are part of the animal kingdom.

**Note:**
Melisa and Pol — we are completely against any form of racism or discrimination. This project and its analysis are purely observational and neutral. There is no judgment of "good" or "bad" in this context. We simply thought it would be insightful to apply the Shannon Diversity Index to the datasets provided in class.

We calculate the diversity index of each neighborhood (barri) by treating each "NACIONALITAT_REGIO_desc" as a distinct "species" for the purpose of the calculation. However, we are fully aware that all humans are equal — we are all part of the same species. We hope this project is not misunderstood, and that its purpose and objectives are seen clearly as an exploration of data, not a commentary on value or worth.

$H = -\sum_i p_i \cdot \log(p_i)$

where $p_i$ is the proportion of the neighborhood's population that belongs to nationality region $i$.

In [4]:
# Shannon diversity index per neighborhood and year:
#   1. tot_nac   - population per (neighborhood, year, nationality region).
#   2. tot_barri - total population per (neighborhood, year).
#   3. p_i = total_regio / total_barri for every nationality region.
#   4. H = -sum(p_i * log(p_i)) over the regions of each (neighborhood, year).

# Keys that identify one neighborhood in one year.
barri_keys = ["Codi_Barri", "Any", "Nom_Barri"]

tot_nac = (population
           .groupBy("Codi_Barri", "Any", "NACIONALITAT_REGIO_desc", "Nom_Barri")
           .agg(F.sum("Valor_int").alias("total_regio")))

tot_barri = (population
             .groupBy(*barri_keys)
             .agg(F.sum("Valor_int").alias("total_barri")))

# Proportion of each nationality region inside its neighborhood/year.
# The division is only evaluated for a non-zero total; otherwise p_i stays NULL
# instead of depending on how the session handles division by zero.
p_i = F.when(F.col("total_barri") != 0,
             F.col("total_regio") / F.col("total_barri"))

# Per-region term of H. A region with no residents contributes exactly 0
# (the usual 0 * log(0) = 0 convention), so log() is never evaluated at 0.
# Any other case (NULL or negative p_i) is left as NULL and ignored by the sum.
p_logp = (F.when(F.col("p_i") > 0, -F.col("p_i") * F.log(F.col("p_i")))
           .when(F.col("p_i") == 0, F.lit(0.0)))

# Inner join of both aggregates on the neighborhood/year keys.
joined = (tot_nac
          .join(tot_barri, on=barri_keys, how="inner")
          .withColumn("p_i", p_i)
          .withColumn("p_logp", p_logp))

# H = -sum(p_i * log(p_i))
shannon_index = (joined
                 .groupBy(*barri_keys)
                 .agg(F.sum("p_logp").alias("shannon_index")))


In [5]:
# Preview the computed Shannon index values.
shannon_index.show(n=10)

+----------+----+--------------------+-------------------+
|Codi_Barri| Any|           Nom_Barri|      shannon_index|
+----------+----+--------------------+-------------------+
|        49|2016|           Canyelles|0.22710819806587076|
|         1|2017|            el Raval| 1.5682470015494185|
|        58|2015|       Baró de Viver| 0.4522842304431748|
|        22|2016|Vallvidrera, el T...| 0.5073731033954247|
|        63|2016|               Navas| 0.6192463791754728|
|        66|2016|el Parc i la Llac...| 0.8311868872025775|
|        52|2015|      la Prosperitat| 0.6366973991934503|
|        61|2014|          la Sagrera| 0.5810116441918528|
|         7|2017|la Dreta de l'Eix...| 0.8535846455922169|
|        23|2016|              Sarrià| 0.5158249536786634|
+----------+----+--------------------+-------------------+
only showing top 10 rows



The advantage of using the Shannon Diversity Index is that it not only counts how many different nationalities are present, but also considers how evenly they are distributed within the same region. For example, if a neighborhood has 10 different nationalities but one of them makes up 90% of the population, it will have a lower index than another neighborhood where all nationalities are more evenly distributed.

In [6]:
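# Disabled alternative (richness only): number of distinct nationality regions with residents per neighborhood.
#multi = (tot_nac.filter("total_regio > 0").groupBy("Codi_Barri").agg(F.countDistinct("NACIONALITAT_REGIO_desc").alias("multicultural_idx")))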

In [7]:
# Persist the Shannon index to the exploitation zone as CSV with a header row,
# overwriting any previous output.
# NOTE: the output is NOT partitioned; partitioning by year
# (`.partitionBy("Any")`) is currently disabled. Original author note:
# "this should be in the formatted_zone".
(
    shannon_index.write
    .option("header", True)
    .mode("overwrite")
    .csv("exploitation_zone/shannon_index")
)

In [8]:
# Inspect the per-region population totals.
tot_nac.show(n=100)

+----------+----+-----------------------+--------------------+-----------+
|Codi_Barri| Any|NACIONALITAT_REGIO_desc|           Nom_Barri|total_regio|
+----------+----+-----------------------+--------------------+-----------+
|        26|2014|        Central America|Sant Gervasi - Ga...|        304|
|         8|2017|           Central Asia|l'Antiga Esquerra...|         23|
|        26|2014|   Australia and New...|Sant Gervasi - Ga...|         14|
|        69|2015|        Southern Europe|Diagonal Mar i el...|      11532|
|         4|2015|          Southern Asia|Sant Pere, Santa ...|        655|
|         4|2014|       Northern America|Sant Pere, Santa ...|        223|
|        27|2017|         Western Europe|el Putxet i el Farró|        608|
|        66|2017|         Western Europe|el Parc i la Llac...|        387|
|         4|2017|          South America|Sant Pere, Santa ...|       1280|
|        69|2016|           Central Asia|Diagonal Mar i el...|          8|
|        66|2017|        

### Income inequality per district (GINI coefficient)

The Gini coefficient is often used in economics to track income inequality within a population. While it is most often used to measure inequality between the individuals of a group, it can also be used to measure inequality between the neighborhoods of a district, which is what we do here. The formula for the Gini coefficient is as follows:

$$gini\_coefficient = \frac{2 \times rank\_income\_sum}{n \times total\_income} - \frac{n + 1}{n}$$

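where $n$ is the number of neighborhoods within a district, $y_i$ is the income index of the neighborhood ranked $i$-th when the district's neighborhoods are sorted by income in ascending order, and: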

rank_income_sum = $\sum_{i=1}^{n} i \times y_i$

total_income = $\sum_{i=1}^{n} y_i$

**Note:** keep in mind that Eixample has 6 neighborhoods, not 24.

In [9]:
# Source column name -> English name used in the rest of the notebook.
income_column_names = {
    "Codi_Districte": "district_code",
    "Nom_Districte": "district_name",
    "Codi_Barri": "neighborhood_code",
    "Nom_Barri": "neighborhood_name",
    "Índex RFD Barcelona = 100": "income_index",
    "Població": "population",
    "Any": "year",
}

# Apply the renames one by one, in the order listed above.
income_clean = income
for source_name, english_name in income_column_names.items():
    income_clean = income_clean.withColumnRenamed(source_name, english_name)

# Keep only the rows that actually carry an income index.
income_clean = income_clean.filter(F.col("income_index").isNotNull())

In [10]:

# Per-district, per-year income statistics and Gini coefficient.
#
# Ranking, aggregation and the final table all use the SAME key set, so the
# ranks run 1..n inside exactly the group the Gini formula sums over, and the
# statistics and the Gini coefficient come from one aggregation (no join back).
district_keys = ["district_code", "district_name", "year"]

# Rank the neighborhoods of each district/year by income index (1 = lowest).
# Ties receive arbitrary consecutive ranks; since tied rows have equal income,
# the rank-weighted sum below is unaffected.
window_district_year = Window.partitionBy(*district_keys).orderBy("income_index")
gini_prep = income_clean.withColumn("rank", F.row_number().over(window_district_year))

# Descriptive statistics plus the ingredients of the Gini formula.
gini_components = gini_prep.groupBy(*district_keys).agg(
    F.avg("income_index").alias("mean_income"),
    F.expr("percentile_approx(income_index, 0.5)").alias("median_income"),

    F.min("income_index").alias("min_income"),
    F.max("income_index").alias("max_income"),
    F.stddev("income_index").alias("income_stddev"),
    F.variance("income_index").alias("income_variance"),

    F.count("neighborhood_code").alias("neighborhoods_count"),
    F.sum("population").alias("total_population"),
    F.avg("population").alias("avg_neighborhood_population"),

    # Gini ingredients: sum_i(i * y_i), sum_i(y_i) and n.
    F.sum(F.col("rank") * F.col("income_index")).alias("rank_income_sum"),
    F.sum("income_index").alias("total_income"),
    F.count("neighborhood_code").alias("n"),
)

# gini = 2 * sum_i(i * y_i) / (n * sum_i(y_i)) - (n + 1) / n
gini_coefficients = gini_components.withColumn(
    "gini_coefficient",
    (2 * F.col("rank_income_sum")) / (F.col("n") * F.col("total_income")) - (F.col("n") + 1) / F.col("n")
)

# Final table. The explicit select fixes the column order (this frame is
# written to CSV in later cells) and drops the intermediate Gini ingredients.
district_stats = (
    gini_coefficients
    .withColumn(
        "coefficient_of_variation",
        F.col("income_stddev") / F.col("mean_income")
    )
    .select(
        "district_code",
        "year",
        "district_name",
        "mean_income",
        "median_income",
        "min_income",
        "max_income",
        "income_stddev",
        "income_variance",
        "neighborhoods_count",
        "total_population",
        "avg_neighborhood_population",
        "gini_coefficient",
        "coefficient_of_variation",
    )
    .orderBy("year", "gini_coefficient")
)

# Compact, rounded view for on-screen inspection.
district_stats.select(
    "year",
    "district_name",
    "district_code",
    F.round("mean_income", 1).alias("avg_income"),
    F.round("gini_coefficient", 3).alias("gini"),
    F.round("coefficient_of_variation", 3).alias("cv"),
    "neighborhoods_count"
).show(50, truncate=False)

+----+-------------------+-------------+----------+-----+-----+-------------------+
|year|district_name      |district_code|avg_income|gini |cv   |neighborhoods_count|
+----+-------------------+-------------+----------+-----+-----+-------------------+
|2014|Gràcia             |6            |102.5     |0.061|0.13 |5                  |
|2014|Sarrià-Sant Gervasi|5            |183.3     |0.074|0.15 |6                  |
|2014|Ciutat Vella       |1            |85.4      |0.077|0.166|4                  |
|2014|Horta-Guinardó     |7            |80.3      |0.078|0.15 |11                 |
|2014|Sants-Montjuïc     |3            |71.7      |0.089|0.198|8                  |
|2014|Nou Barris         |8            |50.4      |0.093|0.172|13                 |
|2014|Sant Andreu        |9            |67.6      |0.097|0.19 |7                  |
|2014|Eixample           |2            |116.1     |0.11 |0.232|6                  |
|2014|Les Corts          |4            |163.2     |0.189|0.471|3            

In [11]:
# Persist the district statistics (including the Gini coefficient) to the
# exploitation zone as CSV with a header row, overwriting any previous output.
(
    district_stats.write
    .option("header", True)
    .mode("overwrite")
    .csv("exploitation_zone/gini_coefficients")
)

### Average Incident Resolution Time Per Neighborhood

We calculate the **Average Incident Resolution Time** per Neighborhood as a key performance indicator for public safety and emergency response efficiency. This metric measures how quickly incidents are resolved from the moment they are reported until they are officially closed, expressed in days. The resolution time is computed by finding the difference between the incident closure date and the incident creation date for each neighborhood and year.
This KPI helps identify service efficiency patterns across Barcelona's neighborhoods, revealing areas where emergency services respond more quickly or slowly. Lower average resolution times indicate more efficient incident management, while higher times may suggest areas needing additional resources or process improvements. However, our analysis reveals potential data quality issues, as many incidents show identical creation and closure dates (0-day resolution), which may indicate systematic data entry practices or bulk processing rather than actual same-day resolution of all incidents.

In [12]:
# Assemble "year-month-day" strings from the separate integer columns and parse
# them as dates; strings that cannot be parsed yield NULL.
creation_date = F.to_date(
    F.concat_ws("-", F.col("Any"), F.col("MES_DATA_ALTA"), F.col("DIA_DATA_ALTA")),
    "yyyy-M-d",
)
closure_date = F.to_date(
    F.concat_ws("-", F.col("ANY_DATA_TANCAMENT"), F.col("MES_DATA_TANCAMENT"), F.col("DIA_DATA_TANCAMENT")),
    "yyyy-M-d",
)

# Keep only the incidents for which both dates could be built.
incident_with_dates = (
    incidences
    .withColumn("creation_date", creation_date)
    .withColumn("closure_date", closure_date)
    .where(F.col("creation_date").isNotNull() & F.col("closure_date").isNotNull())
)

# Resolution time in days; incidents closed before their creation date are dropped.
incident_resolution = (
    incident_with_dates
    .withColumn("resolution_days", F.datediff(F.col("closure_date"), F.col("creation_date")))
    .where(F.col("resolution_days") >= 0)
)

# Average, count, minimum and maximum resolution time per neighborhood and year.
resolution_aggregates = [
    F.avg("resolution_days").alias("avg_resolution_days"),
    F.count("*").alias("total_incidents"),
    F.min("resolution_days").alias("min_resolution_days"),
    F.max("resolution_days").alias("max_resolution_days"),
]
avg_resolution_time = (
    incident_resolution
    .groupBy("Codi_Barri", "Any")
    .agg(*resolution_aggregates)
    .orderBy("Codi_Barri", "Any")
)

avg_resolution_time.show(n=10)



+----------+----+-------------------+---------------+-------------------+-------------------+
|Codi_Barri| Any|avg_resolution_days|total_incidents|min_resolution_days|max_resolution_days|
+----------+----+-------------------+---------------+-------------------+-------------------+
|         1|2013| 45.166666666666664|             30|                  2|                202|
|         1|2014|  6.877161055505004|           2198|                  0|                310|
|         1|2015|    8.1679607682877|           2447|                  0|                325|
|         1|2016|  9.845474613686534|           2718|                  0|                177|
|         1|2017|  10.79578886645515|           3467|                  0|                144|
|         2|2013|               20.5|             22|                  7|                 84|
|         2|2014|  8.230582524271844|           1648|                  0|                353|
|         2|2015|  8.825335892514396|           1563|       

                                                                                

In [13]:
# Persist the per-neighborhood resolution-time KPI to the exploitation zone as
# CSV with a header row, overwriting any previous output.
(
    avg_resolution_time.write
    .option("header", True)
    .mode("overwrite")
    .csv("exploitation_zone/avg_resolution_time")
)

                                                                                

In [15]:
from pyspark.sql.functions import col, sum as spark_sum, when, isnan, isnull
# Number of incidents per neighborhood, largest first.
(
    incidences
    .groupBy("Codi_Barri")
    .count()
    .orderBy(F.col("count").desc())
    .show()
)



+----------+-----+
|Codi_Barri|count|
+----------+-----+
|        31|11510|
|         7|10915|
|         1|10860|
|        18| 9409|
|        60| 9278|
|        68| 9245|
|         4| 8502|
|         9| 8379|
|        26| 8265|
|        11| 8249|
|        19| 8240|
|        35| 7498|
|         2| 7385|
|         8| 7211|
|         6| 6575|
|        37| 6505|
|        10| 6423|
|        43| 6309|
|        13| 5721|
|        64| 5299|
+----------+-----+
only showing top 20 rows



                                                                                

In [16]:

# Condition shared by both checks below: the opening and closing day, month
# and year all coincide.
opened_and_closed_same_date = (
    (F.col("DIA_DATA_ALTA") == F.col("DIA_DATA_TANCAMENT"))
    & (F.col("MES_DATA_ALTA") == F.col("MES_DATA_TANCAMENT"))
    & (F.col("Any") == F.col("ANY_DATA_TANCAMENT"))
)

# Spot check: date columns of the incidents of neighborhood 1 in 2016, flagged
# with whether they were opened and closed on the same date.
debug_neighborhood_1 = (
    incidences
    .filter((F.col("Codi_Barri") == 1) & (F.col("Any") == 2016))
    .select(
        "DIA_DATA_ALTA", "MES_DATA_ALTA", "Any", "Codi_Barri",
        "DIA_DATA_TANCAMENT", "MES_DATA_TANCAMENT", "ANY_DATA_TANCAMENT",
    )
    .withColumn("same_day", opened_and_closed_same_date)
)

debug_neighborhood_1.show()

# Whole dataset: how many incidents have identical opening and closing dates.
same_date_check = (
    incidences
    .withColumn("same_date", opened_and_closed_same_date)
    .groupBy("same_date")
    .count()
)

same_date_check.show()

+-------------+-------------+----+----------+------------------+------------------+------------------+--------+
|DIA_DATA_ALTA|MES_DATA_ALTA| Any|Codi_Barri|DIA_DATA_TANCAMENT|MES_DATA_TANCAMENT|ANY_DATA_TANCAMENT|same_day|
+-------------+-------------+----+----------+------------------+------------------+------------------+--------+
|           17|            7|2016|         1|                18|                 7|              2016|   false|
|           18|            7|2016|         1|                18|                 7|              2016|    true|
|           18|            7|2016|         1|                18|                 7|              2016|    true|
|           27|            4|2016|         1|                19|                 7|              2016|   false|
|           27|            6|2016|         1|                19|                 7|              2016|   false|
|            1|            7|2016|         1|                19|                 7|              2016|  



+---------+------+
|same_date| count|
+---------+------+
|     true| 52988|
|    false|269534|
+---------+------+



                                                                                

In [17]:
# NOTE(review): this repeats the write already performed in cell In [11]
# (same DataFrame, same target path); it simply overwrites that output.
(
    district_stats.write
    .option("header", True)
    .mode("overwrite")
    .csv("exploitation_zone/gini_coefficients")
)

In [None]:
## TODO: compute the incident-related KPIs and save them.

### KPIs are already created in the exploitation file.
### formatted to exploitation.