<p style="text-align:center">
    <img src="https://moodle.datahack.es/pluginfile.php/1/core_admin/logo/0x150/1634561859/Logo%20turquesa2.png" />
</p>

# <strong>Práctica NoSQL</strong>

<strong>Programa</strong>: Máster Experto en Big Data Architecture & Engineering

<strong>Universidad</strong>: Datahack

<strong>Profesor</strong>: Rafael Garrote

<strong>Bloque</strong>: NoSQL

<strong>Alumno</strong>: Marco García González

<strong>Fecha</strong>: 13 de febrero del 2025

# Tabla de Contenidos

<div class="alert alert-block alert-info" style="margin-top: 20px">
  <font size = 3>
    <ol>
      <li><a href="#item31">Elección del Dataset y Caso de Uso</a></li> 
      <li><a href="#item32">Diagrama del Modelo de Datos</a></li>  
      <li><a href="#item33">Configuración Inicial</a></li> 
      <li><a href="#item34">Carga y Procesado de Datos</a></li>
      <li><a href="#item35">Creación de Índices</a></li>
      <li><a href="#item36">Consultas</a></li>
      <li><a href="#item37">Conclusiones</a></li>
      <li><a href="#item38">Anexo</a></li>
    </ol>
  </font>
</div>

<h3 id="item31">Elección del Dataset y Caso de Uso</h3>

<strong>Dataset seleccionado</strong>: Calidad del aire en la ciudad de Madrid (formato CSV)

<strong>Caso de uso</strong>: Analizar la calidad del aire en Madrid a lo largo del tiempo, identificando las estaciones con mayores niveles de contaminación y las tendencias de contaminantes específicos (por ejemplo, NO2, PM10, O3) en diferentes zonas de la ciudad. Específicamente, se podría querer responder con exactitud a algunas de las preguntas que se muestran a continuación:

* ¿Cuáles son las estaciones con la peor calidad del aire durante un año específico?
* ¿Qué magnitudes de contaminación se están midiendo y cómo afectan a diferentes zonas de la ciudad?
* ¿Cuál es la tendencia de la calidad del aire a lo largo del tiempo?

<strong>Justificación del caso de uso</strong>

MongoDB es una base de datos NoSQL orientada a documentos, lo que la hace ideal para manejar datos semi-estructurados como los del dataset de calidad del aire. Además, su framework de agregación permite realizar análisis complejos sobre grandes volúmenes de datos, como agrupar por estaciones, fechas o contaminantes. La capacidad de indexar campos específicos también optimiza las consultas, lo que es crucial dado el tamaño del dataset.

<h3 id="item32">Diagrama del Modelo de Datos</h3>

El modelo de datos en MongoDB es flexible, ya que no requiere un esquema fijo. Sin embargo, los documentos se estructuran de la siguiente manera:

Colección: <strong>estaciones</strong> donde cada documento representa una estación de medición.

```python
{
  "numero": "int",
  "nombre": "string",
  "direccion": "string",
  "longitud": "string",
  "latitud": "string",
  "altitud": "float",
  "tipo": "string",
  "contaminantes_medidos": ["string"],
  "sensores_meteorologicos": ["string"]
}
```

Colección: <strong>mediciones</strong> donde cada documento representa una medición horaria

```python
{
  "punto_muestreo": "string",
  "ano": "int",
  "mes": "int",
  "dia": "int",
  "hora": "int",
  "magnitud": "int",
  "valor": "float",
  "valido": "string"
}
```

<h3 id="item33">Configuración Inicial</h3>

Para realizar la conexión con MongoDB, se utiliza el paquete MongoDB. Este paquete permite crear bases de datos, colecciones y realizar operaciones CRUD (Create Read Update and Delete).

El primer paso para utilizar el driver de Python para conectarse y operar sobre MongoDB es importar la librería del driver.

In [2]:
# Instalación de la librería de Python para interactuar con MongoDB
!pip install pymongo
!pip install openpyxl
!pip install xlrd
!pip install pandas

