## **Procesamiento y consumo incremental de datos con Spark Structured Streaming**

### **Consumo de datos hacia sistema local**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import year, month, dayofmonth, lpad
from pyspark.sql.functions import to_date
import requests
import os
import subprocess

spark = SparkSession. \
    builder. \
    config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1'). \
    config('spark.sql.warehouse.dir', '/user/local/spark/warehouse'). \
    config('spark.master', 'local[*]'). \
    enableHiveSupport(). \
    appName('Consumo y procesamiento de datos con Spark Structured Streaming'). \
    getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3539a72a-3c55-4a6c-abb1-8cfbfd6f5842;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.1 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 5861ms :: artifacts dl 190ms
	:: modules in use:
	com.

In [2]:
def upload_gharchive_files_to_hdfs(file_name):
    year = file_name[:4]
    month = file_name[5:7]
    day = file_name[8:10]
    
    file_url = f'https://raw.githubusercontent.com/perezlino/data_fake/main/{file_name}'
    
    try:
        response = requests.get(file_url)
        response.raise_for_status()
        
        target_local_folder = f'/spark_streaming/data/ghactivity/anio={year}/mes={month}/dia={day}'
        os.makedirs(target_local_folder, exist_ok=True)

        local_file_path = os.path.join(target_local_folder, file_name)
        with open(local_file_path, 'w', newline='', encoding='utf-8') as target_file:
            target_file.write(response.text)
        
        target_hdfs_folder = f'/proyecto/spark/streaming/landing/ghactivity/anio={year}/mes={month}/dia={day}'
        subprocess.check_call(f'hdfs dfs -mkdir -p {target_hdfs_folder}', shell=True)
        subprocess.check_call(f'hdfs dfs -put {local_file_path} {target_hdfs_folder}', shell=True)
    
    except requests.exceptions.RequestException as e:
        print(f"Error al descargar el archivo: {e}")
    except subprocess.CalledProcessError as e:
        print(f"Error al ejecutar el comando: {e}")
    except Exception as e:
        print(f"Error inesperado: {e}")

### **Procesamiento y carga de datos para el 2024-10-06 en HDFS**

In [4]:
file_date = '2024-10-07'

In [5]:
def processing_files(file_date):

    for hour in range(0, 6):
        print (f'Procesando archivo {file_date}-{hour}.csv')
        upload_gharchive_files_to_hdfs (f'{file_date}-{hour}.csv')

In [6]:
processing_files(file_date)

Procesando archivo 2024-10-07-0.csv
Procesando archivo 2024-10-07-1.csv
Procesando archivo 2024-10-07-2.csv
Procesando archivo 2024-10-07-3.csv
Procesando archivo 2024-10-07-4.csv
Procesando archivo 2024-10-07-5.csv


In [7]:
# Establecer el número de particiones de mezcla
spark.conf.set("spark.sql.shuffle.partitions", 16)

In [None]:
# Tener en cuenta que, por lo general, no inferimos el schema, ya que se desperdiciaría capacidad de cómputo al escanear los datos 
# solo para inferir el schema. En su lugar, aplicamos el schema manualmente.|
# No lo usaremos
spark.conf.set('spark.sql.streaming.schemaInference', 'true')

In [8]:
# Estamos configurando 'cleanSource' en 'delete' para eliminar los archivos que ya se procesaron en ejecuciones anteriores
# Con cleanSource='delete', los archivos procesados se eliminan en la siguiente ejecución del micro-batch
# Con cleanSource='delete' vamos a eliminar todos los archivos que se encuentren en /landing/ghactivity de la ejecución 
# inmediatamente anterior, es decir el único job realizado en el script 'carga_2024_10_06.ipynb'
# Realizamos la definición del schema, dado que en .readStream no se permite inferir el schema en un archivo CSV

schema = StructType([
    StructField("Nombre", StringType(), True),
    StructField("Apellido", StringType(), True),
    StructField("Edad", IntegerType(), True),
    StructField("Ciudad", StringType(), True),
    StructField("Trabajo", StringType(), True),
    StructField("Telefono", StringType(), True),
    StructField("Fecha", StringType(), True)
])

ghactivity_df = spark. \
    readStream. \
    format('csv'). \
    option('cleanSource', 'delete'). \
    option("header", "true"). \
    option("delimiter", ","). \
    schema(schema). \
    load ('/proyecto/spark/streaming/landing/ghactivity')

