Clase 2: Carga, Lectura y Exploración Detallada de DataFrames 

Objetivo de la Clase: Que la alumna domine las opciones de carga de archivos CSV, entienda la importancia de los tipos de datos en Spark, y sea capaz de realizar una exploración profunda de los DataFrames utilizando métodos clave para comprender la estructura y las características de los datos bancarios.

Contenido Detallado:

Repaso Rápido de SparkSession y Carga Básica

Recordatorio de cómo inicializar la SparkSession.
Repaso de la carga simple: spark.read.csv("ruta", header=True, inferSchema=True).

Opciones Avanzadas al Leer CSV

header=True/False: Ya visto, pero reconfirmar su importancia para los nombres de columnas.
inferSchema=True/False:
Ventajas de True: Conveniente para explorar datos rápidamente.
Desventajas de True: Puede ser lento para archivos muy grandes (Spark necesita una pasada completa para inferir), y a veces infiere tipos incorrectos (ej. números como strings).
Cuando usar False: En producción o con archivos gigantes, es mejor definir el esquema manualmente.
sep (separador): Especifica el delimitador de las columnas (ej. sep=";" para archivos punto y coma).
nullValue: Define qué cadena de texto debe interpretarse como null (ej. nullValue="N/A").
dateFormat y timestampFormat: Crucial para datos bancarios. Cómo especificar el formato de fecha para que Spark lo parsee correctamente a tipo Date o Timestamp.
Ej. dateFormat="yyyy-MM-dd", timestampFormat="yyyy-MM-dd HH:mm:ss".
Esquema Definido Manualmente (Programmatic Schema):
La mejor práctica para robustez y rendimiento.
Se crea un objeto StructType con StructField para cada columna.
Define el nombre de la columna, el tipo de dato (StringType, IntegerType, DoubleType, DateType, TimestampType de pyspark.sql.types), y si es nullable.

In [6]:
from pyspark.sql import SparkSession

# Crear una SparkSession
# .builder: Inicia el constructor de la sesión.
# .appName("Nombre de la Aplicacion"): Asigna un nombre a tu aplicación Spark.
# .config("spark.executor.memory", "4g"): Configura propiedades de Spark (ej. memoria de los ejecutores).
# .getOrCreate(): Crea una nueva SparkSession si no existe, o devuelve la existente.
spark = SparkSession.builder \
    .appName("CursoPySparkBancario_Clase2") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print("SparkSession creada con éxito.")

SparkSession creada con éxito.


