# ETL Usando PySpark con DataFrames:

## Configuración inicial del google Colab:

## Configuración Inicial en Google Colab

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz


!tar xf  spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

import findspark
findspark.init()

## Agregamos una librería extra para que me permita usar Drive

In [2]:
# Extra Libs
from google.colab import drive
drive.mount('drive/')

Mounted at drive/


## Creamos una sesión de Spark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("E-commerce_Data_Transformation").getOrCreate()
sc = spark.sparkContext

## Cargamos el csv en el cuál tenemos los datos

In [5]:
archivo_content = spark.read.csv("/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataOriginal/E-commerceDataset.csv",header=True)

### Ahora lo procesamos para ver que no haya repetidos:

In [6]:
# Veamos los primeros 5 elementos para ver la estructura:
archivo_content.show(5)

# Ahora vamos a ver si hay combinaciones de order_Date y Customer_Id que se repitan:
tablas_agrupadas = archivo_content.select("Order_Date","Customer_Id","Time","Product").groupBy(["Order_Date","Customer_Id","Time"]).agg(F.count("Time").alias("Total_Repetidos"))
tablas_filtradas = tablas_agrupadas.filter(F.col("Total_Repetidos") > 1)
tablas_filtradas.show()

# Cómo podemos apreciar no se devuelven datos del DataFrame de tablas_filtradas, por consiguiente sólo se permite una compra por orden

+----------+--------+-----+-----------+------+-----------+-------------------+------------------+-----------------+-----+--------+--------+------+-------------+--------------+--------------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|          Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|
+----------+--------+-----+-----------+------+-----------+-------------------+------------------+-----------------+-----+--------+--------+------+-------------+--------------+--------------+
|2018-01-02|10:56:33|  8.0|      37077|Female|        Web|             Member|Auto & Accessories|Car Media Players|140.0|     1.0|     0.3|  46.0|          4.6|        Medium|   credit_card|
|2018-07-24|20:41:37|  2.0|      59173|Female|        Web|             Member|Auto & Accessories|     Car Speakers|211.0|     1.0|     0.3| 112.0|         11.2|        Medium|   credit_card|
|2018-11-08|08:38:49|  8.0|      41066|Female

Ya que no hubo información acerca de que hubiera dos ordenes con el mismo id de cliente en el mismo tiempo, podemos asumir que en realidad no es necesario crear una entidad de detalle dado que sólo se puede comprar un producto por vez (Horrible)

## Hacemos limpieza

In [None]:
# Primero cargamos la tabla de casos particulares para ver que hacemos con esos después:

# Lista de columnas del DataFrame
columnas = archivo_content.columns

# Crear condición para encontrar filas con NULL o valores vacíos ("")
condicion = None
for c in columnas:
    nueva_cond = (F.col(c).isNull()) | (F.col(c) == "")
    condicion = nueva_cond if condicion is None else condicion | nueva_cond

# Aplicar el filtro
Ordenes_con_null = archivo_content.filter(condicion)

# Mostrar las filas con al menos un NULL o valor vacío
Ordenes_con_null.show()

# Guardamos los datos cómo un csv:
Ordenes_con_null.write.csv("/content/drive/MyDrive/Colab-Notebooks/E-commerce_Data_PID/DataModificado",header=True,mode="overwrite")

+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|
+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|2018-05-02|11:45:38| NULL|      26058|Female|        Web|             Member|Auto & Accessories|   Car Media Players|140.0|     1.0|     0.3|  55.8|          5.6|          High|   credit_card|
|2018-04-22|11:32:22|  5.0|      52267|  Male|        Web|             Member|Auto & Accessories|          Bike Tyres| 72.0|    NULL|     0.1|  36.0|          3.6|      Critical|   credit_card|
|2018-08-05|17:27:54|  6.0|   

In [7]:
#Eliminar espacios en blanco de New
archivo_content_2 = archivo_content.withColumn("Customer_Login_type", F.trim(F.col("Customer_Login_type")))
archivo_content_2.filter(F.col('Customer_Login_type') == 'New').show()

+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|
+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|2018-02-18|18:51:12|  7.0|      48651|  Male|        Web|                New|Auto & Accessories|                Tyre|250.0|     4.0|     0.3| 140.0|         14.0|          High|   credit_card|
|2018-11-21|14:08:52|  5.0|      53848|Female|        Web|                New|Auto & Accessories|Car Pillow & Neck...|231.0|     5.0|     0.1| 139.5|         13.9|      Critical|   credit_card|
|2018-07-16|19:40:09|  6.0|   

In [9]:
import shutil
import os
import glob

# Paso 1: Filtrar las filas con NULL o ""
columnas = archivo_content_2.columns
condicion = None
for c in columnas:
    nueva_cond = (F.col(c).isNull()) | (F.col(c) == "")
    condicion = nueva_cond if condicion is None else condicion | nueva_cond