In [9]:
ghactivity_df.isStreaming

True

In [10]:
ghactivity_df.printSchema()

root
 |-- Nombre: string (nullable = true)
 |-- Apellido: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Ciudad: string (nullable = true)
 |-- Trabajo: string (nullable = true)
 |-- Telefono: string (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- dia: integer (nullable = true)



In [11]:
# Debemos utilizar las mismas ubicaciones que utilizamos para todas las cargas tanto para 'path' como para 'checkpoint'. 
ghactivity_df. \
  writeStream. \
  format('parquet'). \
  partitionBy('anio', 'mes', 'dia'). \
  option("checkpointLocation", "/proyecto/spark/streaming/bronze/checkpoint/ghactivity"). \
  option("path", "/proyecto/spark/streaming/bronze/data/ghactivity"). \
  trigger(once=True). \
  start()

<pyspark.sql.streaming.StreamingQuery at 0x7f7d66f07b70>

### **Verificación de datos consumidos y procesados**

In [12]:
# Validación de la ubicación de los datos para ver si los archivos de la ejecución del micro-batch anterior fueron eliminados. 
# No deberias ver los archivos relacionados con la ejecución anterior
!hdfs dfs -ls -R /proyecto/spark/streaming/landing/ghactivity

[Stage 0:>                                                          (0 + 3) / 3]

drwxr-xr-x   - root supergroup          0 2024-10-11 17:58 /proyecto/spark/streaming/landing/ghactivity/anio=2024
drwxr-xr-x   - root supergroup          0 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10
drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06
drwxr-xr-x   - root supergroup          0 2024-10-11 18:20 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07
-rw-r--r--   3 root supergroup      11942 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-0.csv
-rw-r--r--   3 root supergroup      11967 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-1.csv
-rw-r--r--   3 root supergroup      12013 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-2.csv
-rw-r--r--   3 root supergroup      11956 2024-10-11 18:19 /proyecto/spark


                                                                                

In [15]:
# No me devuelve archivos, lo que nos dice que fueron eliminados
!hdfs dfs -ls /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06

In [16]:
!hdfs dfs -ls /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07

Found 6 items
-rw-r--r--   3 root supergroup      11942 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-0.csv
-rw-r--r--   3 root supergroup      11967 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-1.csv
-rw-r--r--   3 root supergroup      12013 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-2.csv
-rw-r--r--   3 root supergroup      11956 2024-10-11 18:19 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-3.csv
-rw-r--r--   3 root supergroup      12052 2024-10-11 18:20 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-4.csv
-rw-r--r--   3 root supergroup      11972 2024-10-11 18:20 /proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-5.csv


In [17]:
!hdfs dfs -ls /proyecto/spark/streaming/bronze/checkpoint/ghactivity

Found 4 items
drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/commits
-rw-r--r--   3 root supergroup         45 2024-10-11 18:04 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/metadata
drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets
drwxr-xr-x   - root supergroup          0 2024-10-11 18:04 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources


In [18]:
!hdfs dfs -ls -R /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources

drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources/0
-rw-r--r--   3 root supergroup        932 2024-10-11 18:04 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources/0/0
-rw-r--r--   3 root supergroup        932 2024-10-11 18:21 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources/0/1


In [20]:
!hdfs dfs -cat /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources/0/0

v1
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-0.csv","timestamp":1728669535124,"batchId":0}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-1.csv","timestamp":1728669565158,"batchId":0}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-2.csv","timestamp":1728669583832,"batchId":0}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-3.csv","timestamp":1728669607100,"batchId":0}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-4.csv","timestamp":1728669620971,"batchId":0}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=06/2024-10-06-5.csv","timestamp":1728669635791,"batchId":0}

In [21]:
!hdfs dfs -cat /proyecto/spark/streaming/bronze/checkpoint/ghactivity/sources/0/1

v1
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-0.csv","timestamp":1728670749696,"batchId":1}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-1.csv","timestamp":1728670767999,"batchId":1}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-2.csv","timestamp":1728670780998,"batchId":1}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-3.csv","timestamp":1728670797594,"batchId":1}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-4.csv","timestamp":1728670810378,"batchId":1}
{"path":"hdfs://namenode:9000/proyecto/spark/streaming/landing/ghactivity/anio=2024/mes=10/dia=07/2024-10-07-5.csv","timestamp":1728670823733,"batchId":1}

