## Tarea 3 - Procesamiento de Datos con Apache Spark
### Elaborado por: Jenny Bautista Vargas
### Grupo: 85 - UNAD - Big Data 

Este proyecto implementa una arquitectura Big Data usando Apache Spark y Apache Kafka, orientada al análisis en tiempo real de la Tasa Representativa del Mercado (TRM) publicada por la Superintendencia Financiera de Colombia.

El flujo completo abarca desde la descarga del dataset original en formato CSV, su procesamiento en modo batch, hasta la simulación de llegada de datos en tiempo real mediante un productor Kafka y el consumo con Spark Structured Streaming.

## Dataset Seleccionado: TRM - Tasa Representativa del Mercado: 

La Tasa Representativa del Mercado (TRM) refleja el valor del dólar estadounidense (USD) en pesos colombianos (COP), determinado diariamente por la Superintendencia Financiera: https://www.datos.gov.co/Econom-a-y-Finanzas/Tasa-de-Cambio-Representativa-del-Mercado-TRM/32sa-8pi3/about_data 

Campo	Descripción
VALOR	Valor de la tasa representativa en pesos colombianos (COP/USD).
UNIDAD	Unidad monetaria (generalmente “COP/USD”).
VIGENCIADESDE	Fecha de inicio de vigencia de la tasa.
VIGENCIAHASTA	Fecha final de vigencia (generalmente igual al día siguiente).

## Procesamiento Batch en Apache Spark: 
Cargar el conjunto de datos, limpiarlo, transformarlo y realizar un análisis exploratorio (EDA) usando RDDs y DataFrames. El propósito del script tarea3.py es procesar en modo batch el conjunto de datos de la TRM, aplicando pasos de limpieza, transformación, análisis exploratorio (EDA) y finalmente almacenamiento de resultados en formato optimizado (Parquet) en HDFS.

In [None]:
#Descargar los datos
wget https://www.datos.gov.co/api/views/mcec-87by/rows.csv -O rows.csv

In [None]:
#Iniciar park
pyspark

In [None]:
#Cargar y procesar el dataset
# tarea3.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

# Crear la sesión de Spark
spark = SparkSession.builder \
    .appName("Tarea3_ProcesamientoBatch") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Ruta donde subí el archivo a HDFS
file_path = "hdfs://localhost:9000/Tarea3/rows.csv"

# Leer archivo CSV desde HDFS como DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

print("Dataset cargado correctamente desde HDFS")
df.printSchema()
df.show(10, truncate=False)

# Eliminar filas con valores nulos en columnas clave
df_clean = df.dropna(subset=["VALOR"])

# Convertir a tipo double la columna VALOR si es string
df_clean = df_clean.withColumn("VALOR", F.col("VALOR").cast("double"))

# Eliminar posibles duplicados
df_clean = df_clean.dropDuplicates()

