## Initialize Spark

We will use local mode, where all processing runs on a single machine. If you need to install Apache Spark, there are two options: either go to the [Spark download page](https://spark.apache.org/downloads.html) and choose "Pre-built for Apache Hadoop 3.3 and later", or install only the PySpark Python library and its dependencies by running `pip install "pyspark[sql,ml,mllib]"` (the quotes stop shells such as zsh from interpreting the brackets, and the extras must not contain spaces).

You will also need Java 8 or later installed on your system.
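Before creating a session, it can help to confirm both prerequisites are in place. The sketch below uses only the standard library; the function name `check_prerequisites` is hypothetical, and it only checks that a `java` launcher is on the PATH and that `pyspark` is importable, not which versions are installed.

```python
import importlib.util
import shutil


def check_prerequisites():
    """Report which Spark prerequisites were found (hypothetical helper)."""
    return {
        # Spark needs a Java runtime; look for the `java` launcher on PATH
        # (a setup that only defines JAVA_HOME is not detected by this check)
        "java": shutil.which("java") is not None,
        # True if the pyspark package can be imported by this interpreter
        "pyspark": importlib.util.find_spec("pyspark") is not None,
    }


for name, found in check_prerequisites().items():
    print(f"{name}: {'found' if found else 'missing'}")
```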

In [1]:
# The purpose of findspark is to make it easier to find and use Spark from Python,
# especially if you have not set the SPARK_HOME environment variable or
# if your Spark and PySpark setup is not in your system's PATH.
# If pyspark was NOT installed with pip, uncomment the next two lines
#import findspark
#findspark.init()

from pyspark.sql import SparkSession

# Run Spark locally with 4 worker threads (cores); this machine has 8
spark = (SparkSession.builder
    .appName("bcn_traffic_incidents")
    .master("local[4]")
    .getOrCreate())

24/04/30 17:42:04 WARN Utils: Your hostname, Paus-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.130 instead (on interface en0)
24/04/30 17:42:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/30 17:42:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
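The cell above hard-codes four of the machine's eight cores. As a minimal sketch (the helper `local_master` is hypothetical, not part of PySpark), the value could instead be derived from the machine and passed to `.master(...)`. Spark also accepts `local[*]` to use every available core.

```python
import os


def local_master(fraction=0.5):
    """Build a local[N] master URL that uses a fraction of the available cores."""
    total = os.cpu_count() or 1  # os.cpu_count() may return None
    cores = max(1, int(total * fraction))  # always use at least one core
    return f"local[{cores}]"


print(local_master())  # on an 8-core machine this gives local[4]
```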


In [2]:
# spark.stop()

## Load datasets as DataFrames and create temporary views

- df_incidents
- df_incidents_type
- df_incidents_vehicle
- df_incidents_person
- df_incidents_cause
- df_incidents_driver

Read each CSV file into a DataFrame and register it as a temporary view so we can query it with SQL.

Temporary views:

- incidents
- incidents_type
- incidents_vehicle
- incidents_person
- incidents_cause
- incidents_driver

  

In [2]:
incidents_file_path = "../data/2023_accidents_gu_bcn.csv"
incidents_type_file_path = "../data/2023_accidents_tipus_gu_bcn.csv"
incidents_vehicle_file_path = "../data/2023_accidents_vehicles_gu_bcn.csv"
incidents_person_file_path = "../data/2023_accidents_persones_gu_bcn.csv"
incidents_cause_file_path = "../data/2023_accidents_causes_gu_bcn.csv"
incidents_driver_file_path = "../data/2023_accidents_causa_conductor_gu_bcn.csv"

def load_csv(path):
    """Read a CSV file with a header row into a DataFrame, inferring column types."""
    return (spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("mode", "FAILFAST")  # raise an exception on the first malformed record
        .option("nullValue", "")     # read empty fields as null (also Spark's default)
        .load(path))

# Register each DataFrame under its own view name
df_incidents = load_csv(incidents_file_path)
df_incidents.createOrReplaceTempView("incidents")

df_incidents_type = load_csv(incidents_type_file_path)
df_incidents_type.createOrReplaceTempView("incidents_type")

df_incidents_vehicle = load_csv(incidents_vehicle_file_path)
df_incidents_vehicle.createOrReplaceTempView("incidents_vehicle")

df_incidents_person = load_csv(incidents_person_file_path)
df_incidents_person.createOrReplaceTempView("incidents_person")

df_incidents_cause = load_csv(incidents_cause_file_path)
df_incidents_cause.createOrReplaceTempView("incidents_cause")

df_incidents_driver = load_csv(incidents_driver_file_path)
df_incidents_driver.createOrReplaceTempView("incidents_driver")
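A quick sanity check can confirm that each temporary view exposes the DataFrame it is named after. This is a minimal sketch: the helper `mismatched_views` is hypothetical, it resolves each view with `spark.table(name)`, and comparing column lists is only a heuristic (it would not tell apart two files with identical headers).

```python
def mismatched_views(spark, expected):
    """Return the names of temp views whose columns differ from the expected DataFrame.

    `expected` maps each view name to the DataFrame it should expose.
    """
    return [
        name
        for name, df in expected.items()
        # spark.table(name) looks the temporary view up by name
        if spark.table(name).columns != df.columns
    ]


# Usage in the notebook, once the DataFrames and views exist:
# mismatched_views(spark, {
#     "incidents": df_incidents,
#     "incidents_type": df_incidents_type,
#     "incidents_vehicle": df_incidents_vehicle,
#     "incidents_person": df_incidents_person,
#     "incidents_cause": df_incidents_cause,
#     "incidents_driver": df_incidents_driver,
# })  # an empty list means every view has the expected columns
```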


## Data exploration



### df_incidents

In [3]:
df_incidents.printSchema()

root
 |-- Numero_expedient: string (nullable = true)
 |-- Codi_districte: integer (nullable = true)
 |-- Nom_districte: string (nullable = true)
 |-- Codi_barri: integer (nullable = true)
 |-- Nom_barri: string (nullable = true)
 |-- Codi_carrer: integer (nullable = true)
 |-- Nom_carrer: string (nullable = true)
 |-- Num_postal : string (nullable = true)
 |-- Descripcio_dia_setmana: string (nullable = true)
 |-- NK_Any: integer (nullable = true)
 |-- Mes_any: integer (nullable = true)
 |-- Nom_mes: string (nullable = true)
 |-- Dia_mes: integer (nullable = true)
 |-- Hora_dia: integer (nullable = true)
 |-- Descripcio_torn: string (nullable = true)
 |-- Descripcio_causa_vianant: string (nullable = true)
 |-- Numero_morts: integer (nullable = true)
 |-- Numero_lesionats_lleus: integer (nullable = true)
 |-- Numero_lesionats_greus: integer (nullable = true)
 |-- Numero_victimes: integer (nullable = true)
 |-- Numero_vehicles_implicats: integer (nullable = true)
 |-- Coordenada_UTM_Y_ED5

In [4]:
df_incidents.show(5)
df_incidents.count()

+----------------+--------------+--------------+----------+-----------------+-----------+--------------------+-----------+----------------------+------+-------+-------+-------+--------+---------------+------------------------+------------+----------------------+----------------------+---------------+-------------------------+---------------------+---------------------+--------------+-------------+
|Numero_expedient|Codi_districte| Nom_districte|Codi_barri|        Nom_barri|Codi_carrer|          Nom_carrer|Num_postal |Descripcio_dia_setmana|NK_Any|Mes_any|Nom_mes|Dia_mes|Hora_dia|Descripcio_torn|Descripcio_causa_vianant|Numero_morts|Numero_lesionats_lleus|Numero_lesionats_greus|Numero_victimes|Numero_vehicles_implicats|Coordenada_UTM_Y_ED50|Coordenada_UTM_X_ED50|Longitud_WGS84|Latitud_WGS84|
+----------------+--------------+--------------+----------+-----------------+-----------+--------------------+-----------+----------------------+------+-------+-------+-------+--------+-------------

7724

In [5]:
spark.sql(
    """
    SELECT Nom_carrer, Dia_mes, Numero_morts, Numero_vehicles_implicats, Descripcio_causa_vianant
    FROM incidents WHERE Numero_morts > 0
    ORDER BY Numero_morts DESC
    """
).show(10)

+--------------------+-------+------------+-------------------------+------------------------+
|          Nom_carrer|Dia_mes|Numero_morts|Numero_vehicles_implicats|Descripcio_causa_vianant|
+--------------------+-------+------------+-------------------------+------------------------+
|Verdi            ...|      2|           1|                        1|    No és causa del  ...|
|Aragó / Comte d'U...|     11|           1|                        2|    No és causa del  ...|
|Agudes           ...|     23|           1|                        2|    No és causa del  ...|
|Mig (Ascendent)  ...|     24|           1|                        1|    No és causa del  ...|
|Meridiana        ...|     22|           1|                        3|    No és causa del  ...|
|Catalunya        ...|     26|           1|                        2|    No és causa del  ...|
|Mare de Déu de Mo...|     28|           1|                        2|    No és causa del  ...|
|Sant Antoni Maria...|     26|           1|       

In [6]:
# The same query with the DataFrame API (also showing Num_postal)
from pyspark.sql.functions import col, desc
(df_incidents
    .select("Nom_carrer", "Dia_mes", "Numero_morts", "Numero_vehicles_implicats",
            "Descripcio_causa_vianant", "Num_postal ")  # this column name has a trailing space
    .where(col("Numero_morts") > 0)
    # .where(col("Num_postal ").isNull())  # optional: keep only incidents without a postal number
    .orderBy(desc("Numero_morts"))
    .show(10))

+--------------------+-------+------------+-------------------------+------------------------+-----------+
|          Nom_carrer|Dia_mes|Numero_morts|Numero_vehicles_implicats|Descripcio_causa_vianant|Num_postal |
+--------------------+-------+------------+-------------------------+------------------------+-----------+
|Aragó / Comte d'U...|     11|           1|                        2|    No és causa del  ...|  0113 0113|
|Guinardó / Telègr...|     19|           1|                        2|    No és causa del  ...|  0030 0030|
|Agudes           ...|     23|           1|                        2|    No és causa del  ...|  0060 0082|
|Mare de Déu de Mo...|     28|           1|                        2|    No és causa del  ...|  0039 0041|
|Sant Antoni Maria...|     26|           1|                        1|    No és causa del  ...|  0268 0268|
|Verdi            ...|      2|           1|                        1|    No és causa del  ...|  0103 0103|
|Diagonal         ...|     31|       

In [8]:
#df_incidents_type.show(5)


In [21]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/Users/pau/coding-projects/ita-barcelona-traffic-incidents/notebook/spark-warehouse')]

In [25]:
#spark.catalog.listTables()

In [24]:
#spark.catalog.listColumns("incidents")

#### Causes attributable to the pedestrian

Analysis of the main causes attributable to the pedestrian.
We build a list of the causes and the number of times each one occurs.

In [9]:
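# Count incidents per pedestrian cause with the RDD API:
# map each row to a (cause, 1) pair, then sum the 1s for each cause
causes = (df_incidents.select("Descripcio_causa_vianant")
          .rdd.map(lambda row: row["Descripcio_causa_vianant"])
          .map(lambda x: (x, 1))
          .reduceByKey(lambda a, b: a + b))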
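To make the two RDD steps concrete, here is a plain-Python sketch of what they compute, run locally on a small made-up list rather than on Spark (the function `count_by_key` is hypothetical). Spark applies the same per-key fold, but across partitions, combining partial sums before shuffling; that is why `reduceByKey` expects an associative and commutative function such as `a + b`.

```python
def count_by_key(values):
    """Plain-Python analogue of .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)."""
    pairs = [(v, 1) for v in values]  # map step: emit a (key, 1) pair per value
    totals = {}
    for key, one in pairs:
        # reduce step: fold the values of each key together with a + b
        totals[key] = totals.get(key, 0) + one
    return totals


print(count_by_key(["Altres", "Desobeir altres senyals", "Altres"]))
# {'Altres': 2, 'Desobeir altres senyals': 1}
```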

In [10]:
causes.take(20)

[('No és causa del  vianant', 7249),
 ('Creuar per fora pas de vianants', 147),
 ('Altres', 92),
 ('Desobeir el senyal del semàfor', 181),
 ('Transitar a peu per la calçada', 48),
 ('Desobeir altres senyals', 7)]

We can get the same result using the DataFrame API instead of the RDD API.

In [11]:
causes = df_incidents.groupBy("Descripcio_causa_vianant").count()

In [12]:
causes.orderBy("count", ascending=False).show(truncate=False)

+-------------------------------+-----+
|Descripcio_causa_vianant       |count|
+-------------------------------+-----+
|No és causa del  vianant       |7249 |
|Desobeir el senyal del semàfor |181  |
|Creuar per fora pas de vianants|147  |
|Altres                         |92   |
|Transitar a peu per la calçada |48   |
|Desobeir altres senyals        |7    |
+-------------------------------+-----+



In [13]:
causes_list = [(row['Descripcio_causa_vianant'], row['count']) for row in causes.orderBy("count", ascending=False).collect()]
causes_list

[('No és causa del  vianant', 7249),
 ('Desobeir el senyal del semàfor', 181),
 ('Creuar per fora pas de vianants', 147),
 ('Altres', 92),
 ('Transitar a peu per la calçada', 48),
 ('Desobeir altres senyals', 7)]

In [14]:
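# Look the row up by name instead of relying on it being first after sorting
# (the value in the data contains a double space)
percent_no_causa_vianant = 100 * dict(causes_list)["No és causa del  vianant"] / df_incidents.count()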

In [15]:
percent_no_causa_vianant

93.8503366131538

93.85% of the incidents in 2023 were not caused by the pedestrian. Among the remaining incidents, where the pedestrian was a cause, the main cause was disobeying the traffic light, followed by crossing outside a pedestrian crossing.
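The statement above can be checked with a small worked calculation. This sketch hard-codes the counts from the `groupBy` output instead of calling Spark, looks the "No és causa" row up by name, and expresses each remaining cause as a share of the incidents where the pedestrian was a cause.

```python
# Counts copied from the groupBy output above
cause_counts = dict([
    ("No és causa del  vianant", 7249),  # the source value contains a double space
    ("Desobeir el senyal del semàfor", 181),
    ("Creuar per fora pas de vianants", 147),
    ("Altres", 92),
    ("Transitar a peu per la calçada", 48),
    ("Desobeir altres senyals", 7),
])

total = sum(cause_counts.values())  # equals df_incidents.count() for this dataset
not_pedestrian = cause_counts.pop("No és causa del  vianant")
pedestrian_total = total - not_pedestrian  # incidents with a pedestrian cause

print(f"Not caused by the pedestrian: {100 * not_pedestrian / total:.2f}%")
for cause, n in sorted(cause_counts.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{cause}: {n} ({100 * n / pedestrian_total:.1f}% of pedestrian-caused)")
```

With these counts, 475 incidents had a pedestrian cause; disobeying the traffic light accounts for about 38% of them and crossing outside a pedestrian crossing for about 31%.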

### df_incidents_type

In [16]:
df_incidents_type.show(5, truncate=False)
df_incidents_type.count()

+----------------+--------------+-------------+----------+----------+-----------+--------------------------------------------------+----------+----------------------+------+-------+--------+-------+--------+---------------+-------------------------+---------------------+---------------------+--------------+-------------+
|Numero_expedient|Codi_districte|Nom_districte|Codi_barri|Nom_barri |Codi_carrer|Nom_carrer                                        |Num_postal|Descripcio_dia_setmana|NK_Any|Mes_any|Nom_mes |Dia_mes|Hora_dia|Descripcio_torn|Descripcio_tipus_accident|Coordenada_UTM_X_ED50|Coordenada_UTM_Y_ED50|Longitud_WGS84|Latitud_WGS84|
+----------------+--------------+-------------+----------+----------+-----------+--------------------------------------------------+----------+----------------------+------+-------+--------+-------+--------+---------------+-------------------------+---------------------+---------------------+--------------+-------------+
|2023S004127     |-1           

7823

In [18]:
causes_type = df_incidents_type.groupBy("Descripcio_tipus_accident").count()
causes_type.orderBy("count", ascending=False).show(truncate=False)

+----------------------------------+-----+
|Descripcio_tipus_accident         |count|
+----------------------------------+-----+
|Col.lisió lateral                 |1812 |
|Abast                             |1638 |
|Col.lisió fronto-lateral          |1234 |
|Atropellament                     |927  |
|Xoc contra element estàtic        |653  |
|Caiguda (dues rodes)              |580  |
|Caiguda interior vehicle          |367  |
|Abast multiple                    |209  |
|Altres                            |181  |
|Col.lisió frontal                 |143  |
|Encalç                            |26   |
|Desconegut                        |19   |
|Xoc amb animal a la calçada       |13   |
|Sortida de via amb xoc o col.lisió|12   |
|Bolcada (més de dues rodes)       |7    |
|Sortida de via amb bolcada        |2    |
+----------------------------------+-----+



**Abast** (rear-end) collisions occur when two vehicles collide such that the front of one strikes the rear of the other. When more than two vehicles are involved, the collision is called *abast múltiple* (multiple rear-end collision).

**Encalç** collision: a collision in which the front of one vehicle strikes the rear of another vehicle.

By these definitions, abast and encalç are the same type of collision.
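Since the two labels describe the same collision type, their counts can be merged. This minimal sketch works on a subset of the counts from the `Descripcio_tipus_accident` table above; the `SYNONYMS` mapping is a hypothetical relabelling, and *Abast multiple* is kept separate because it involves more than two vehicles.

```python
# Subset of the counts from the Descripcio_tipus_accident table above
type_counts = {
    "Col.lisió lateral": 1812,
    "Abast": 1638,
    "Abast multiple": 209,
    "Encalç": 26,
}

# Hypothetical relabelling: treat encalç as a synonym of abast
SYNONYMS = {"Encalç": "Abast"}

merged = {}
for label, n in type_counts.items():
    key = SYNONYMS.get(label, label)  # labels without a synonym stay unchanged
    merged[key] = merged.get(key, 0) + n

print(merged["Abast"])  # 1664
```

Even merged, abast (1664) stays below Col.lisió lateral (1812), so the ranking does not change. In the notebook, the same relabelling could be applied before grouping with `df_incidents_type.replace("Encalç", "Abast", subset=["Descripcio_tipus_accident"])`.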


In [20]:
causes_type_list = [(row['Descripcio_tipus_accident'], row['count']) for row in causes_type.orderBy("count", ascending=False).collect()]
causes_type_list

[('Col.lisió lateral', 1812),
 ('Abast', 1638),
 ('Col.lisió fronto-lateral', 1234),
 ('Atropellament', 927),
 ('Xoc contra element estàtic', 653),
 ('Caiguda (dues rodes)', 580),
 ('Caiguda interior vehicle', 367),
 ('Abast multiple', 209),
 ('Altres', 181),
 ('Col.lisió frontal', 143),
 ('Encalç', 26),
 ('Desconegut', 19),
 ('Xoc amb animal a la calçada', 13),
 ('Sortida de via amb xoc o col.lisió', 12),
 ('Bolcada (més de dues rodes)', 7),
 ('Sortida de via amb bolcada', 2)]