# RDD

Un **RDD** es una estructura de datos fundamental en Apache Spark. Piensa en él como una **cinta transportadora** que distribuye los datos entre varias máquinas para que puedan ser procesados de manera paralela (¡muy rápido!). Es muy útil para realizar operaciones con grandes volúmenes de datos de forma eficiente.

**Resilient** significa que los RDDs pueden **recuperarse** de errores si alguna parte de la operación falla. **Distributed** significa que los datos pueden estar distribuidos en varias máquinas, y **Dataset** se refiere a un conjunto de datos.

## Fuentes de datos a las que prodrémos conectaros

- **Colecciones locales**
  - Listas o arrays en Python usando `parallelize`.

- **Archivos locales o distribuidos**
  - Archivos de texto (`.txt`).
  - Archivos CSV.
  - Archivos JSON.
  - Archivos Parquet.
  - Archivos ORC.
  - Archivos en HDFS.
  - Archivos en S3 (Amazon Simple Storage Service).
  - Archivos en Azure Blob Storage.

- **Bases de datos relacionales**
  - MySQL.
  - PostgreSQL.
  - Oracle.
  - Microsoft SQL Server.
  - SQLite.

- **Bases de datos NoSQL**
  - Cassandra.
  - MongoDB.
  - HBase.
  - Redis.

- **Streams y mensajes**
  - Kafka.
  - Flume.
  - Sockets TCP/UDP.

- **APIs externas**
  - REST APIs.
  - Servicios web SOAP.

- **Otras fuentes**
  - Elasticsearch.
  - Google BigQuery.
  - Snowflake.
  - Data Lakes (genéricos).
  - Hive Metastore.

## Traemos nuestros datos de ejemplo

In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "CSV to RDD")
clientes_ruta_csv = "../resources/clientes.csv"
ordenes_ruta_csv = "../resources/ordenes.csv"

rdd_clientes = sc.textFile(clientes_ruta_csv)
rdd_ordenes = sc.textFile(ordenes_ruta_csv)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/10 20:45:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Funciones RDD

### Convertir nuestro archivo en filas y columnas

En caso de que estemos trabajando con datos tabulados debemos separarlos para poder operar como si se tratará de filas y columnas

In [2]:
rdd_clientes = rdd_clientes.map(lambda line: line.split(","))
rdd_ordenes = rdd_ordenes.map(lambda line: line.split(","))

# Los encabezados de nuestro csv no son relevantes en este caso por lo que los vamos a eliminar

# Extraer los encabezados una sola vez
header_clientes = rdd_clientes.first()
header_ordenes = rdd_ordenes.first()

# Filtrar las filas para eliminar los encabezados
rdd_clientes = rdd_clientes.filter(lambda x: x != header_clientes)
rdd_ordenes = rdd_ordenes.filter(lambda x: x != header_ordenes)

                                                                                

### Mostrar un determinado numero de filas

In [3]:
rdd_ordenes.take(3)

[['0', '1', '9743', '365.04', '2022-02-16', 'Cancelada'],
 ['1', '2', '6515', '750.04', '2022-01-28', 'Cancelada'],
 ['2', '3', '9812', '167.25', '2023-10-31', 'Cancelada']]

In [4]:
rdd_clientes.take(1)

[['0',
  '1',
  'Tyler Jones',
  'johnsonjuan@example.net',
  '2020-05-15',
  'Henrytown']]

### Aplicar una transforación a los elementos

Para modificar los valorres dentro de nuestro RDD debemos de aplicarle una función  que nos ayude a transformalo, esto lo podemos lograr con .map()

In [5]:
# por ejemplo si queremos calcula el valor del iva en las ordenes:

iva = rdd_ordenes.map(lambda x: float(x[3])*0.19)
iva.take(3)

[69.3576, 142.5076, 31.7775]

In [6]:
# Si quieremos agregar una culomna con el iva
rdd_ordenes = rdd_ordenes.map(lambda x: x + [float(x[3])*0.19])
rdd_ordenes.take(5)

[['0', '1', '9743', '365.04', '2022-02-16', 'Cancelada', 69.3576],
 ['1', '2', '6515', '750.04', '2022-01-28', 'Cancelada', 142.5076],
 ['2', '3', '9812', '167.25', '2023-10-31', 'Cancelada', 31.7775],
 ['3', '4', '324', '351.19', '2024-02-03', 'Pendiente', 66.7261],
 ['4', '5', '8519', '466.03', '2024-02-10', 'Pendiente', 88.5457]]

### Filtrar filas por dada una condición

In [7]:
# Pdemos filtrar por nuestras columnas por ejemplo

# Filtro por fecha
rdd_ordenes_dates = rdd_ordenes.filter(lambda x: x[4] == '2024-02-01')

# Filtrar por estado
rdd_ordenes_status = rdd_ordenes.filter(lambda x: x[5] == 'Pendiente')
rdd_ordenes_status.take(5)

# Filtrar por dos condiciones
rdd_ordenes_double_check = rdd_ordenes.filter(lambda x: (x[5] == 'Pendiente') and (x[4] == '2024-02-01'))


### Agregar valores

In [8]:
## Queremos saber el total de compras aprovadas para el año 2024
response = rdd_ordenes.filter(
    lambda x: (x[5]=='Completada') and 
    (x[4]>='2024-01-01') and
    (x[4]<='2024-12-31')).map(
        lambda x: float(x[3])).reduce(lambda x, y: x+y)
print(int(response))



263005168


                                                                                

### Agregar valores por una clave

In [11]:
# Podemos agregar  nuestros valores por uno o mas llaves, por ejemplo, vamos a agregar por usuario y por el status de la transacción

response = rdd_ordenes.map(lambda x: ((x[1], x[5]), float(x[3])))  # Tupla de 2 elementos como clave
response = response.reduceByKey(lambda x, y: x + y)  # Reducir por clave
#print(response.collect())
print(response.take(10))



[(('8', 'Pendiente'), 765.72), (('14', 'Completada'), 942.59), (('17', 'Cancelada'), 791.26), (('19', 'Pendiente'), 788.49), (('22', 'Cancelada'), 376.08), (('26', 'Cancelada'), 11.24), (('27', 'Pendiente'), 709.15), (('44', 'Cancelada'), 361.51), (('55', 'Completada'), 853.29), (('76', 'Completada'), 849.3)]


                                                                                

### Flat map

Este nos permite aplanar datos que contengan varias dimesiones

In [None]:
# En este ejemplo estamos creando un rdd con registros de una dimensión, flatmap los desglosa y los convierte en elementos dse 0 dim ( por eso se repiden )
response = rdd_clientes.map(lambda x: [x[0], x[0]]).flatMap(lambda x: x)
print(response.take(10))

['0', '0', '1', '1', '2', '2', '3', '3', '4', '4']