In [22]:
!hdfs dfs -ls /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets

Found 2 items
-rw-r--r--   3 root supergroup        471 2024-10-11 18:04 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets/0
-rw-r--r--   3 root supergroup        471 2024-10-11 18:21 /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets/1


In [23]:
!hdfs dfs -cat /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets/0

v1
{"batchWatermarkMs":0,"batchTimestampMs":1728669871299,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"16"}}
{"logOffset":0}

In [24]:
!hdfs dfs -cat /proyecto/spark/streaming/bronze/checkpoint/ghactivity/offsets/1

v1
{"batchWatermarkMs":0,"batchTimestampMs":1728670861090,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"16"}}
{"logOffset":1}

In [25]:
!hdfs dfs -ls -R /proyecto/spark/streaming/bronze/data/ghactivity

drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/data/ghactivity/_spark_metadata
-rw-r--r--   3 root supergroup        866 2024-10-11 18:04 /proyecto/spark/streaming/bronze/data/ghactivity/_spark_metadata/0
-rw-r--r--   3 root supergroup        866 2024-10-11 18:21 /proyecto/spark/streaming/bronze/data/ghactivity/_spark_metadata/1
drwxr-xr-x   - root supergroup          0 2024-10-11 18:04 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024
drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10
drwxr-xr-x   - root supergroup          0 2024-10-11 18:04 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10/dia=6
-rw-r--r--   3 root supergroup       7331 2024-10-11 18:04 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10/dia=6/part-00000-5a7e831e-4342-44cc-880f-7b633c9cbd2f.c000.snappy.parquet
-rw-r--r--   3 root supergroup       7335 20

### **Validación de los datos consumidos**

In [26]:
!hdfs dfs -ls /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10

Found 2 items
drwxr-xr-x   - root supergroup          0 2024-10-11 18:04 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10/dia=6
drwxr-xr-x   - root supergroup          0 2024-10-11 18:21 /proyecto/spark/streaming/bronze/data/ghactivity/anio=2024/mes=10/dia=7


In [27]:
ghactivity = spark. \
    read. \
    parquet(f'/proyecto/spark/streaming/bronze/data/ghactivity')

In [28]:
ghactivity.count()

                                                                                

2400

In [29]:
ghactivity.printSchema()

root
 |-- Nombre: string (nullable = true)
 |-- Apellido: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Ciudad: string (nullable = true)
 |-- Trabajo: string (nullable = true)
 |-- Telefono: string (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- dia: integer (nullable = true)



In [30]:
# Ahora estoy tratando de obtener el recuento para 'Ingenieros' que trabajen en 'Madrid'. Ahora tenemos datos de dos dias.
ghactivity. \
    filter("Ciudad = 'Madrid' AND Trabajo = 'Ingeniero' "). \
    count()

                                                                                

87

In [31]:
# Ahora estoy agrupando por fecha y luego obtengo el recuento. Aquí obtenemos el recuento total de dos dias.
ghactivity. \
    groupBy('ciudad'). \
    count(). \
    show()

+---------+-----+
|   ciudad|count|
+---------+-----+
|Barcelona|  508|
|   Madrid|  457|
| Valencia|  485|
|  Sevilla|  460|
|   Bilbao|  490|
+---------+-----+



In [32]:
ghactivity. \
    groupby('anio', 'mes', 'dia'). \
    count(). \
    show()

+----+---+---+-----+
|anio|mes|dia|count|
+----+---+---+-----+
|2024| 10|  6| 1200|
|2024| 10|  7| 1200|
+----+---+---+-----+



In [33]:
# Esto realmente da un recuento diario de 'Ingenieros' que trabajen en 'Madrid'. Ahora tenemos datos de dos dias.
ghactivity. \
    filter("Ciudad = 'Madrid' AND Trabajo = 'Ingeniero' "). \
    groupby('anio', 'mes', 'dia'). \
    count(). \
    show()

+----+---+---+-----+
|anio|mes|dia|count|
+----+---+---+-----+
|2024| 10|  6|   42|
|2024| 10|  7|   45|
+----+---+---+-----+