Ordenes_con_null = archivo_content_2.filter(condicion)

# Paso 2: Reparticionar a una sola partición
Ordenes_con_null = Ordenes_con_null.coalesce(1)

# Paso 3: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/Ordenes_con_null_tmp"
Ordenes_con_null.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 4: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/Ordenes_con_null.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

In [11]:
# Anti join para quitar los nulos
clean_data = archivo_content_2.join(Ordenes_con_null, on=['Order_Date', 'Time'], how='anti')
clean_data.show()

# Paso 0: Reparticionar a una sola partición
clean_data = clean_data.coalesce(1)

# Paso 1: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/clean_data_tmp"
clean_data.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 2: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/clean_data.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|
+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|2018-01-02|10:56:33|  8.0|      37077|Female|        Web|             Member|Auto & Accessories|   Car Media Players|140.0|     1.0|     0.3|  46.0|          4.6|        Medium|   credit_card|
|2018-07-24|20:41:37|  2.0|      59173|Female|        Web|             Member|Auto & Accessories|        Car Speakers|211.0|     1.0|     0.3| 112.0|         11.2|        Medium|   credit_card|
|2018-11-08|08:38:49|  8.0|   

In [12]:
# Crear ids
from pyspark.sql.functions import monotonically_increasing_id
clean_data_id = clean_data.withColumn("Order_ID", monotonically_increasing_id())
clean_data_id.show()

+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|Order_ID|
+----------+--------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+
|2018-01-02|10:56:33|  8.0|      37077|Female|        Web|             Member|Auto & Accessories|   Car Media Players|140.0|     1.0|     0.3|  46.0|          4.6|        Medium|   credit_card|       0|
|2018-07-24|20:41:37|  2.0|      59173|Female|        Web|             Member|Auto & Accessories|        Car Speakers|211.0|     1.0|     0.3| 112.0|         11.2|        Medium|   credit_

In [13]:
# crear las dimension category
dim_category = clean_data_id.groupBy("Product_Category").agg(F.count("*").alias("borrable"))

dim_category_id = (
    dim_category
    .withColumn("Category_ID", monotonically_increasing_id())
    .drop("borrable")
    .withColumnRenamed("Product_Category", "Category_Name")
)

dim_category_id.show()

# Paso 0: Reparticionar a una sola partición
dim_category_id = dim_category_id.coalesce(1)

# Paso 1: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/dim_category_tmp"
dim_category_id.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 2: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/dims/dim_category.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

+------------------+-----------+
|     Category_Name|Category_ID|
+------------------+-----------+
|Auto & Accessories|          0|
|           Fashion|          1|
|        Electronic|          2|
|  Home & Furniture|          3|
+------------------+-----------+



In [14]:
# Reemplazamos Category_Name por sus ID's
clean_data_id = clean_data_id.withColumnRenamed("Product_Category", "Category_Name")
clean_data_id = clean_data_id.join(dim_category_id, on='Category_Name', how='inner').drop("Category_Name")

clean_data_id.show()

+----------+--------+-----+-----------+------+-----------+-------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+-----------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|Order_ID|Category_ID|
+----------+--------+-----+-----------+------+-----------+-------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+-----------+
|2018-01-02|10:56:33|  8.0|      37077|Female|        Web|             Member|   Car Media Players|140.0|     1.0|     0.3|  46.0|          4.6|        Medium|   credit_card|       0|          0|
|2018-07-24|20:41:37|  2.0|      59173|Female|        Web|             Member|        Car Speakers|211.0|     1.0|     0.3| 112.0|         11.2|        Medium|   credit_card|       1|          0|
|2018-11-08|08:38:49

In [15]:
# crear la dimension product

dim_product = clean_data_id.groupBy(["Product", 'Category_ID']).agg(F.count("*").alias("borrable"))

dim_product_id = (
    dim_product
    .withColumn("Product_ID", monotonically_increasing_id())
    .drop("borrable")
    .withColumnRenamed("Product", "Product_Name")
)

dim_product_id.show()

# Paso 0: Reparticionar a una sola partición
dim_product_id = dim_product_id.coalesce(1)

# Paso 1: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/dim_product_tmp"
dim_product_id.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 2: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/dims/dim_product.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

+--------------------+-----------+----------+
|        Product_Name|Category_ID|Product_ID|
+--------------------+-----------+----------+
|   Car Media Players|          0|         0|
|        Car Speakers|          0|         1|
|     Car Body Covers|          0|         2|
|     Car & Bike Care|          0|         3|
|                Tyre|          0|         4|
|          Bike Tyres|          0|         5|
|             Car Mat|          0|         6|
|     Car Seat Covers|          0|         7|
|Car Pillow & Neck...|          0|         8|
|              Shirts|          1|         9|
|               Jeans|          1|        10|
|               Suits|          1|        11|
|         Sports Wear|          1|        12|
|        Casula Shoes|          1|        13|
|       Running Shoes|          1|        14|
|        Formal Shoes|          1|        15|
|            Sneakers|          1|        16|
|         Titak watch|          1|        17|
|        Fossil Watch|          1|

