# Configuración Spark con Delta y AWS

> Es necesario agregar paquetes externos a la instancia de Spark, ya que son necesarios para conectar con MinIO o el servicio de AWS S3.

Ejemplo:
```python
builder = pyspark.sql.SparkSession.builder.appName("ETL_Fugas") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio_url:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "<access_key>") \
    .config("spark.hadoop.fs.s3a.secret.key", "<secret_key>") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
```

In [2]:
import pyspark
from delta import *

In [None]:
builder = pyspark.sql.SparkSession.builder.appName("ETL_Fugas") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "GVTZ4kb69J0Gop6YaWSf") \
    .config("spark.hadoop.fs.s3a.secret.key", "WDkCNivmHhMRrBPgOWRIHZSHb5iMNyzBJgSgFk4f") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Configuración de paquetes
my_packages = ["org.apache.hadoop:hadoop-aws:3.3.4",
            "org.apache.hadoop:hadoop-client-runtime:3.3.4",
            "org.apache.hadoop:hadoop-client-api:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.779",
]

# Creación de la sesión de spark
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

In [4]:
spark.sparkContext.setLogLevel("DEBUG")

try:
    # Leer el archivo
    df = spark.read.format("parquet").load("s3a://cero-fugas/reportes_agua_2024_01.parquet")
    df.show()
except Exception as e:
    print(f"Error al leer el archivo: {e}")
    import traceback
    traceback.print_exc()

+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|          folio|fecha_registro_incidente|     id_reporte|fecha_reporte|hora_reporte|clasificacion|tipo_de_falla|     medio_recepcion|alcaldia_catalogo|    colonia_catalogo|           latitud|          longitud|
+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|I-20220101-0001|              2022-01-02|R-20220101-0105|   2022-01-01|    18:33:08| Agua Potable|         Fuga|Ciudadano (Call C...|         Coyoacán|Cuadrante De San ...|19.341799572539976| -99.1682732538209|
|I-20220101-0003|              2022-01-02|R-20220101-0103|   2022-01-01|    18:12:39| Agua Potable|         Fuga|Ciudadano (Call C...|          Tláhuac|

In [6]:
df.printSchema()

root
 |-- folio: string (nullable = true)
 |-- fecha_registro_incidente: date (nullable = true)
 |-- id_reporte: string (nullable = true)
 |-- fecha_reporte: date (nullable = true)
 |-- hora_reporte: string (nullable = true)
 |-- clasificacion: string (nullable = true)
 |-- tipo_de_falla: string (nullable = true)
 |-- medio_recepcion: string (nullable = true)
 |-- alcaldia_catalogo: string (nullable = true)
 |-- colonia_catalogo: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)



In [None]:
# Convertimos a Delta y Particionamos los datos por fecha_reporte y alcaldia
df.write \
    .partitionBy("fecha_reporte", "alcaldia_catalogo") \
    .format("delta").save("s3a://cero-fugas/delta-tables")

# Carga de datos del historico

In [10]:
spark.sparkContext.setLogLevel("DEBUG")

try:
    # Leer el archivo
    df_hist = spark.read.format("parquet").load("s3a://cero-fugas/reportes_agua_hist.parquet")
    df_hist.show()
except Exception as e:
    print(f"Error al leer el archivo: {e}")
    import traceback
    traceback.print_exc()

+-------------+-------------+-----------------+----------+-----------+-------------+----------+-----------------------+----------------------+-----------------+
|        folio|tipo_de_falla|    quien_atiende|   latitud|   longitud|codigo_postal|     fecha|colonia_registro_sacmex|colonia_datos_abiertos|         alcaldia|
+-------------+-------------+-----------------+----------+-----------+-------------+----------+-----------------------+----------------------+-----------------+
|20210107-0129|         Fuga|         TlÃ¡huac|19.3101922|-99.0477861|         NULL|2021-01-07|      Ciudad de MÃ©xico|         LAS ARBOLEDAS|          TLAHUAC|
|20210101-0008|         Fuga|       Iztapalapa|19.3706151| -99.030413|         9239|2021-01-01|      Ciudad de MÃ©xico|  EJERCITO DE ORIEN...|       IZTAPALAPA|
|20210101-0020|         Fuga| Ãlvaro ObregÃ³n|19.3758572|-99.2338191|         NULL|2021-01-01|      Ciudad de MÃ©xico|  2DA  JALALPA TEPI...|   ALVARO OBREGON|
|20210101-0048|         Fuga| Ãlv

In [11]:
df_hist.printSchema()

root
 |-- folio: string (nullable = true)
 |-- tipo_de_falla: string (nullable = true)
 |-- quien_atiende: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)
 |-- codigo_postal: short (nullable = true)
 |-- fecha: date (nullable = true)
 |-- colonia_registro_sacmex: string (nullable = true)
 |-- colonia_datos_abiertos: string (nullable = true)
 |-- alcaldia: string (nullable = true)