# Crear una columna de año si existen columnas de fecha
if "VIGENCIADESDE" in df_clean.columns:
    df_clean = df_clean.withColumn("AÑO_INICIO", F.year(F.to_date("VIGENCIADESD>

print("Datos limpiados y transformados:")
df_clean.show(10, truncate=False)

print("Estadísticas generales:")
df_clean.describe().show()

# Valor promedio y máximo
df_clean.agg(
    F.avg("VALOR").alias("Promedio_VALOR"),
    F.max("VALOR").alias("Máximo_VALOR"),
    F.min("VALOR").alias("Mínimo_VALOR")
).show()

# Top 10 de valores más altos
print("Top 10 valores más altos:")
df_clean.orderBy(F.desc("VALOR")).show(10, truncate=False)

# Si tiene año, promedio por año
if "AÑO_INICIO" in df_clean.columns:
    print("Promedio por año:")
    df_clean.groupBy("AÑO_INICIO") \
        .agg(F.avg("VALOR").alias("Promedio_VALOR")) \

        .orderBy("AÑO_INICIO") \
        .show()

output_path = "hdfs://localhost:9000/Tarea3/processed"
df_clean.write.mode("overwrite").parquet(output_path)
print(f"Resultados procesados guardados en {output_path}")

print("Ejecución completa. Spark permanecerá activo 2 minutos para revisión en >
time.sleep(120)
spark.stop()



## Explicación del desarrollo en Spark
Primero se inicio la sesión de Spark que permite ejecutar operaciones distribuidas sobre grandes volúmenes de datos.
El nombre de la aplicación (“Tarea3_ProcesamientoBatch”) sirve para identificar el trabajo dentro del entorno de Spark. Además, se ajusta el nivel de log a “WARN” para mostrar solo mensajes importantes.

Despues se realizo la carga del dataset desde HDFS , con el archivo rows.csv directamente desde el sistema distribuido HDFS.

Posteriormente, se empezó a realizar la limpieza de datos, donde se realizó:
- Eliminación de valores nulos: Se borran filas donde el campo “VALOR” (la TRM) no tenga dato.
- Conversión de tipo: Se asegura que “VALOR” sea numérico (tipo double), lo cual es necesario para análisis estadístico.
- Eliminación de duplicados: Se eliminan filas repetidas para evitar distorsión en los cálculos.

En seguida se realizó la creación de una nueva columna derivada, se extrajo el año de la columna VIGENCIADESDE (fecha de inicio de vigencia de la TRM) para permitir análisis temporales por año.

Luego se logró realizar un análisis exploratorio (EDA): eL codigo no permitio ver la vista general y esquema por medio del tipo de cada columna y las primeras filas del dataset para entender su estructura,
se oncluyeron estadísticas descriptivas, visualización de indicadores agregados: Promedio histórico, valor máximo registrado, valor mínimo registrado; 
y el promedio de año y el top 10 de valores más altos.

Finalmente, el DataFrame limpio y transformado se guarda en HDFS en formato Parquet, que es más eficiente que CSV (mejor compresión y lectura distribuida).

## Procesamiento en Tiempo Real (Streaming) 
Simular la llegada de datos en tiempo real mediante Kafka, procesarlos con Spark Structured Streaming, y calcular métricas agregadas (promedio, máximo, mínimo, conteo) por ventana de tiempo.

In [None]:
#Kafka Producer (Python)
#Archivo:
kafka_producer.py

import csv
import json
import time
from kafka import KafkaProducer

# === CONFIGURACIÓN ===
CSV_PATH = "/home/vboxuser/rows.csv"   # Ruta del dataset TRM
KAFKA_BOOTSTRAP = "192.168.80.223:9092"
TOPIC = "trm_data"
MAX_MESSAGES = 1000
SLEEP_SEC = 0.5

def create_producer():
    return KafkaProducer(
        bootstrap_servers=[KAFKA_BOOTSTRAP],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        linger_ms=10
    )

def row_to_message(row, header):
    d = {h: row[i] for i, h in enumerate(header)}
    if 'VALOR' in d:
        try:
            d['VALOR'] = float(d['VALOR'])
        except:
            pass
    return d

def main():
    producer = create_producer()
    sent = 0
    with open(CSV_PATH, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        header = next(reader)
        for row in reader:
            msg = row_to_message(row, header)
            producer.send(TOPIC, value=msg)
            sent += 1
            if sent % 50 == 0:
                producer.flush()
            print(f"Sent {sent}: {msg}")
            if MAX_MESSAGES and sent >= MAX_MESSAGES:
                break
            time.sleep(SLEEP_SEC)
    producer.flush()
    producer.close()
    print("✅ Producer finished. Sent total:", sent)

if __name__ == "__main__":
    main()


In [None]:
#Spark Streaming Consumer
#Archivo:
spark_streaming_consumer.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, to_timestamp, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

KAFKA_BOOTSTRAP = "192.168.80.223:9092"
TOPIC = "trm_data"
CHECKPOINT_LOC = "/tmp/spark_checkpoint_kafka"

schema = StructType([
    StructField("VALOR", DoubleType(), True),
    StructField("UNIDAD", StringType(), True),
    StructField("VIGENCIADESDE", StringType(), True),
    StructField("VIGENCIAHASTA", StringType(), True)
])

spark = SparkSession.builder \
    .appName("SparkKafkaConsumer_TRM") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("✅ Spark Streaming Consumer iniciado y conectado a Kafka")

raw = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

json_str = raw.selectExpr("CAST(value AS STRING) as json_str")

parsed = json_str.select(from_json(col("json_str"), schema).alias("data")).select("data.*")

parsed = parsed.filter(col("VALOR").isNotNull())

parsed = parsed.withColumn(
    "event_time",
    to_timestamp(col("VIGENCIADESDE"), "yyyy-MM-dd")
)

parsed = parsed.withColumn(
    "event_time",
    expr("coalesce(event_time, current_timestamp())")
)

agg = parsed.groupBy(
    window(col("event_time"), "1 minute"),
    col("UNIDAD")
).agg(
    expr("count(*) as cnt"),
    expr("avg(VALOR) as avg_valor"),
    expr("max(VALOR) as max_valor"),
    expr("min(VALOR) as min_valor")
).select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("UNIDAD"),
    col("cnt"),
    col("avg_valor"),
    col("max_valor"),
    col("min_valor")
)

query = agg.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 50) \
    .option("checkpointLocation", CHECKPOINT_LOC) \
    .start()

query.awaitTermination()

## Instrucciones de Ejecución

In [None]:
#Iniciar servicios de Kafka
cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

#Crear el topic
bin/kafka-topics.sh --create --topic trm_data --bootstrap-server 192.168.80.223:9092 --partitions 1 --replication-factor 1

#Ejecutar el productor
python3 kafka_producer.py

#Ejecutar el consumidor (Spark)
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 spark_streaming_consumer.py

## Resultados esperados

En consola, Spark mostrará la salida en formato tabular por ventana de tiempo (Batch1, 2, 3, 4....):

## Tecnologías utilizadas

Apache Spark 3.5.3

Apache Kafka 3.7.2

Python 3.10

pyspark, kafka-python

VirtualBox / Ubuntu 20.04

Dataset oficial TRM - Datos Abiertos de Colombia