In [16]:
# Reemplazamos Product_Name por sus ID's
clean_data_id_2 = clean_data_id.withColumnRenamed("Product", "Product_Name").drop("Category_ID")
clean_data_id_3 = clean_data_id_2.join(dim_product_id, on='Product_Name', how='inner').drop("Product_Name").drop("Category_ID")

clean_data_id_3.show()

+----------+--------+-----+-----------+------+-----------+-------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+----------+
|Order_Date|    Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|Order_ID|Product_ID|
+----------+--------+-----+-----------+------+-----------+-------------------+-----+--------+--------+------+-------------+--------------+--------------+--------+----------+
|2018-05-10|00:20:09|  8.0|      26816|  Male|        Web|             Member|250.0|     3.0|     0.2| 132.5|         13.3|        Medium|   credit_card|   17762|        27|
|2018-12-19|21:22:19|  2.0|      27968|Female|        Web|             Member|250.0|     2.0|     0.2| 160.0|         16.0|          High|   credit_card|   17774|        27|
|2018-08-05|18:24:33|  6.0|      16360|  Male|        Web|             Member|250.0|     4.0|     0.1| 160.0|         16.0|       

In [17]:
# crear la dimension product

dim_costumer = clean_data_id.groupBy(["Customer_Id", 'Customer_Login_type', 'Gender']).agg(F.count("*").alias("borrable"))

dim_costumer_id = (
    dim_costumer
    .drop("borrable")
)

dim_costumer_id.show()

# Paso 0: Reparticionar a una sola partición
dim_costumer_id = dim_costumer_id.coalesce(1)

# Paso 1: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/dim_costumer_tmp"
dim_costumer_id.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 2: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/dims/dim_costumer.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

+-----------+-------------------+------+
|Customer_Id|Customer_Login_type|Gender|
+-----------+-------------------+------+
|      37077|             Member|Female|
|      59173|             Member|Female|
|      41066|             Member|Female|
|      50741|             Member|Female|
|      53639|             Member|Female|
|      39783|             Member|Female|
|      26767|             Member|Female|
|      20719|             Member|Female|
|      46947|             Member|Female|
|      31839|             Member|Female|
|      22249|             Member|Female|
|      15109|             Member|Female|
|      18622|             Member|Female|
|      56296|             Member|Female|
|      34138|             Member|Female|
|      51112|             Member|Female|
|      57057|             Member|Female|
|      50942|             Member|Female|
|      42384|             Member|Female|
|      26127|             Member|Female|
+-----------+-------------------+------+
only showing top

In [18]:
# mostramos la fact_table
fact_table = (
    clean_data_id_3
    .drop("Gender")
    .drop("Customer_Login_type")
)

fact_table.show()

# Paso 0: Reparticionar a una sola partición
fact_table = fact_table.coalesce(1)

# Paso 1: Guardar como CSV en carpeta temporal
ruta_temporal = "/content/fact_table_tmp"
fact_table.write.csv(ruta_temporal, header=True, mode="overwrite")

# Paso 2: Mover el archivo CSV y renombrarlo
csv_generado = glob.glob(ruta_temporal + "/part-*.csv")[0]  # Busca el archivo CSV
csv_destino = "/content/drive/MyDrive/ITESO/Cursada/Proyecto de Ingeniería de datos/Proyecto/Data/DataModificada/dims/fact_table.csv"

# Crear carpeta destino si no existe
os.makedirs(os.path.dirname(csv_destino), exist_ok=True)

# Mover y renombrar
shutil.move(csv_generado, csv_destino)

# Paso 5: Eliminar la carpeta temporal
shutil.rmtree(ruta_temporal)

+----------+--------+-----+-----------+-----------+-----+--------+--------+------+-------------+--------------+--------------+--------+----------+
|Order_Date|    Time|Aging|Customer_Id|Device_Type|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|Order_ID|Product_ID|
+----------+--------+-----+-----------+-----------+-----+--------+--------+------+-------------+--------------+--------------+--------+----------+
|2018-05-10|00:20:09|  8.0|      26816|        Web|250.0|     3.0|     0.2| 132.5|         13.3|        Medium|   credit_card|   17762|        27|
|2018-12-19|21:22:19|  2.0|      27968|        Web|250.0|     2.0|     0.2| 160.0|         16.0|          High|   credit_card|   17774|        27|
|2018-08-05|18:24:33|  6.0|      16360|        Web|250.0|     4.0|     0.1| 160.0|         16.0|          High|   credit_card|   17786|        27|
|2018-07-13|13:14:25| 10.0|      30034|        Web|250.0|     3.0|     0.2| 132.5|         13.3|          High|   mone