<a href="https://colab.research.google.com/github/prometricas/Abeba_Datos_Masivos/blob/main/personaYMetodosDePago.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. Installing Java and Spark**

## **Install the Java JDK**

In [None]:
!apt-get install -y openjdk-17-jdk-headless -qq > /dev/null

## **Install Spark**

In [None]:
# Download Spark
!wget https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz

--2026-01-13 12:32:43--  https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400914067 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.7-bin-hadoop3.tgz’


2026-01-13 12:32:45 (289 MB/s) - ‘spark-3.5.7-bin-hadoop3.tgz’ saved [400914067/400914067]



In [None]:
# Extract the archive
!tar xf spark-3.5.7-bin-hadoop3.tgz

# Show the extracted folder
!ls /content

sample_data  spark-3.5.7-bin-hadoop3  spark-3.5.7-bin-hadoop3.tgz


## **Set environment variables**


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"
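
Before initializing Spark it can help to confirm that both variables point at directories that actually exist, since a wrong `JAVA_HOME` typically only surfaces later as a launch error. A minimal sketch, assuming a hypothetical helper `missing_dirs` that is not part of the original notebook:

```python
import os

def missing_dirs(names, env=None):
    """Return the variable names that are unset or do not point at an existing directory."""
    # `env` defaults to the real environment; a plain dict can be passed for testing.
    env = os.environ if env is None else env
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# An empty list means both paths exist on this machine.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

If the list is not empty, the installed JDK folder under `/usr/lib/jvm` and the extracted Spark folder under `/content` are the first things to compare against the values set above.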

## **Install and initialize findspark**
This lets you use *PySpark* like a regular Python library.

In [None]:
!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# **2. Import the dataset**

In [None]:
# Upload the file
from google.colab import files
files.upload()

Saving casoDePrueba3.txt to casoDePrueba3.txt


{'casoDePrueba3.txt': b'Alice;Tarjeta de cr\xc3\xa9dito;1000\nAlice;Tarjeta de cr\xc3\xa9dito;1800\nAlice;Tarjeta de cr\xc3\xa9dito;2100\nBob;Bizum;2000\nAlice;Bizum;1000\nBob;Tarjeta de cr\xc3\xa9dito;1100\n'}
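
`files.upload()` returns a dict that maps each file name to its raw bytes, which is why the accented characters show up as UTF-8 escape sequences (`\xc3\xa9` is `é`). A small sketch of decoding those bytes into records for a quick inspection, using the bytes shown above and a hypothetical helper `parse_upload` that is not part of the notebook:

```python
def parse_upload(raw: bytes):
    """Decode UTF-8 bytes and split each non-empty line into (person, method, amount)."""
    rows = []
    for line in raw.decode("utf-8").splitlines():
        if line.strip():
            person, method, amount = line.split(";")
            rows.append((person.strip(), method.strip(), float(amount)))
    return rows

# Same content as the uploaded casoDePrueba3.txt shown in the cell output.
raw = (b'Alice;Tarjeta de cr\xc3\xa9dito;1000\nAlice;Tarjeta de cr\xc3\xa9dito;1800\n'
       b'Alice;Tarjeta de cr\xc3\xa9dito;2100\nBob;Bizum;2000\nAlice;Bizum;1000\n'
       b'Bob;Tarjeta de cr\xc3\xa9dito;1100\n')

rows = parse_upload(raw)
for row in rows:
    print(row)
```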

# **3. Create the script**

The solution below reads the dataset only once, computes two counters per person at the same time (credit card purchases above 1500, and credit card purchases at or below 1500), and aggregates them with a single `reduceByKey`. It then persists the aggregate so that both outputs can be written without recomputing it.

The program takes 2 parameters:
* Path to the input file.
* Base output folder: the two folders required by the assignment are created inside it.
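
To make the aggregation concrete before reading the Spark version, here is a plain-Python sketch of the same idea: each record becomes a pair of counters, and pairs for the same person are added element-wise. The names `to_pair` and `aggregate` are illustrative, and the payment methods are assumed to be already lowercased and unaccented.

```python
from collections import defaultdict

THRESHOLD = 1500.0

def to_pair(person, method, amount):
    """Map one record to (person, (count_above, count_at_or_below))."""
    if method == "tarjeta de credito":
        counters = (1, 0) if amount > THRESHOLD else (0, 1)
        return person, counters
    # Other payment methods keep the person in the result with zero counts.
    return person, (0, 0)

def aggregate(records):
    """Sum the counter pairs per person, as a single reduce-by-key would."""
    totals = defaultdict(lambda: (0, 0))
    for person, method, amount in records:
        key, (above, at_or_below) = to_pair(person, method, amount)
        prev = totals[key]
        totals[key] = (prev[0] + above, prev[1] + at_or_below)
    return dict(totals)

# The six records of casoDePrueba3.txt, with the method already normalized.
records = [
    ("Alice", "tarjeta de credito", 1000.0),
    ("Alice", "tarjeta de credito", 1800.0),
    ("Alice", "tarjeta de credito", 2100.0),
    ("Bob", "bizum", 2000.0),
    ("Alice", "bizum", 1000.0),
    ("Bob", "tarjeta de credito", 1100.0),
]
print(aggregate(records))  # {'Alice': (2, 1), 'Bob': (0, 1)}
```

Because non-credit-card records emit `(0, 0)` instead of being filtered out, a person who never paid by credit card above the threshold still appears in the output with a count of 0.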

In [None]:
%%writefile personaYMetodosDePago.py
import sys
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

def _strip_accents(s: str) -> str:
    # Lightly normalize accents so the payment method can be compared
    return (s.replace("á", "a").replace("é", "e").replace("í", "i")
             .replace("ó", "o").replace("ú", "u"))

def _to_pair(linea: str):
    # Parse and split: person;payment_method;amount_spent
    persona, metodo, gasto = linea.split(";")
    persona = persona.strip()

    metodo_norm = _strip_accents(metodo.strip().lower())
    gasto_num = float(gasto.strip())

    # Count credit card purchases only; other payment methods return zeros
    if metodo_norm == "tarjeta de credito":
        mayor_1500 = 1 if gasto_num > 1500.0 else 0
        menor_igual_1500 = 1 if gasto_num <= 1500.0 else 0
        return (persona, (mayor_1500, menor_igual_1500))

    return (persona, (0, 0))

def main():
    # Validate parameters: input path and base output folder
    if len(sys.argv) != 3:
        print("Usage: personaYMetodosDePago.py <input_path> <output_base_dir>", file=sys.stderr)
        sys.exit(1)

    ruta_entrada = sys.argv[1]
    salida_base = sys.argv[2].rstrip("/")

    out_a = f"{salida_base}/comprasConTDCMayorDe1500"
    out_b = f"{salida_base}/comprasConTDCMenoroIgualDe1500"

    sc = SparkContext.getOrCreate()
    sc.setLogLevel("WARN")

    # Load, map to (persona, (cnt_mayor, cnt_menorIgual)) and aggregate in a single reduce
    rdd = sc.textFile(ruta_entrada)
    agreg = (
        rdd
        .map(_to_pair)
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
        .persist(StorageLevel.MEMORY_ONLY)
    )

    # Write output (a): credit card purchases above 1500
    agreg.map(lambda kv: f"{kv[0]};{kv[1][0]}").saveAsTextFile(out_a)

    # Write output (b): credit card purchases at or below 1500
    agreg.map(lambda kv: f"{kv[0]};{kv[1][1]}").saveAsTextFile(out_b)

    sc.stop()

if __name__ == "__main__":
    main()

Writing personaYMetodosDePago.py
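
The `_strip_accents` helper in the script only replaces the five lowercase accented vowels, which is enough here because the method is lowercased first. As a possible alternative, not what the script uses, a more general sketch with the standard `unicodedata` module decomposes each character and drops the combining marks:

```python
import unicodedata

def strip_accents(s: str) -> str:
    """Remove combining marks after NFD decomposition (for example 'é' becomes 'e')."""
    decomposed = unicodedata.normalize("NFD", s)
    return "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")

print(strip_accents("Tarjeta de CRÉDITO".lower()))  # tarjeta de credito
```

One trade-off to keep in mind: this approach also turns `ñ` into `n` and `ü` into `u`, which may or may not be desirable depending on the values being compared.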


# **4. Run with Spark**

In [None]:
# Before running, remove any previous output: Spark fails if the folder already exists
!rm -rf comprasConTDCMayorDe1500 comprasConTDCMenoroIgualDe1500

# Run with spark-submit
!$SPARK_HOME/bin/spark-submit personaYMetodosDePago.py casoDePrueba3.txt /content

26/01/13 12:53:04 INFO SparkContext: Running Spark version 3.5.7
26/01/13 12:53:04 INFO SparkContext: OS info Linux, 6.6.105+, amd64
26/01/13 12:53:04 INFO SparkContext: Java version 17.0.17
26/01/13 12:53:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/13 12:53:04 INFO ResourceUtils: No custom resources configured for spark.driver.
26/01/13 12:53:04 INFO SparkContext: Submitted application: personaYMetodosDePago.py
26/01/13 12:53:04 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
26/01/13 12:53:04 INFO ResourceProfile: Limiting resource is cpu
26/01/13 12:53:04 INFO ResourceProfileManager: Added ResourceProfile id: 0
26/01/13 12:53:04 INFO SecurityMana

In [None]:
# View output 1
!cat comprasConTDCMayorDe1500/part-*

Alice;2
Bob;0


In [None]:
# View output 2
!cat comprasConTDCMenoroIgualDe1500/part-*

Alice;1
Bob;1
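
Since `saveAsTextFile` writes one `part-*` file per partition, the `cat` commands above concatenate several files. For a programmatic check, a short sketch that merges them into a dict could look like this; `read_counts` is a hypothetical helper, and the `person;count` line format is taken from the outputs shown above.

```python
import glob
import os

def read_counts(output_dir):
    """Merge all part-* files of a Spark text output folder into {person: count}."""
    counts = {}
    # The pattern skips Spark's hidden checksum files and the _SUCCESS marker.
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:
                    person, count = line.rsplit(";", 1)
                    counts[person] = int(count)
    return counts

# Returns an empty dict if the folder does not exist or has no part files.
print(read_counts("comprasConTDCMayorDe1500"))
```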
