**Tabla de contenido**

- [Preparación entorno spark](#Preparacion-entorno-spark)
- [Lectura de datos](#Lectura-de-datos)
- [Limpieza y transformacion](#Limpieza-y-transformacion)
    - [Duplicados](#Duplicados)
    - [Convertir columnas a su respectivo formato](#Convertir-columnas-a-su-respectivo-formato)
    - [Valores faltantes](#Valores-faltantes)
    - [Eliminar caracteres especiales](#Eliminar-caracteres-especiales)
- [Broadcast join](#Broadcast-join)

# Preparacion entorno spark

Vamos a preparar el entorno de trabajo con las siguientes configuraciones:

1. Importaciones:

- `SparkContext:` Punto de entrada principal para funcionalidades de Spark
- `SparkSession:` Interfaz unificada para trabajar con datos en Spark


2. `sc = SparkContext('local')`:

- Crea un contexto de Spark en modo local. 
- Significa que Spark correrá en tu máquina, usando todos los núcleos disponibles.
- Ideal para desarrollo y pruebas
3. `spark = SparkSession(sc):`

- Crea una sesión de Spark usando el contexto previamente creado
- Permite trabajar con DataFrames y realizar operaciones SQL
- Es la forma moderna de interactuar con Spark (reemplaza RDDs antiguos)

In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/23 14:47:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Lectura de datos

Vamos a leer el dataframe y lo almacenaremos en el cache por la siguientes razones:
1. Estaremos llamando el dataframe muchas veces ya que lo vamos a limpiar. Si no cacheas, Spark recalculará desde el origen en cada paso intermedio (ineficiente).
2. El dataframe es pequeño, por lo cual cabe en memoria.

`Beneficios`

- Velocidad: Operaciones como dropDuplicates() o groupBy() serán más rápidas al evitar recalcular desde el origen.

- Simplicidad: No necesitas preocuparte por repetir lecturas del archivo.

- Debugging: Puedes revisar resultados intermedios con show() sin penalización de rendimiento.

`¿Cuándo NO cachear?`

- Si el DataFrame solo se usa una vez.
- Si haces una sola acción después de todas las transformaciones.

`Para DataFrames grandes que no caben en memoria pero requieren reutilización:̣`

- Usa MEMORY_AND_DISK como nivel de persistencia.

- Reparticiona para manejar trozos más pequeños.

- Monitoriza el uso de recursos en la UI de Spark.

In [2]:
import os

file_path = lambda file: os.path.join(os.getcwd(),'data',file)
df = spark.read.csv(file_path('Table_Store.csv'),sep=',',header=True)
df.show(10)

+--------+--------------+----------------+--------+----------+--------------------+
|Store_ID|          City|       Open_Date|Zip_Code|     Phone|               Email|
+--------+--------------+----------------+--------+----------+--------------------+
|       1| *San Antonio*|24/07/2003 21:45|   78015|2136875667| *store1@retail.com*|
|       2| @San Antonio!| 27/10/2005 7:01|   78201|2135167236| @store2@retail.com!|
|       3| #San Antonio#|01/02/2007 16:59|   78112|2123501192|#store3@walmart.com#|
|       4|     &Houston&| 17/07/2004 0:28|   77001|3129778178|&store4@walmart.com&|
|       5|     !Houston!| 23/04/2009 1:08|   77002|7136191883|  !store5@store.com!|
|       6| $Los Angeles$|10/08/2006 19:42|   90001|2127216342|  $store6@store.com$|
|       7|     %Phoenix%| 16/10/2007 8:02|   60601|7138018755|  %store7@store.com%|
|       8|   #San Diego#| 10/03/2002 5:40|   94023|2125445446| #store8@retail.com#|
|       9|    @New York@| 24/07/2007 4:31|   10001|3122242101|  @store9@stor

Veamos la estructura del dataframe

In [3]:
df.printSchema()

root
 |-- Store_ID: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Open_Date: string (nullable = true)
 |-- Zip_Code: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Email: string (nullable = true)



Podemos observar lo siguiente:
1. Todas las columnas son de tipo String
1. Todas las columnas permiten valores nulos `nullable = true`

# Limpieza y transformacion

La limpieza y transformación de los datos la vamos a realizar siguiendo los siguientes pasos:

1. Consultar la cantidad de registros dúplicados
2. Convertir las columnas a su respectivo formato.
3. Consultar la cantidad de valores faltantes
4. Eliminar caracteres especiales


## Duplicados

In [4]:
duplicados = df.groupBy(df.columns).count().filter('count > 1').count()
print(f"Existen un total de {duplicados} registros dúplicados")
df= df.dropDuplicates() # elimina los dúplicados

Existen un total de 0 registros dúplicados


## Convertir columnas a su respectivo formato

In [5]:
from pyspark.sql.functions import to_timestamp, col

columns_to_num = ["Store_ID"] # columnas a convertir a entero
for col_name in columns_to_num:
    df = df.withColumn(col_name, df[col_name].cast("int"))

# convierte la clumna Open_Date a datetime 
df = df.withColumn("Open_Date", to_timestamp(col("Open_Date"), "dd/MM/yyyy H:mm"))
df = df.sort("Store_ID")
df.printSchema()

root
 |-- Store_ID: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Open_Date: timestamp (nullable = true)
 |-- Zip_Code: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Email: string (nullable = true)



In [6]:
df.show(10)

+--------+--------------+-------------------+--------+----------+--------------------+
|Store_ID|          City|          Open_Date|Zip_Code|     Phone|               Email|
+--------+--------------+-------------------+--------+----------+--------------------+
|       1| *San Antonio*|2003-07-24 21:45:00|   78015|2136875667| *store1@retail.com*|
|       2| @San Antonio!|2005-10-27 07:01:00|   78201|2135167236| @store2@retail.com!|
|       3| #San Antonio#|2007-02-01 16:59:00|   78112|2123501192|#store3@walmart.com#|
|       4|     &Houston&|2004-07-17 00:28:00|   77001|3129778178|&store4@walmart.com&|
|       5|     !Houston!|2009-04-23 01:08:00|   77002|7136191883|  !store5@store.com!|
|       6| $Los Angeles$|2006-08-10 19:42:00|   90001|2127216342|  $store6@store.com$|
|       7|     %Phoenix%|2007-10-16 08:02:00|   60601|7138018755|  %store7@store.com%|
|       8|   #San Diego#|2002-03-10 05:40:00|   94023|2125445446| #store8@retail.com#|
|       9|    @New York@|2007-07-24 04:31:0

## Valores faltantes

Para la consulta de datos faltantes, tenemos dos opciones:
1. utilizar la acción `count`, pero esto se ejecutaria en cada columna, lo que seria costoso.
2. Verifica en cada columna si el valor es nulo, asignando 1 si así lo es, y cero si no, posteriormente suma los 1

In [7]:
# Contar valores nulos por columna
nulos = {col:df.filter(df[col].isNull()).count() for col in df.columns}
print(nulos)

{'Store_ID': 0, 'City': 0, 'Open_Date': 0, 'Zip_Code': 0, 'Phone': 0, 'Email': 0}


In [8]:
from pyspark.sql.functions import col, sum, when

df.select([ sum(when(col(columna).isNull(),1).otherwise(0)).alias(columna) for columna in df.columns]).show()

+--------+----+---------+--------+-----+-----+
|Store_ID|City|Open_Date|Zip_Code|Phone|Email|
+--------+----+---------+--------+-----+-----+
|       0|   0|        0|       0|    0|    0|
+--------+----+---------+--------+-----+-----+



## Eliminar caracteres especiales

En este caso, vamos a eliminar los caracteres especiales de los nombres de las ciudades y de los correos electrónicos.

- `[^a-zA-Z]` elimina todo menos letras
- `[^a-zA-Z ]` elimina todo menos letras y espacios
- `trim()` Elimina espacios al inicio y al final de un texto

In [9]:
from pyspark.sql.functions import regexp_replace, col, trim
# Eliminar caracteres especiales de la columna City
df = df.withColumn(
    'City',
    trim(regexp_replace(col('City'),"[^a-zA-Z ]",""))
                   )
df.show(2)

+--------+-----------+-------------------+--------+----------+-------------------+
|Store_ID|       City|          Open_Date|Zip_Code|     Phone|              Email|
+--------+-----------+-------------------+--------+----------+-------------------+
|       1|San Antonio|2003-07-24 21:45:00|   78015|2136875667|*store1@retail.com*|
|       2|San Antonio|2005-10-27 07:01:00|   78201|2135167236|@store2@retail.com!|
+--------+-----------+-------------------+--------+----------+-------------------+
only showing top 2 rows


Ahora limpiemos los correos electrónicos.
- lower(...) Convierte todo el correo electrónico a minuscula
- trim (...) Elimina espacios al inicio y final
- Patrón regex: `r'^[*#&!$%@]+|[*#&!$%@]+$'`  
    - ^[*#&!$%@]+ : Busca uno o más caracteres especiales (*, #, &, !, $, %, @) al inicio del string (^)

    - | : Operador OR (o)

    - [*#&!$%@]+$ : Busca uno o más caracteres especiales al final del string ($)

- Reemplazo: Sustituye los caracteres encontrados por cadena vacía ('')

In [10]:
# Eliminemos caracteres especiales de correo electrónico
from pyspark.sql.functions import lower
df = df.withColumn(
    "Email",
    lower(trim(regexp_replace(col("Email"), r'^[*#&!$%@]+|[*#&!$%@]+$', '')))
)
df.show(10)

+--------+------------+-------------------+--------+----------+-------------------+
|Store_ID|        City|          Open_Date|Zip_Code|     Phone|              Email|
+--------+------------+-------------------+--------+----------+-------------------+
|       1| San Antonio|2003-07-24 21:45:00|   78015|2136875667|  store1@retail.com|
|       2| San Antonio|2005-10-27 07:01:00|   78201|2135167236|  store2@retail.com|
|       3| San Antonio|2007-02-01 16:59:00|   78112|2123501192| store3@walmart.com|
|       4|     Houston|2004-07-17 00:28:00|   77001|3129778178| store4@walmart.com|
|       5|     Houston|2009-04-23 01:08:00|   77002|7136191883|   store5@store.com|
|       6| Los Angeles|2006-08-10 19:42:00|   90001|2127216342|   store6@store.com|
|       7|     Phoenix|2007-10-16 08:02:00|   60601|7138018755|   store7@store.com|
|       8|   San Diego|2002-03-10 05:40:00|   94023|2125445446|  store8@retail.com|
|       9|    New York|2007-07-24 04:31:00|   10001|3122242101|   store9@sto

# Broadcast join

Finalmente, uniremos dos dataframes utilizando el método `broadcast`, ya que uno de ellos es significativamente más pequeño. La unión se realizará con el tipo `left`, lo que nos permitirá conservar todos los registros del primer dataframe y solo los coincidentes del segundo.

In [11]:
df1 = spark.read.csv(file_path('TFM_Table_Walmart_Sales.csv'),sep=';',header=True)
df1.show(5)

+-----+----------+------------+------------+------------------+------------------+------------------+------------------+--------------+
|Store|      Date|Weekly_Sales|Holiday_Flag|       Temperature|        Fuel_Price|               CPI|      Unemployment|Holiday_Events|
+-----+----------+------------+------------+------------------+------------------+------------------+------------------+--------------+
|    1|05-02-2010|    16436909|           0|42.310000000000002|2.5720000000000001|211.09635840000001|8.1059999999999999|     No aplica|
|    1|12-02-2010|   164195744|           1|38.509999999999998|             2.548|        211.242176|8.1059999999999999|    Super Bowl|
|    1|19-02-2010|   161196816|           0|             39.93|2.5139999999999998|211.28913919999999|8.1059999999999999|     No aplica|
|    1|26-02-2010|   140972752|           0|46.630000000000003|2.5609999999999999|211.31964160000001|8.1059999999999999|     No aplica|
|    1|05-03-2010|   155480672|           0|    

Veamos las dimensiones del dataframe

In [12]:
print((df1.count(), len(df1.columns)))
print((df.count(), len(df.columns)))

(6435, 9)
(45, 6)


Ahora, realicemos el join, en este caso :
- broadcast(df): ndica a Spark que df es un DataFrame pequeño, así que lo puede enviar (broadcast) a todas las particiones del clúster para optimizar la unión. Esto evita shuffles costosos al unir un DataFrame grande con uno pequeño.

In [13]:
from pyspark.sql.functions import broadcast

# Join cuando las columnas tienen nombres diferentes
df_selected = df.select("Store_ID", "City")  # selecciona todas las que
df1 = df1.join(broadcast(df_selected), df1["Store"] == df_selected["Store_ID"], "left")
df1 = df1.drop("Store_ID") 
df1.show(15)

+-----+----------+------------+------------+------------------+------------------+------------------+------------------+--------------+-----------+
|Store|      Date|Weekly_Sales|Holiday_Flag|       Temperature|        Fuel_Price|               CPI|      Unemployment|Holiday_Events|       City|
+-----+----------+------------+------------+------------------+------------------+------------------+------------------+--------------+-----------+
|    1|05-02-2010|    16436909|           0|42.310000000000002|2.5720000000000001|211.09635840000001|8.1059999999999999|     No aplica|San Antonio|
|    1|12-02-2010|   164195744|           1|38.509999999999998|             2.548|        211.242176|8.1059999999999999|    Super Bowl|San Antonio|
|    1|19-02-2010|   161196816|           0|             39.93|2.5139999999999998|211.28913919999999|8.1059999999999999|     No aplica|San Antonio|
|    1|26-02-2010|   140972752|           0|46.630000000000003|2.5609999999999999|211.31964160000001|8.105999999

# Graficar

Vamos a graficar una serie temporal, para esto debemos traer los datos a la memoria, por lo cual usaremos pandas y plotly

In [14]:
import pandas as pd
# Seleccionar columnas específicas
df_selected = df1.select('Store', 'Date', 'Temperature')
df_selected = df_selected.filter(df_selected['Store'] == '1')



In [15]:
df_pd = df_selected.toPandas()
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 143 entries, 0 to 142
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   Store        143 non-null    object
 1   Date         143 non-null    object
 2   Temperature  143 non-null    object
dtypes: object(3)
memory usage: 3.5+ KB


In [17]:
df_pd.head()

Unnamed: 0,Store,Date,Temperature
0,1,2010-02-05,42.31
1,1,2010-02-12,38.51
2,1,2010-02-19,39.93
3,1,2010-02-26,46.63
4,1,2010-03-05,46.5


In [18]:
import plotly.graph_objects as go
df_pd['Date'] = pd.to_datetime(df_pd['Date'], format = '%d-%m-%Y')
df_pd['Temperature'] = df_pd['Temperature'].astype(float)

def plot_time_series(df, x_var, y_var, title, xaxis_title, yaxis_title,dash='dash'):
    # Asegurar que la variable del eje x esté en formato datetime
    if not df[x_var].dtype.name.startswith('datetime'):
        df[x_var] = pd.to_datetime(df[x_var])

    # Crear figura
    fig = go.Figure()

    # Agregar la serie con línea punteada
    fig.add_trace(go.Scatter(
        x=df[x_var],
        y=df[y_var],
        mode='lines',
        name=y_var.replace('_', ' ').capitalize(),
        line=dict(color='blue', width=2, dash=dash)
    ))

    # Actualizar diseño
    fig.update_layout(
        title=f'<b>{title}</b>',
        title_font=dict(size=20, family='Arial', color='black'),
        xaxis_title=f'<b>{xaxis_title}</b>',
        yaxis_title=f'<b>{yaxis_title}</b>',
        font=dict(family='Arial', size=12, color='black'),
        plot_bgcolor='white',
        paper_bgcolor='white',
        hovermode='x unified',
        height=400,
        margin=dict(l=50, r=50, b=50, t=80)
    )

    # Formato año-mes en eje x
    fig.update_xaxes(showgrid=True, gridcolor='lightgray', tickformat='%Y-%m')
    fig.update_yaxes(showgrid=True, gridcolor='lightgray')

    fig.show()
    return fig

fig =plot_time_series(df_pd, 'Date', 'Temperature', 
                 title='Comportamiento de la temperatura',
                 xaxis_title='Fecha',
                 yaxis_title='Temperatura',
                     dash='dash')