# Práctica NoSQL Sergio Yunta Martín
El conjunto de datos elegido es el de la calidad del aire de la ciudad de Madrid, es un tema que me llama mucho a atención, además que recientemente me he fijado en las aplicaciones relacionadas con este tema en el teléfono móvil y ordenador.

## Descripción de caso de uso

La idea con este conjunto de datos es estudiar la calidad del aire en diferentes momentos del año en la ciudad de Madrid, así como las zonas más y menos afectadas por la contaminación para poder generar métricas y tomar decisiones para mejorar la situación y avanzar hacia una ciudad más sostenible.

Se desea acceder a los datos con la granularidad diaria, mensual e incluso obtener una media del año.

En este caso de uso, ya que estamos de analítica, tenemos que usar el data store **mongoDB**. Queremos generar métricas y realizar operaciones sobre un gran número de datos, si lo indexamos correctamente (como veremos más adelante) este data store es idóneo para este caso de uso. Para usar **neo4j** necesitaríamos estar hablando de patrones y/o recomendaciones, que no es el caso.

## Modelado
De cara a modelar los datos, tenemos que tener en cuenta que nos interesa tener la mayor cantidad de información de golpe, las relaciones no son idóneas por lo que idealmente guardaremos toda la información en una única colección.

Esto lo tenemos que tener en cuenta a la hora de leer los datos. La idea es interactuar sobre mediciones, y no tanto sobre estaciones de medida, además, el número de estas no es muy grande ya que estamos hablando de datos sólo de la ciudad de Madrid, por lo que la redundancia no es un problema.

También es importante saber el patrón de acceso a los datos, qué consultas vamos a hacer y por tanto qué datos vamos a necesitar para ellas.

Con esto en mente, el modelo de datos quedaría así:

![Modelo_Sergio_Yunta](./Modelo_SY.jpg)


Finalmente la decisión ha sido embeber los datos de las estaciones dentro de las mediciones ya que la redundancia no es un problema porque el tamaño del documento no aumenta mucho. Por otro lado, se han omitido un montón de datos que para este caso de uso no aplican, o que se indican de maneras diferentes en el fichero de datos. Por último, se ha decidido que las mediciones diarias sean un array de valores así como separar la fecha para mejorar el acceso.

## Paso 1: Limpieza de los datos
Para saber cómo usar los datos y por tanto, limpiarlos, debemos informarnos un poco sobre lo que estamos manejando. En este caso, he mirado diferentes referencias sobre la medición de la capacidad del aire en España, las referencias utilizadas son las siguientes:

- [Interpretación csv estaciones](https://datos.madrid.es/FWProjects/egob/Catalogo/MedioAmbiente/Aire/Ficheros/Estructura_C.A.Estaciones.pdf)
- [Interpretación csv mediciones](https://datos.madrid.es/FWProjects/egob/Catalogo/MedioAmbiente/Aire/Ficheros/Interprete_ficheros_%20calidad_%20del_%20aire_global.pdf)
- [Explicación del Índice de Calidad del Aire (ICA) del gobierno de España](https://www.miteco.gob.es/es/calidad-y-evaluacion-ambiental/temas/atmosfera-y-calidad-del-aire/calidad-del-aire/ica.html)
- [¿Cómo se mide la calidad del aire? de fundación aquae](https://www.fundacionaquae.org/wiki/como-se-mide-calidad-aire/#:~:text=La%20temperatura%2C%20la%20humedad%2C%20los,medici%C3%B3n%20del%20aire%20que%20respiramos.)
- [Otra explicación sobre el ICA de eurofins (valores tabla inferior)](https://www.eurofins-environment.es/es/indice-de-calidad-del-aire/)

De estas referencias, podemos extraer ciertas conclusiones. Realmente nos interesan las 5 magnitudes que forman el código ICA junto con sus valores recomendados (se simplifican). Estas son:

| Código |      Magnitud        | Calidad Buena  | Calidad Regular | Calidad Mala   |
|:------:|:--------------------:|:--------------:|:---------------:|:--------------:|
|   01   | Dióxido de Azufre    |     0 - 200    |    201 - 350    |     > 351      |
|   08   | Dióxido de Nitrógeno |     0 - 100    |    101 - 200    |     > 201      |
|   09   | Partículas < 2.5 µm  |     0 - 20     |     21 - 25     |     > 26       |
|   10   | Partículas < 10 µm   |     0 - 40     |     41 - 50     |     > 51      |
|   14   | Ozono                |     0 - 100    |    101 - 130    |     > 131      |

Otro aspecto que puede interesar limpiar son las medidas que no son válidas, en los csv junto con la medida horaria se indica si es válida o no, así que también nos podemos deshacer de ellas.


In [1]:
VALID_MAGNITUDES = [1, 8, 9, 10, 14]

In [2]:
!pip install pandas



In [3]:
import pandas as pd

def read_stations(file_path: str) -> dict:
    """
    Read a file with air quality stations.
    
    Parameters
    ----------
    file_path : str
        Name of the file with the data

    Returns
    -------
    dict
        dictionary with station information ready to insert to mongoDB
    """
    
    stations_df = pd.read_csv(file_path, sep=";")

    # keep columns that we need
    stations_df = stations_df[["CODIGO", "ESTACION", "NOM_TIPO", "Fecha alta", "LONGITUD", "LATITUD"]]
    stations_df["location"] = stations_df.apply(lambda row: [row["LONGITUD"], row["LATITUD"]], axis=1)
    stations_df = stations_df[["CODIGO", "ESTACION", "NOM_TIPO", "Fecha alta", "location"]]
    stations_df.rename(columns={"CODIGO": "_id", "ESTACION": "location_name", "NOM_TIPO": "type", "Fecha alta": "start_date"}, inplace=True)
    stations = stations_df.to_dict("records") # it generates a list, not a dict

    stations_dict = {}
    for station in stations:
        stations_dict[str(station["_id"])] = station

    return stations_dict


In [4]:
from typing import List

def _convert_columns_into_list(row):
    """
    Convert dataframe row into a list

    Parameters
    ----------
    row : Dataframe Row
        row of a dataframe for applying condition
    
    Returns
    -------
    List :
        list with transformed columns
    """
    values = []
    for i in range(1, 25):
        key = f"0{i}" if i < 10 else str(i)
        if row[f"V{key}"] == "V":
            values.append(row[f"H{key}"])
    
    return values


def read_measurements(file_path: str, stations: dict) -> List[dict]:
    """
    Read files in a path with air quality measurements.

    Parameters
    ----------
    file_path : str
        path with multiple csv files with data
    stations : dict
        dictionary with stations data
    """
    measurements_df = pd.read_csv(file_path, sep=";")
    measurements_df = measurements_df[measurements_df["MAGNITUD"].isin(VALID_MAGNITUDES)]
    # Transform colums into array of values
    measurements_df["values"] = measurements_df.apply(_convert_columns_into_list, axis=1)
    measurements_df["station"] = measurements_df.apply(lambda row: stations[row["PUNTO_MUESTREO"].split("_")[0]], axis=1)
    # Keep columns that we need
    measurements_df = measurements_df[["ANO", "MES", "DIA", "MAGNITUD", "values", "station"]]
    measurements_df.rename(columns={"ANO": "year", "MES": "month", "DIA": "day", "MAGNITUD": "magnitude"}, inplace=True)

    return measurements_df.to_dict("records") # it generates a list, not a dict


## Paso 2: Insertamos los registros en la base de datos
Ya tenemos las funciones que van a limpiar y formatear los datos según nos interesan para las operaciones que realizaremos a continuación.

El siguiente paso es conectarnos con la base de datos, recrearla y rellenarla con los datos que vamos a parsear.

In [5]:
!pip install pymongo



In [6]:
import os
from pymongo import MongoClient

# client = MongoClient("mongodb://nosql:nosql@mongo:27017/")
client = MongoClient("mongodb://nosql:nosql@localhost:27017/")

# Reset database
client.drop_database("datahack_task2")

db = client["datahack_task2"]
measurements = db["measurements"]

stations_dict = read_stations("./data/informacion_estaciones_red_calidad_aire.csv")
data_path = "./data/metrics"

# Insert documents by month
for file in os.listdir(data_path):
    values = read_measurements(os.path.join(data_path, file), stations_dict)
    measurements.insert_many(values)

measurement = db.measurements.count_documents({})
print(measurement)

22309


Ahora vamos a crear funciones que hagan el código de la consulta más legible, pero que nos ayuden a dar formato a la salida de la misma, por ejemplo, transformando los códigos de magnitudes por su nombre o el valor por su interpretación en la tabla mencionada anteriormente.

In [39]:
def transform_code_to_magnitude(magnitude):
    return {
        "$switch": {
            "branches": [
                { "case": { "$eq": [magnitude, 1] }, "then": "sulfur dioxide" },
                { "case": { "$eq": [magnitude, 8] }, "then": "nitrogen dioxide" },
                { "case": { "$eq": [magnitude, 9] }, "then": "PM2.5" },
                { "case": { "$eq": [magnitude, 10] }, "then": "PM10" },
                { "case": { "$eq": [magnitude, 14] }, "then": "Ozone" }
            ]
        }
    }

def transform_number_to_quality(key, value):
    return { 
        "$switch": {
            "branches": [
                # Dioxido de azufre (cod 1)
                { "case": { "$and": [{"$eq": [key, 1]}, { "$gte": [value, 0] }, { "$lte": [value, 200] }] }, "then": { "quality": "good", "order": 3 } },
                { "case": { "$and": [{"$eq": [key, 1]}, { "$gt": [value, 200] }, { "$lte": [value, 350] }] }, "then": { "quality": "moderate", "order": 2 } },
                { "case": { "$and": [{"$eq": [key, 1]}, { "$gt": [value, 350] }] }, "then": { "quality": "poor", "order": 1 } },
                # Dioxido de nitrogeno (cod 8)
                { "case": { "$and": [{"$eq": [key, 8]}, { "$gte": [value, 0] }, { "$lte": [value, 100] }] }, "then": { "quality": "good", "order": 3 } },
                { "case": { "$and": [{"$eq": [key, 8]}, { "$gt": [value, 100] }, { "$lte": [value, 200] }] }, "then": { "quality": "moderate", "order": 2 } },
                { "case": { "$and": [{"$eq": [key, 8]}, { "$gt": [value, 200] }] }, "then": { "quality": "poor", "order": 1 } },
                # PM2.5 (cod 9)
                { "case": { "$and": [{"$eq": [key, 9]}, { "$gte": [value, 0] }, { "$lte": [value, 20] }] }, "then": { "quality": "good", "order": 3 } },
                { "case": { "$and": [{"$eq": [key, 9]}, { "$gt": [value, 20] }, { "$lte": [value, 25] }] }, "then": { "quality": "moderate", "order": 2 } },
                { "case": { "$and": [{"$eq": [key, 9]}, { "$gt": [value, 25] }] }, "then": { "quality": "poor", "order": 1 } },
                # PM10 (cod 10)
                { "case": { "$and": [{"$eq": [key, 10]}, { "$gte": [value, 0] }, { "$lte": [value, 40] }] }, "then": { "quality": "good", "order": 3 } },
                { "case": { "$and": [{"$eq": [key, 10]}, { "$gt": [value, 40] }, { "$lte": [value, 50] }] }, "then": { "quality": "moderate", "order": 2 } },
                { "case": { "$and": [{"$eq": [key, 10]}, { "$gt": [value, 50] }] }, "then": { "quality": "poor", "order": 1 } },
                # Ozono (cod 14)
                { "case": { "$and": [{"$eq": [key, 14]}, { "$gte": [value, 0] }, { "$lte": [value, 100] }] }, "then": { "quality": "good", "order": 3 } },
                { "case": { "$and": [{"$eq": [key, 14]}, { "$gt": [value, 100] }, { "$lte": [value, 130] }] }, "then": { "quality": "moderate", "order": 2 } },
                { "case": { "$and": [{"$eq": [key, 14]}, { "$gt": [value, 130] }] }, "then": { "quality": "poor", "order": 1 } },
            ],
            "default": { "quality": "unknown", "order": 4 }
        }
    }

## Calidad del aire en un día específico

In [42]:
day = 26
month = 7
year = 2024

query_pipeline  = [
    { "$match": {"day": day, "month": month, "year": year} },
    { "$unwind": "$values" },
    { "$group": {
        "_id": {"magnitude": "$magnitude", "station": "$station.location_name"},
        "avg_value": {"$avg": "$values"}
    }},
    { "$group": {
        "_id": "$_id.station",
        "magnitudes": {
            "$push": { 
                "k": "$_id.magnitude",
                "v": "$avg_value"
            }
        }
    }},
    { "$addFields": {
        "magnitudes": {
            "$map": {
                "input": "$magnitudes",
                "as": "current",
                "in": {
                    "k": transform_code_to_magnitude("$$current.k"),
                    "v": transform_number_to_quality("$$current.k", "$$current.v")
                }
            }
        }
    }},
    { "$addFields": {
        "min_priority": { "$min": {
            "$map": {
                "input": "$magnitudes",
                "as": "current",
                "in": "$$current.v.order"
            }}
        }
    }},
    { "$sort": { "min_priority": 1 }},
    { "$project": {
        "_id": 1,
        "magnitudes": { "$arrayToObject": "$magnitudes" }
    }},
]

results = db.measurements.aggregate(query_pipeline)

for result in results:
    print(result)

{'_id': 'Barajas Pueblo', 'magnitudes': {'Ozone': {'quality': 'moderate', 'order': 2}, 'nitrogen dioxide': {'quality': 'good', 'order': 3}}}
{'_id': 'Villaverde', 'magnitudes': {'nitrogen dioxide': {'quality': 'good', 'order': 3}, 'Ozone': {'quality': 'moderate', 'order': 2}}}
{'_id': 'Barrio del Pilar', 'magnitudes': {'nitrogen dioxide': {'quality': 'good', 'order': 3}, 'Ozone': {'quality': 'moderate', 'order': 2}}}
{'_id': 'Plaza del Carmen', 'magnitudes': {'nitrogen dioxide': {'quality': 'good', 'order': 3}, 'sulfur dioxide': {'quality': 'good', 'order': 3}, 'Ozone': {'quality': 'moderate', 'order': 2}}}
{'_id': 'Juan Carlos I', 'magnitudes': {'nitrogen dioxide': {'quality': 'good', 'order': 3}, 'Ozone': {'quality': 'moderate', 'order': 2}}}
{'_id': 'Parque del Retiro', 'magnitudes': {'Ozone': {'quality': 'moderate', 'order': 2}, 'nitrogen dioxide': {'quality': 'good', 'order': 3}}}
{'_id': 'Arturo Soria', 'magnitudes': {'nitrogen dioxide': {'quality': 'good', 'order': 3}, 'Ozone': 

Para mejorar la consulta, debemos crear los índices, se accede por día/mes/año, así que debemos crear los índices ahí

In [49]:
import pymongo

explain_output = db.command('aggregate', 'measurements', pipeline=query_pipeline, explain=True)

print(explain_output)

# Creamos índice para la consulta
try:
    db.measurements.drop_index("DateIdx")
except Exception as _: # does not exist
    pass

db.measurements.create_index( 
    [("day", pymongo.ASCENDING), ("month", pymongo.ASCENDING), ("year", pymongo.ASCENDING)], name = "DateIdx")


explain_output = db.command('aggregate', 'measurements', pipeline=query_pipeline, explain=True)

print(explain_output)

{'explainVersion': '1', 'stages': [{'$cursor': {'queryPlanner': {'namespace': 'datahack_task2.measurements', 'indexFilterSet': False, 'parsedQuery': {'$and': [{'day': {'$eq': 26}}, {'month': {'$eq': 7}}, {'year': {'$eq': 2024}}]}, 'queryHash': 'BCCE341F', 'planCacheKey': '7B84F6E3', 'maxIndexedOrSolutionsReached': False, 'maxIndexedAndSolutionsReached': False, 'maxScansToExplodeReached': False, 'winningPlan': {'stage': 'PROJECTION_DEFAULT', 'transformBy': {'magnitude': 1, 'station.location_name': 1, 'values': 1, '_id': 0}, 'inputStage': {'stage': 'COLLSCAN', 'filter': {'$and': [{'day': {'$eq': 26}}, {'month': {'$eq': 7}}, {'year': {'$eq': 2024}}]}, 'direction': 'forward'}}, 'rejectedPlans': []}}}, {'$unwind': {'path': '$values'}}, {'$group': {'_id': {'magnitude': '$magnitude', 'station': '$station.location_name'}, 'avg_value': {'$avg': '$values'}}}, {'$group': {'_id': '$_id.station', 'magnitudes': {'$push': {'k': '$_id.magnitude', 'v': '$avg_value'}}}}, {'$addFields': {'magnitudes': {'