# Unión Datos y Delta

In [19]:
df.createOrReplaceTempView("fugas_actual")

In [20]:
res = spark.sql("""
    SELECT folio, fecha_registro_incidente, id_reporte, fecha_reporte, hora_reporte, clasificacion, tipo_de_falla, medio_recepcion,
            UPPER(alcaldia_catalogo) AS alcaldia_catalogo, UPPER(colonia_catalogo) AS colonia_catalogo, latitud, longitud
    FROM fugas_actual
""")

In [21]:
res.show()

+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|          folio|fecha_registro_incidente|     id_reporte|fecha_reporte|hora_reporte|clasificacion|tipo_de_falla|     medio_recepcion|alcaldia_catalogo|    colonia_catalogo|           latitud|          longitud|
+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|I-20220101-0001|              2022-01-02|R-20220101-0105|   2022-01-01|    18:33:08| Agua Potable|         Fuga|Ciudadano (Call C...|         COYOACÁN|CUADRANTE DE SAN ...|19.341799572539976| -99.1682732538209|
|I-20220101-0003|              2022-01-02|R-20220101-0103|   2022-01-01|    18:12:39| Agua Potable|         Fuga|Ciudadano (Call C...|          TLÁHUAC|

In [22]:
res.createTempView("fugas_actual_convertido")

In [None]:
# Creamos la tabla temporal
df_hist.createTempView("fugas_historico")

In [28]:
results = spark.sql("""SELECT folio, NULL AS fecha_registro_incidente, 'NA' AS id_reporte, fecha AS fecha_reporte, 'NA' AS hora_reporte, 
        'Agua Potable' AS clasificacion, tipo_de_falla, 'Ciudadano (Call Center)' AS medio_recepcion, UPPER(alcaldia) AS alcaldia_catalogo,
        UPPER(colonia_datos_abiertos) AS colonia_catalogo, latitud, longitud
        FROM fugas_historico""")

In [29]:
results.show()

+-------------+------------------------+----------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+----------+-----------+
|        folio|fecha_registro_incidente|id_reporte|fecha_reporte|hora_reporte|clasificacion|tipo_de_falla|     medio_recepcion|alcaldia_catalogo|    colonia_catalogo|   latitud|   longitud|
+-------------+------------------------+----------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+----------+-----------+
|20210107-0129|                    NULL|        NA|   2021-01-07|          NA| Agua Potable|         Fuga|Ciudadano (Call C...|          TLAHUAC|       LAS ARBOLEDAS|19.3101922|-99.0477861|
|20210101-0008|                    NULL|        NA|   2021-01-01|          NA| Agua Potable|         Fuga|Ciudadano (Call C...|       IZTAPALAPA|EJERCITO DE ORIEN...|19.3706151| -99.030413|
|20210101-0020|                    NULL|        NA

In [30]:
results.createOrReplaceTempView("fugas_hist_convertido")

In [31]:
# Revisar el schema
spark.sql("DESCRIBE fugas_hist_convertido").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|               folio|   string|   NULL|
|fecha_registro_in...|     void|   NULL|
|          id_reporte|   string|   NULL|
|       fecha_reporte|     date|   NULL|
|        hora_reporte|   string|   NULL|
|       clasificacion|   string|   NULL|
|       tipo_de_falla|   string|   NULL|
|     medio_recepcion|   string|   NULL|
|   alcaldia_catalogo|   string|   NULL|
|    colonia_catalogo|   string|   NULL|
|             latitud|   double|   NULL|
|            longitud|   double|   NULL|
+--------------------+---------+-------+



In [32]:
spark.sql("DESCRIBE fugas_actual_convertido").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|               folio|   string|   NULL|
|fecha_registro_in...|     date|   NULL|
|          id_reporte|   string|   NULL|
|       fecha_reporte|     date|   NULL|
|        hora_reporte|   string|   NULL|
|       clasificacion|   string|   NULL|
|       tipo_de_falla|   string|   NULL|
|     medio_recepcion|   string|   NULL|
|   alcaldia_catalogo|   string|   NULL|
|    colonia_catalogo|   string|   NULL|
|             latitud|   double|   NULL|
|            longitud|   double|   NULL|
+--------------------+---------+-------+



In [33]:
# Uniendo los datos
result_2 = spark.sql("""
    SELECT folio, fecha_registro_incidente, id_reporte, fecha_reporte, hora_reporte, clasificacion, tipo_de_falla, medio_recepcion, 
        alcaldia_catalogo, colonia_catalogo, latitud, longitud
    FROM fugas_hist_convertido
    UNION
    SELECT folio, fecha_registro_incidente, id_reporte, fecha_reporte, hora_reporte, clasificacion, tipo_de_falla, medio_recepcion,
            alcaldia_catalogo, colonia_catalogo, latitud, longitud
    FROM fugas_actual_convertido
""")

In [34]:
result_2.show()

+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|          folio|fecha_registro_incidente|     id_reporte|fecha_reporte|hora_reporte|clasificacion|tipo_de_falla|     medio_recepcion|alcaldia_catalogo|    colonia_catalogo|           latitud|          longitud|
+---------------+------------------------+---------------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+------------------+------------------+
|I-20220101-0021|              2022-01-02|R-20220101-0025|   2022-01-01|    11:04:13| Agua Potable|         Fuga|Ciudadano (Call C...|GUSTAVO A. MADERO| CHALMA DE GUADALUPE|19.552268341892827|-99.15106415748596|
|I-20220103-0083|              2022-01-03|R-20220105-0320|   2022-01-05|    14:18:19| Agua Potable|         Fuga|Redes Sociales (T...|        IZTACALCO|

In [35]:
# Export a Delta y MinIO
result_2.write \
    .partitionBy("fecha_reporte", "alcaldia_catalogo") \
    .format("delta").save("s3a://cero-fugas/delta-tables/fugas")

# Conexion a PostgreSQL

In [3]:
builder = pyspark.sql.SparkSession.builder.appName("ETL_Fugas") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "GVTZ4kb69J0Gop6YaWSf") \
    .config("spark.hadoop.fs.s3a.secret.key", "WDkCNivmHhMRrBPgOWRIHZSHb5iMNyzBJgSgFk4f") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Configuración de paquetes
my_packages = ["org.apache.hadoop:hadoop-aws:3.3.4",
            "org.apache.hadoop:hadoop-client-runtime:3.3.4",
            "org.apache.hadoop:hadoop-client-api:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.779",
            "org.postgresql:postgresql:42.7.4"
]

# Creación de la sesión de spark
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

In [4]:
# Cargamos los datos del Data lake
fugas_df = spark.read.format("delta").load("s3a://cero-fugas/delta-tables/fugas")
fugas_df.show()

+-------------+------------------------+----------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+----------+-----------+
|        folio|fecha_registro_incidente|id_reporte|fecha_reporte|hora_reporte|clasificacion|tipo_de_falla|     medio_recepcion|alcaldia_catalogo|    colonia_catalogo|   latitud|   longitud|
+-------------+------------------------+----------+-------------+------------+-------------+-------------+--------------------+-----------------+--------------------+----------+-----------+
|20210104-0005|                    NULL|        NA|   2021-01-04|          NA| Agua Potable|         Fuga|Ciudadano (Call C...|GUSTAVO A. MADERO|CASAS ALEMAN (AMP...|19.4804576|-99.0819222|
|20210104-0281|                    NULL|        NA|   2021-01-04|          NA| Agua Potable|         Fuga|Ciudadano (Call C...|GUSTAVO A. MADERO|           MALACATES|19.5737057|-99.1298101|
|20210104-0101|                    NULL|        NA

In [7]:
# Datos de conexion
url = "jdbc:postgresql://localhost:5432/sacmex"
properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [8]:
fugas_df.write.jdbc(url, "reportes", mode="append", properties=properties)