In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=76addde4ddd8fdede0779848dc2ee3427184e801d580cb6d05fcc2c2512a57c4
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


Debido a la dificultad que nos encontramos al intentar realizar un ETL Job desde glue sobre los archivos JSON en el S3, decidimos procesarlos antes de subirlos al S3 para poder sobrellevar esta dificultad, esto lo hicimos con el siguiente código que permite transformar los archivos JSON a archivos PARQUET que nos permite trabajar con ellos de manera más sencilla como los necesitamos.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pathlib import Path

# Crear una sesión de Spark
spark = SparkSession.builder.appName("Procesar SIATA JSON").getOrCreate()

# Definir rutas de entrada y salida
input_path = "./siata/raw/"
output_path = "./siata/processed/"

# Lista de archivos a procesar
json_files = ["co", "no", "so2", "no2", "ozono", "pm1", "pm10", "pm25"]

for file_name in json_files:
    # Cargar el archivo JSON
    df = spark.read.json(f"{input_path}/Datos_SIATA_Aire_{file_name}.json")

    # Descomponer el array 'datos' en filas individuales
    df = df.withColumn("datos", explode(col("datos")))

    # Seleccionar las columnas relevantes, incluyendo los campos dentro de 'datos'
    df = df.select(
        col("latitud"),
        col("codigoSerial"),
        col("datos.variableConsulta"),
        col("datos.fecha"),
        col("datos.calidad"),
        col("datos.valor"),
        col("nombre"),
        col("nombreCorto"),
        col("longitud")
    )



    # Guardar el resultado en formato Parquet
    df.coalesce(1).write.mode("overwrite").parquet(f"{output_path}{file_name}_processed.parquet")


    # Si prefieres guardarlo en CSV, puedes usar la siguiente línea en lugar del Parquet:
    # df.write.mode("overwrite").csv(f"{output_path}{file_name}_processed.csv", header=True)

    print(f"Archivo {file_name} procesado y guardado en {output_path}")

# Detener la sesión de Spark
spark.stop()


Archivo co procesado y guardado en ./siata/processed/
Archivo no procesado y guardado en ./siata/processed/
Archivo so2 procesado y guardado en ./siata/processed/
Archivo no2 procesado y guardado en ./siata/processed/
Archivo ozono procesado y guardado en ./siata/processed/
Archivo pm1 procesado y guardado en ./siata/processed/
Archivo pm10 procesado y guardado en ./siata/processed/
Archivo pm25 procesado y guardado en ./siata/processed/


Este es el código que nos permite descargar los archivos de Google Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
%cd /content/siata

/content/siata


In [None]:
!zip -r siata.zip /

[1;30;43mStreaming output truncated to the last 5000 lines.[0m


zip error: Interrupted (aborting)