In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# Ejemplo de esquema para bank_data.csv
bank_schema = StructType([
    StructField("Branch_ID", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("Firm_Revenue", DoubleType(), True),
    StructField("Expenses", DoubleType(), True), # Cambiado a Double por consistencia con Profit_Margin
    StructField("Profit_Margin", DoubleType(), True)
])

# Cargar usando el esquema definido
df_bank_manual = spark.read.csv("/home/ravi/Documents/jenifer/data/bank_data.csv", header=True, schema=bank_schema)

Discusión: Ventajas de usar esquema manual para control de calidad y rendimiento, especialmente en un banco donde la consistencia de datos es vital.

Tipos de Datos en Spark y su Importancia 

Repaso de tipos comunes: StringType, IntegerType, DoubleType, BooleanType, DateType, TimestampType.
¿Por qué son importantes?
Exactitud de cálculos: Operaciones aritméticas solo funcionan con tipos numéricos.
Uso de memoria: Tipos correctos optimizan el almacenamiento.
Funciones específicas: Las funciones de fecha (year, month, dayofmonth) solo funcionan con tipos DateType o TimestampType.
Calidad de datos: Un tipo de dato incorrecto puede indicar un problema subyacente en los datos fuente.
Casting (Cambio de Tipo): Cómo convertir una columna de un tipo a otro (.cast()).

In [9]:
from pyspark.sql.functions import col
df_bank_manual = df_bank_manual.withColumn("Branch_ID_str", col("Branch_ID").cast(StringType()))

Métodos Esenciales de Exploración Detallada

printSchema(): (Repaso) Verificación rápida de la estructura.
show(): (Repaso) Inspección visual de filas.
describe(): Obtiene estadísticas descriptivas (count, mean, stddev, min, max) para columnas numéricas y strings. Muy útil para un primer vistazo a la distribución de los datos.

In [10]:
df_customer = spark.read.csv("/home/ravi/Documents/jenifer/data/customer_data.csv", header=True)
df_customer.describe().show()
df_transaction = spark.read.csv("/home/ravi/Documents/jenifer/data/transaction_data.csv", header=True)
df_transaction.select("Total_Balance", "Transaction_Amount").describe().show()

25/06/09 12:59:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+------------------+-------------+--------+------+---------+-----------------+
|summary|       Customer_ID|               Age|Customer_Type|    City|Region|Bank_Name|        Branch_ID|
+-------+------------------+------------------+-------------+--------+------+---------+-----------------+
|  count|             10000|              9500|         9500|    9500| 10000|    10000|            10000|
|   mean|          204999.5| 48.75442105263158|         null|    null|  null|     null|        1497.8293|
| stddev|2886.8956799071675|17.919178673847643|         null|    null|  null|     null|288.7582371006225|
|    min|            200000|              18.0|     Business|Kolhapur|  East|HDFC Bank|             1000|
|    max|            209999|              79.0|   Individual| Solapur|  West|HDFC Bank|             1999|
+-------+------------------+------------------+-------------+--------+------+---------+-----------------+

+-------+------------------+-----------------

summary(): Similar a describe(), pero ofrece más estadísticas como cuartiles, IQR.

In [11]:
df_transaction.summary().show()

                                                                                

+-------+------------------+------------------+------------+------------------+------------------+------------------+-----------------+----------------+
|summary|    Transaction_ID|       Customer_ID|Account_Type|     Total_Balance|Transaction_Amount| Investment_Amount|  Investment_Type|Transaction_Date|
+-------+------------------+------------------+------------+------------------+------------------+------------------+-----------------+----------------+
|  count|             10000|             10000|       10000|             10000|             10000|             10000|            10000|           10000|
|   mean|          304999.5|       205028.1826|        null|        50221.5058| 2542.708056138515|        25550.2484|             null|            null|
| stddev|2886.8956799071675|2909.0081392963934|        null|28540.392010807747|1432.6774203740142|14108.052077783103|             null|            null|
|    min|            300000|            200000|    Business|             10002|   

count(): (Repaso) Número total de filas.
columns / dtypes: Lista de nombres de columnas / tuplas (nombre, tipo de dato).
distinct() y countDistinct(): Para explorar valores únicos.
df.select("Region").distinct().show(): Muestra los valores únicos en una columna.
df.select(countDistinct("Region")).show(): Cuenta el número de valores únicos. (Requiere from pyspark.sql.functions import countDistinct).
groupBy().count() y groupBy().agg(): Para ver la distribución de valores en columnas categóricas.

In [12]:
df_customer.groupBy("Customer_Type").count().show()
df_bank_manual.groupBy("Region").count().show()

+-------------+-----+
|Customer_Type|count|
+-------------+-----+
|         null|  500|
|   Individual| 3132|
|     Employee| 3182|
|     Business| 3186|
+-------------+-----+

+------+-----+
|Region|count|
+------+-----+
| South|  256|
|  East|  265|
|  West|  239|
| North|  240|
+------+-----+



select(): (Repaso) Selección de columnas.
filter() / where(): (Repaso) Filtrado de filas.

Ejemplos Prácticos en Clase (con el instructor):

Cargar customer_data.csv y transaction_data.csv:
Primero con inferSchema=True, mostrar printSchema().
Identificar Customer_Type como string y Age como double.
Identificar Transaction_Date como string en transaction_data.csv (¡problema común!).
Definir Esquema Manual para transaction_data.csv:
Construir el StructType para transaction_data.csv, asegurándose de que Transaction_Date sea DateType.
Recargar el DataFrame con el esquema manual y mostrar printSchema() de nuevo para ver la corrección.
Discutir por qué esto es más robusto para un banco.
Uso de describe() y summary():
Ejecutar df_customer.describe().show() y analizar las estadísticas para Age.
Ejecutar df_transaction.select("Total_Balance", "Transaction_Amount").describe().show().
Comparar describe() y summary().
Explorar valores únicos y distribuciones:
Mostrar df_customer.select("Customer_Type").distinct().show().
Contar tipos de cliente: df_customer.groupBy("Customer_Type").count().show().
Contar bancos únicos: df_customer.groupBy("Bank_Name").count().show().
Casting de columna: Convertir Branch_ID en df_bank a StringType como ejemplo de cast().

10 Ejercicios para la Estudiante (Requerimientos Laborales Simulados):

Contexto General: Eres un especialista en datos en el Departamento de Análisis de Clientes del Banco y el Departamento de Transacciones, encargado de asegurar la calidad y el entendimiento de nuestros datos.

Archivos a Usar: customer_data.csv y transaction_data.csv.

Instrucciones: Para cada ejercicio, crea una nueva SparkSession si no la tienes activa, o usa la existente. Asegúrate de importar las librerías necesarias.

Inicialización Robusta de Datos de Clientes:
Requerimiento: Nuestro equipo de desarrollo de ETL necesita una carga confiable de los datos de clientes. El tipo de cliente (Customer_Type) a veces viene con el valor "N/A" que debe tratarse como nulo, y la edad (Age) a veces no se registra.
Ejercicio: Carga customer_data.csv en un DataFrame llamado df_clientes. Asegúrate de que N/A en cualquier columna se interprete como nulo y que Spark infiera los tipos de datos. Muestra el esquema.
Preparación del Esquema de Transacciones:
Requerimiento: Las fechas de transacción (Transaction_Date) son críticas para la auditoría y los reportes financieros. Necesitamos asegurarnos de que Spark las reconozca explícitamente como fechas (DateType) con el formato AAAA-MM-DD para evitar errores.
Ejercicio: Define un StructType para transaction_data.csv donde Transaction_Date sea DateType con el formato yyyy-MM-dd. Carga transaction_data.csv en un DataFrame llamado df_transacciones usando este esquema. Muestra el esquema resultante.
Validación de Carga de Transacciones:
Requerimiento: El equipo de Riesgos necesita una confirmación rápida del volumen de transacciones que hemos procesado.
Ejercicio: Muestra el número total de transacciones en df_transacciones.
Análisis Descriptivo de Saldos:
Requerimiento: El equipo de Gestión de Cartera quiere conocer las estadísticas básicas (mínimo, máximo, promedio, etc.) de los saldos totales (Total_Balance) y los montos de inversión (Investment_Amount) para entender la distribución de activos de nuestros clientes.
Ejercicio: Realiza un describe() o summary() solo sobre las columnas Total_Balance y Investment_Amount de df_transacciones y muestra los resultados.
Perfiles de Clientes por Edad:
Requerimiento: El equipo de Marketing está interesado en la distribución de edades de nuestros clientes para segmentar campañas.
Ejercicio: Utiliza describe() sobre la columna Age del df_clientes para obtener sus estadísticas descriptivas.
Diversidad de Tipos de Cuenta:
Requerimiento: El Departamento de Productos Bancarios quiere saber cuántos tipos de cuentas bancarias diferentes manejamos y cuáles son.
Ejercicio: Muestra todos los valores únicos de la columna Account_Type en df_transacciones.
Conteo de Clientes por Tipo:
Requerimiento: Marketing también quiere saber cuántos clientes tenemos en cada Customer_Type (ej. Employee, Business, Individual).
Ejercicio: Agrupa df_clientes por Customer_Type y cuenta el número de clientes en cada grupo.
Verificación de Nombres de Bancos:
Requerimiento: Necesitamos una lista de todos los nombres de bancos que aparecen en nuestros datos de clientes para una auditoría.
Ejercicio: Muestra los nombres únicos de los bancos (Bank_Name) presentes en df_clientes.
Identificación de Transacciones con Valores Extremos:
Requerimiento: El equipo de Fraude está realizando una revisión y pide identificar cualquier transacción con un Transaction_Amount inusualmente alto (ej. mayor a 4000).
Ejercicio: Filtra df_transacciones para mostrar solo las transacciones donde Transaction_Amount sea mayor que 4000. Muestra las primeras 5 de estas transacciones.
Preparación para Próxima Clase (Recarga y Conversión):
Requerimiento: Para futuras transformaciones, necesitamos asegurarnos de que Branch_ID en df_clientes sea del tipo IntegerType (si no lo es ya por el inferSchema).
Ejercicio: Si Branch_ID en df_clientes no es IntegerType, conviértelo a IntegerType usando .cast() y .withColumn(). Muestra el printSchema() para confirmar el cambio. Detén tu SparkSession al finalizar.

Recursos Adicionales para la Alumna:

Documentación de pyspark.sql.types: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html
Documentación de pyspark.sql.DataFrameReader (para opciones de lectura): https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html