import os
import pymongo
import pandas as pd
from pymongo import MongoClient



<strong>Conexión a MongoDB<strong>

In [31]:
# Conexión a MongoDB
client = MongoClient('mongodb://nosql:nosql@mongo:27017/')
db = client["calidad_aire_madrid"]

In [32]:
# Verificación de que la conexión funciona
try:
    client.server_info()
    print("Conexión a MongoDB exitosa.")
except Exception as e:
    print(f"Error al conectar a MongoDB: {e}")

Conexión a MongoDB exitosa.


<h3 id="item34">Carga y Procesado de Datos</h3>


<strong>Carga del Fichero de Estaciones</strong>

El fichero informacion_estaciones_red_calidad_aire.csv contiene información sobre las estaciones de medición. Se carga en un DataFrame de Pandas y luego se inserta en MongoDB.

In [33]:
ruta_estaciones = "./datos/informacion_estaciones_red_calidad_aire.csv"
try:
    df_estaciones = pd.read_csv(ruta_estaciones, encoding="utf-8-sig", sep=",")
    print("Fichero de estaciones cargado correctamente.")
    print("Columnas del DataFrame:", df_estaciones.columns.tolist())  # Comprobar columnas
    print("\nPrimeras 3 filas del DataFrame:")
    print(df_estaciones.head(3))
except Exception as e:
    print(f"Error al cargar el fichero de estaciones: {e}")

Fichero de estaciones cargado correctamente.
Columnas del DataFrame: ['CODIGO', 'CODIGO_CORTO', 'ESTACION', 'DIRECCION', 'LONGITUD_ETRS89', 'LATITUD_ETRS89', 'ALTITUD', 'COD_TIPO', 'NOM_TIPO', 'NO2', 'SO2', 'CO', 'PM10', 'PM2_5', 'O3', 'BTX', 'COD_VIA', 'VIA_CLASE', 'VIA_PAR', 'VIA_NOMBRE', 'Fecha alta', 'COORDENADA_X_ETRS89', 'COORDENADA_Y_ETRS89', 'LONGITUD', 'LATITUD']

Primeras 3 filas del DataFrame:
     CODIGO  CODIGO_CORTO          ESTACION  \
0  28079004             4   Plaza de España   
1  28079008             8  Escuelas Aguirre   
2  28079011            11     Ramón y Cajal   

                                          DIRECCION LONGITUD_ETRS89  \
0                                   Plaza de España    3°42'43.91"O   
1                   Entre C/ Alcalá y C/ O’ Donell     3°40'56.22"O   
2  Avda. Ramón y Cajal  esq. C/ Príncipe de Vergara    3°40'38.50"O   

  LATITUD_ETRS89  ALTITUD COD_TIPO        NOM_TIPO NO2  ...  BTX COD_VIA  \
0  40°25'25.98"N      637       UT  Urbana

In [35]:
df_estaciones.head(5)

Unnamed: 0,CODIGO,CODIGO_CORTO,ESTACION,DIRECCION,LONGITUD_ETRS89,LATITUD_ETRS89,ALTITUD,COD_TIPO,NOM_TIPO,NO2,...,BTX,COD_VIA,VIA_CLASE,VIA_PAR,VIA_NOMBRE,Fecha alta,COORDENADA_X_ETRS89,COORDENADA_Y_ETRS89,LONGITUD,LATITUD
0,28079004,4,Plaza de España,Plaza de España,"3°42'43.91""O","40°25'25.98""N",637,UT,Urbana tráfico,X,...,,273600,PLAZA,DE,ESPAÑA,01/12/1998,439579.3291,4475049.263,-3.712257,40.423882
1,28079008,8,Escuelas Aguirre,Entre C/ Alcalá y C/ O’ Donell,"3°40'56.22""O","40°25'17.63""N",672,UT,Urbana tráfico,X,...,X,18900,CALLE,DE,ALCALA,01/12/1998,442117.2366,4474770.696,-3.682316,40.421553
2,28079011,11,Ramón y Cajal,Avda. Ramón y Cajal esq. C/ Príncipe de Vergara,"3°40'38.50""O","40°27'5.29""N",709,UT,Urbana tráfico,X,...,X,610450,CALLE,DEL,PRINCIPE DE VERGARA,01/12/1998,442564.0457,4478088.595,-3.677349,40.451473
3,28079016,16,Arturo Soria,C/ Arturo Soria esq. C/ Vizconde de los Asilos,"3°38'21.17""O","40°26'24.20""N",695,UF,Urbana fondo,X,...,,798700,CALLE,DEL,VIZCONDE DE LOS ASILOS,01/12/1998,445786.1729,4476796.019,-3.639242,40.440046
4,28079017,17,Villaverde,C/ Juan Peñalver,"3°42'47.89""O","40°20'49.74""N",601,UF,Urbana fondo,X,...,,417200,CALLE,DE,JUAN PEÑALVER,01/12/1998,439420.7015,4466532.455,-3.713317,40.347147


Se transforma el DataFrame en un formato adecuado para MongoDB y se inserta en la colección estaciones.

In [36]:
# Creación de la colección de estaciones
estaciones = db["estaciones"]

# Conversión del DataFrame a una lista de diccionarios
datos_estaciones = df_estaciones.to_dict("records")

# Inserción de los datos en MongoDB
try:
    estaciones.insert_many(df_estaciones.to_dict("records"))
    print("\nDatos de estaciones insertados correctamente.")
    # Mostrar 2 documentos de ejemplo
    print("\nEjemplo de documentos en 'estaciones':")
    for doc in estaciones.find().limit(2):
        print("\n", doc)
except Exception as e:
    print(f"Error al insertar datos de estaciones: {e}")


Datos de estaciones insertados correctamente.

Ejemplo de documentos en 'estaciones':

 {'_id': ObjectId('67aedd84362dc848425cb919'), 'CODIGO': 28079004, 'CODIGO_CORTO': 4, 'ESTACION': 'Plaza de España', 'DIRECCION': 'Plaza de España', 'LONGITUD_ETRS89': '3°42\'43.91"O', 'LATITUD_ETRS89': '40°25\'25.98"N', 'ALTITUD': 637, 'COD_TIPO': 'UT', 'NOM_TIPO': 'Urbana tráfico', 'NO2': 'X', 'SO2': 'X', 'CO': 'X', 'PM10': nan, 'PM2_5': nan, 'O3': nan, 'BTX': nan, 'COD_VIA': 273600, 'VIA_CLASE': 'PLAZA', 'VIA_PAR': 'DE', 'VIA_NOMBRE': 'ESPAÑA', 'Fecha alta': '01/12/1998', 'COORDENADA_X_ETRS89': 439579.3291, 'COORDENADA_Y_ETRS89': 4475049.263, 'LONGITUD': -3.7122567, 'LATITUD': 40.4238823}

 {'_id': ObjectId('67aedd84362dc848425cb91a'), 'CODIGO': 28079008, 'CODIGO_CORTO': 8, 'ESTACION': 'Escuelas Aguirre', 'DIRECCION': 'Entre C/ Alcalá y C/ O’ Donell ', 'LONGITUD_ETRS89': '3°40\'56.22"O', 'LATITUD_ETRS89': '40°25\'17.63"N', 'ALTITUD': 672, 'COD_TIPO': 'UT', 'NOM_TIPO': 'Urbana tráfico', 'NO2': 'X'

<strong>Carga de los Ficheros de Mediciones</strong>

Se cargan los 12 ficheros de mediciones correspondientes al año 2024 (por ejemplo, ene_mo24.csv, feb_mo24.csv, etc.).

In [37]:
# Lista de ficheros de mediciones
ficheros_mediciones = [
    "ene_mo24.csv", "feb_mo24.csv", "mar_mo24.csv", "abr_mo24.csv",
    "may_mo24.csv", "jun_mo24.csv", "jul_mo24.csv", "ago_mo24.csv",
    "sep_mo24.csv", "oct_mo24.csv", "nov_mo24.csv", "dic_mo24.csv"
]

# Carga y concatenación de los ficheros
df_mediciones = pd.DataFrame()
for fichero in ficheros_mediciones:
    ruta_fichero = os.path.join("./datos", fichero)
    try:
        df_temp = pd.read_csv(ruta_fichero, encoding="utf-8-sig", sep=",")
        df_mediciones = pd.concat([df_mediciones, df_temp], ignore_index=True)
        print(f"Fichero {fichero} cargado correctamente.")
    except Exception as e:
        print(f"Error al cargar el fichero {fichero}: {e}")

# Verificación de columnas y datos
print("\nColumnas del DataFrame de mediciones:", df_mediciones.columns.tolist())
print("\nPrimeras 2 filas del DataFrame de mediciones:")
print(df_mediciones.head(2))

Fichero ene_mo24.csv cargado correctamente.
Fichero feb_mo24.csv cargado correctamente.
Fichero mar_mo24.csv cargado correctamente.
Fichero abr_mo24.csv cargado correctamente.
Fichero may_mo24.csv cargado correctamente.
Fichero jun_mo24.csv cargado correctamente.
Fichero jul_mo24.csv cargado correctamente.
Fichero ago_mo24.csv cargado correctamente.
Fichero sep_mo24.csv cargado correctamente.
Fichero oct_mo24.csv cargado correctamente.
Fichero nov_mo24.csv cargado correctamente.
Fichero dic_mo24.csv cargado correctamente.

Columnas del DataFrame de mediciones: ['PROVINCIA', 'MUNICIPIO', 'ESTACION', 'MAGNITUD', 'PUNTO_MUESTREO', 'ANO', 'MES', 'DIA', 'H01', 'V01', 'H02', 'V02', 'H03', 'V03', 'H04', 'V04', 'H05', 'V05', 'H06', 'V06', 'H07', 'V07', 'H08', 'V08', 'H09', 'V09', 'H10', 'V10', 'H11', 'V11', 'H12', 'V12', 'H13', 'V13', 'H14', 'V14', 'H15', 'V15', 'H16', 'V16', 'H17', 'V17', 'H18', 'V18', 'H19', 'V19', 'H20', 'V20', 'H21', 'V21', 'H22', 'V22', 'H23', 'V23', 'H24', 'V24']

Primer

Los ficheros de mediciones tienen una estructura horaria (H01, V01, H02, V02, etc.). Se transforman estos datos en un formato más adecuado para MongoDB.

In [38]:
# Creación de una lista para almacenar los documentos de mediciones
documentos_mediciones = []

# Iteración sobre cada fila del DataFrame
for _, row in df_mediciones.iterrows():
    for hora in range(1, 25):
        documento = {
            "punto_muestreo": row["PUNTO_MUESTREO"],
            "ano": row["ANO"],
            "mes": row["MES"],
            "dia": row["DIA"],
            "hora": hora,
            "magnitud": row["MAGNITUD"],
            "valor": row[f"H{hora:02}"],
            "valido": row[f"V{hora:02}"] == "V"
        }
        documentos_mediciones.append(documento)

# Verificación el número de documentos generados
print(f"Número de documentos generados: {len(documentos_mediciones)}")

Número de documentos generados: 1121688


Se insertan los datos de mediciones en MongoDB:

In [39]:
# Creación de la colección de mediciones
mediciones = db["mediciones"]

# Inserción de los documentos en MongoDB
try:
    mediciones.insert_many(documentos_mediciones)
    print("\nDatos de mediciones insertados correctamente.")
    # Mostrar 2 documentos de ejemplo
    print("\nEjemplo de documentos en 'mediciones':")
    for doc in mediciones.find().limit(2):
        print("\n", doc)
except Exception as e:
    print(f"Error al insertar datos de mediciones: {e}")


Datos de mediciones insertados correctamente.

Ejemplo de documentos en 'mediciones':

 {'_id': ObjectId('67aeddc3362dc848425cb931'), 'punto_muestreo': '28079004_6_48', 'ano': 2024, 'mes': 1, 'dia': 1, 'hora': 1, 'magnitud': 6, 'valor': 0.4, 'valido': True}

 {'_id': ObjectId('67aeddc3362dc848425cb932'), 'punto_muestreo': '28079004_6_48', 'ano': 2024, 'mes': 1, 'dia': 1, 'hora': 2, 'magnitud': 6, 'valor': 0.4, 'valido': True}


<h3 id="item35">Creación de Índices</h3>


Para optimizar las consultas, se crean índices en los campos más utilizados. Para este caso de uso se ha decidido crear los índices "punto_muestreo" y "magnitud", ya que son campos importantes para después realizar las consultas.

In [40]:
# Creación de índices
mediciones.create_index([("punto_muestreo", pymongo.ASCENDING)])
mediciones.create_index([("magnitud", pymongo.ASCENDING)])

print("\nÍndices creados correctamente.")


Índices creados correctamente.


<h3 id="item36">Consultas</h3>


Para realizar las consultas correspondientes se utiliza el framework de agregación. Se trata de una de las ventajas que incorpora MongoDB para la realización de consultas complejas.

<strong>Obtención de las 5 estaciones con mayores niveles de NO2 en 2024</strong>

In [41]:
result = mediciones.aggregate([
    {"$match": {"magnitud": 8, "ano": 2024, "valido": True}},  # Filtrar por NO2 en 2024
    {"$group": {"_id": "$punto_muestreo", "promedio_no2": {"$avg": "$valor"}}},  # Agrupación por estación
    {"$sort": {"promedio_no2": -1}},  # Ordenación de mayor a menor
    {"$limit": 5},  # Límite a las 5 estaciones
    # Unión con la colección 'estaciones' para obtener el nombre
    {"$lookup": {
        "from": "estaciones",
        "localField": "_id",
        "foreignField": "CODIGO",  # Uso del campo correcto para hacer el join
        "as": "estacion_info"
    }},
    {"$unwind": "$estacion_info"},  # Deshacer el array resultante del lookup
    {"$project": {
        "nombre_estacion": "$estacion_info.ESTACION",
        "promedio_no2": 1,
        "_id": 0
    }}
])

print("\nTop 5 estaciones con mayor NO2 en 2024:")
for doc in result:
    print(doc)


Top 5 estaciones con mayor NO2 en 2024:


<strong>Tendencia mensual de PM10</strong>

In [42]:
# Obtener un punto_muestreo válido para PM10 (magnitud 9)
punto_valido = mediciones.find_one({"magnitud": 9}, {"punto_muestreo": 1})["punto_muestreo"]

result = mediciones.aggregate([
    {"$match": {"punto_muestreo": punto_valido, "magnitud": 9, "ano": 2024, "valido": True}},  # Filtrar por PM10 en 2024
    {"$group": {"_id": "$mes", "promedio_pm10": {"$avg": "$valor"}}},  # Agrupación por mes
    {"$sort": {"_id": 1}}  # Ordenar por mes
])

print(f"\nTendencias mensuales de PM10 en {punto_valido} (2024):")
for doc in result:
    print(doc)


Tendencias mensuales de PM10 en 28079008_9_47 (2024):
{'_id': 1, 'promedio_pm10': 9.663487738419619}
{'_id': 2, 'promedio_pm10': 8.681034482758621}
{'_id': 3, 'promedio_pm10': 8.895500725689406}
{'_id': 4, 'promedio_pm10': 8.172920065252855}
{'_id': 5, 'promedio_pm10': 6.471567267683772}
{'_id': 6, 'promedio_pm10': 10.103786816269285}
{'_id': 7, 'promedio_pm10': 9.666666666666666}
{'_id': 8, 'promedio_pm10': 12.050135501355014}
{'_id': 9, 'promedio_pm10': 11.73813169984686}
{'_id': 10, 'promedio_pm10': 13.21360544217687}
{'_id': 11, 'promedio_pm10': 10.255289139633286}
{'_id': 12, 'promedio_pm10': 11.227150537634408}


<strong>Top 10 estaciones con más días de niveles peligrosos de O3 en 2024</strong>

In [43]:
result = mediciones.aggregate([
    {"$match": {"magnitud": 7, "ano": 2024, "valido": True}},  # Filtrar por O3 en 2024
    {"$group": {
        "_id": {
            "punto_muestreo": "$punto_muestreo",
            "ano": "$ano",
            "mes": "$mes",
            "dia": "$dia"
        },
        "max_o3": {"$max": "$valor"}  # Obtenención del máximo valor de O3 por día
    }},
    {"$match": {"max_o3": {"$gt": 120}}},  # Filtrar días con O3 > 120 µg/m³
    {"$group": {
        "_id": "$_id.punto_muestreo",
        "dias_peligrosos": {"$sum": 1}  # Contado de los días peligrosos por estación
    }},
    {"$sort": {"dias_peligrosos": -1}},  # Ordenar de mayor a menor
    {"$limit": 10}  # Límite a las 10 estaciones
])

print("\nTop 10 estaciones con más días de O3 peligroso en 2024:")
for doc in result:
    print(doc)


Top 10 estaciones con más días de O3 peligroso en 2024:
{'_id': '28079017_7_8', 'dias_peligrosos': 90}
{'_id': '28079056_7_8', 'dias_peligrosos': 63}
{'_id': '28079054_7_8', 'dias_peligrosos': 58}
{'_id': '28079039_7_8', 'dias_peligrosos': 53}
{'_id': '28079011_7_8', 'dias_peligrosos': 30}
{'_id': '28079038_7_8', 'dias_peligrosos': 29}
{'_id': '28079055_7_8', 'dias_peligrosos': 29}
{'_id': '28079057_7_8', 'dias_peligrosos': 28}
{'_id': '28079047_7_8', 'dias_peligrosos': 27}
{'_id': '28079027_7_8', 'dias_peligrosos': 26}


<h3 id="item37">Conclusiones</h3>


<strong>¿Qué te parece la base de datos seleccionada como data store?</strong>

MongoDB es una excelente opción para el caso de uso presentado. Permite una estructura de datos flexible, facilita la carga de grandes volúmenes de datos y ofrece un potente framework de agregación que resulta muy útil para realizar un análisis y consultas complejas.

<strong>¿Qué te ha parecido el ejercicio?</strong>

Ha resultado un ejercicio enriquecedor y práctico, aunque desafiante. El trabajo con MongoDB y el uso de consultas de agregación son habilidades fundamentales en el cada vez más amplio mundo de la ingeniería de datos.

<strong>¿Qué has aprendido?</strong>

He aprendido a utilizar MongoDB para almacenamiento y consulta de datos. He entendido como la utilización de índices y el propio framework de agregación de MongoDB facilita la realización del análisis sobre un vaolúmen considerable de datos.

<strong>¿Cómo mejorarías la prática?</strong>

Sería útil incluir un componente práctico sobre cómo visualizar los resultados de las consultas en gráficos interactivos y, así, obtener una visión más clara de las tendencias de la calidad del aire. Aunque es posible que habría que utilizar alguna herramienta que no estuviera relacionado con las bases de datos no relacionales, quedando fuera del temario del bloque. También resultaría interesante explorar el uso de MongoDB Atlas para manejar grandes volúmenes de datos en la nube.

<h3 id="item38">Anexo</h3>


Borrado de colecciones para asegurar una nueva ejecución desde el principio:

In [30]:
# Eliminar colecciones existentes (si las hay)
if "estaciones" in db.list_collection_names():
    db["estaciones"].drop()
    print("Colección 'estaciones' eliminada.")

if "mediciones" in db.list_collection_names():
    db["mediciones"].drop()
    print("Colección 'mediciones' eliminada.")