
<center><h1>Medallion Architecture con PySpark</h1>
<center><h5><b>Versión para Google Colab</b></h3>

En esta notebook se trabajará con una serie de archivos .csv y se realizará el proceso de ETL siguiendo la Medallion Architecture.

El proceso está adaptado para poder ejecutarse desde Google Colab, en caso de no contar con créditos en Azure para el uso de Databricks.
</center>


<p align="center">
 <img src=https://media.licdn.com/dms/image/C4E12AQFDZNNpFYcwLQ/article-inline_image-shrink_1000_1488/0/1628283147919?e=1689206400&v=beta&t=WlxNVNtfJ6qQqIZqT-8PTJxZjQf5bvZkTzTQRb9kKnE width=200px>
 </p>

<center><h1><b>Fase I: De Bronze a Silver</b></h1>
<h5>En esta fase se extraerán los datos de la fuente, en este caso en formato .csv . Se subirán a nuestro entorno de trabajo en Colab y, luego de las transformaciones pertinentes, se almacenarán en formato Parquet en la capa silver.</h5>

<h3><b>Para comenzar, debemos subir nuestros archivos .csv al almacenamiento de sesión de Google Colab dentro de la carpeta 'bronze'</h3></b></center>

# **0. Instalación de Apache Spark en Google Colab**

<p align="center">
  <img src="https://www.vectorlogo.zone/logos/apache_spark/apache_spark-ar21.png" width="200px">
</p>

## 0.1 Instalamos y actualizamos PySpark

In [None]:
#Comando para instalar Y actualizar PySpark
!pip install --upgrade pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## 0.2 Importamos/creamos la SparkSession. Esto es necesario para poder operar con el framework Apache Spark.

In [None]:
#Importamos SparkSession de PySpark
from pyspark.sql import SparkSession

#La variable 'spark' va a alojar la SparkSession que vamos a crear
spark = SparkSession.builder.appName('myAppName').getOrCreate()

## 0.3 Importamos los datatypes necesarios para armar el schema

In [None]:
#Datatypes necesarios
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

# **1. Trabajamos con 'orders.csv' de la capa bronce**

<p align="center">
<img src=https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSaAFeb45X1cH0uyReSZXaPxvs-jvoisalsCw&usqp=CAU width=200px>

## 1.1 Definimos el path del csv 'orders' y definimos el schema como 'orders_schema'. 'orders_df' va a ser nuestro dataframe.

In [None]:
#Definimos el path de 'orders'
orders_path = '/content/bronze/orders.csv'

#Definimos el schema de 'orders'
orders_schema = StructType([
                    StructField('ORDER_ID', IntegerType(), False),
                    StructField('ORDER_DATETIME', StringType(), False),
                    StructField('CUSTOMER_ID', IntegerType(), False),
                    StructField('ORDER_STATUS', StringType(), False),
                    StructField('STORE_ID', IntegerType(), False)
                    ]
                    )
#Indicamos path del dataframe, tipo de encabezado y el schema, en la variable 'orders_df'
orders_df = spark.read.csv(path=orders_path, header=True, schema=orders_schema)

## 1.2 Importamos la función 'to_timestamp' que va a ser necesario para castear la columna 'ORDER_DATETIME' y hacemos el casteo en 'ORDER_TIMESTAMP'

In [None]:
#importamos 'to_timestamp'
from pyspark.sql.functions import to_timestamp

In [None]:
# Los cambios se realizarán sobre el mismo 'orders_df'
orders_df = orders_df.select('ORDER_ID', \
                             to_timestamp(orders_df['ORDER_DATETIME'], 'dd-MMM-yy kk.mm.ss.SS').alias('ORDER_TIMESTAMP'), \
                             'CUSTOMER_ID', \
                             'ORDER_STATUS', \
                             'STORE_ID'
                            )

## 1.3 Filtramos 'ORDER_STATUS' para que solo queden los que están completados

In [None]:
# Filtramos los 'complete' con .filter
orders_df = orders_df.filter(orders_df['ORDER_STATUS']=='COMPLETE')

## 1.4 Importar 'stores.csv' y hacer un JOIN, para mostrar los nombres de las tiendas, en lugar de los ID

In [None]:
# Definimos el path de 'stores'
stores_path = '/content/bronze/stores.csv'

# Definimos el schema de 'stores'
stores_schema = StructType([
                            StructField('STORE_ID', IntegerType(), False),
                            StructField('STORE_NAME', StringType(), False),
                            StructField('WEB_ADDRESS', StringType(), False),
                            StructField('LATITUDE', DoubleType(), False),
                            StructField('LONGITUDE', DoubleType(), False),
                            ]
                            )

# Indicamos el path del dataframe, el tipo de encabezado y el schema en la variable 'stores_df'
stores_df = spark.read.csv(path=stores_path, header=True, schema=stores_schema)

## 1.5 Hacemos un LEFT JOIN de orders con stores para agregar 'store_name' al dataframe. Seleccionamos sólo las columnas necesarias

In [None]:
# Mediante '.join' de PySpark podremos realizar un join de las tablas, luego un select para elegir con qué columnas nos quedamos
orders_df = orders_df.join(stores_df, orders_df['store_id']==stores_df['store_id'], 'left').select('ORDER_ID','ORDER_TIMESTAMP','CUSTOMER_ID','STORE_NAME')

## 1.6 Sobreescribimos los archivos en la capa 'Silver' como un archivo Parquet

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
orders_df.write.parquet('/content/silver/orders', mode='overwrite')

# **2. Trabajamos con 'order_items.csv' de la capa bronce**



<p align="center">
<img src=https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSaAFeb45X1cH0uyReSZXaPxvs-jvoisalsCw&usqp=CAU width=200px>

## 2.1 Definimos el path e importamos el schema

In [None]:
# Definimos el path de 'order_items'
order_items_path = '/content/bronze/order_items.csv'

# Definimos el schema de 'order_items'
order_items_schema = StructType([
                                  StructField('ORDER_ID', IntegerType(), False),
                                  StructField('LINE_ITEM_ID', StringType(), False),
                                  StructField('PRODUCT_ID', IntegerType(), False),
                                  StructField('UNIT_PRICE', DoubleType(), False),
                                  StructField('QUANTITY', IntegerType(), False)
                                ]
                                )

# Indicamos el path del dataframe, el tipo de encabezado y el schema en la variable 'order_items_df'
order_items_df = spark.read.csv(path=order_items_path, header=True, schema=order_items_schema)

## 2.2 Seleccionamos las columnas necesarias y dropeamos las que no vamos a usar

In [None]:
# con '.drop' podemos remover las columnas que no necesitemos
order_items_df = order_items_df.drop('LINE_ITEM_ID')

## 2.3 Sobreescribimos 'order_items' en la capa Silver como un archivo Parquet

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
order_items_df.write.parquet('/content/silver/order_items', mode='overwrite')

# **3. Trabajamos con 'products.csv' de la capa bronce**

<p align="center">
<img src=https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSaAFeb45X1cH0uyReSZXaPxvs-jvoisalsCw&usqp=CAU width=200px>

## 3.1 Definimos el path e importamos el schema

In [None]:
# Definimos el path de 'products'
products_path = '/content/bronze/products.csv'

# Definimos el schema de 'products'
products_schema = StructType([
                              StructField('PRODUCT_ID', IntegerType(), False),
                              StructField('PRODUCT_NAME', StringType(), False),
                              StructField('UNIT_PRICE', DoubleType(), False)
                             ]
                             )

# Indicamos el path del dataframe, el tipo de encabezado y el schema en la variable 'order_items_df'
products_df = spark.read.csv(path=products_path, header=True, schema=products_schema)

## 3.2 Sobreescribimos 'products' en la capa Silver como un archivo Parquet

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
products_df.write.parquet('/content/silver/products', mode='overwrite')

# **4. Trabajamos con 'customers.csv' de la capa bronce**

<p align="center">
<img src=https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSaAFeb45X1cH0uyReSZXaPxvs-jvoisalsCw&usqp=CAU width=200px>

## 4.1 Definimos el path e importamos el schema

In [None]:
# Definimos el path de 'customers'
customers_path = '/content/bronze/customers.csv'

# Definimos el schema de 'customers'
customers_schema = StructType([
                                StructField('CUSTOMER_ID', IntegerType(), False),
                                StructField('FULL_NAME', StringType(), False),
                                StructField('EMAIL_ADRESS', StringType(), False)
                              ]
                              )

# Indicamos el path del dataframe, el tipo de encabezado y el schema en la variable 'customers_df'
customers_df = spark.read.csv(path=customers_path, header=True, schema=customers_schema)

## 4.2 Sobreescribimos 'customers' en la capa Silver como un archivo Parquet

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
customers_df.write.parquet('/content/silver/customers', mode='overwrite')

<center><h1><b>Fase II: De Silver a Gold</b></h1></center>

<center>En esta etapa se realizará el enriquecimiento de los datos previamente tratados. Se importarán los archivos en formato parquet desde la carpeta 'silver', previamente creada.</center>

## 0.1 Definimos el path y leemos los Parquet desde la capa Silver

In [None]:
# En las siguientes variables alojaremos la ruta de cada uno de los parquet, a modo de acceso directo
customers_silver_path = '/content/silver/customers'
order_items_silver_path = '/content/silver/order_items'
orders_silver_path = '/content/silver/orders'
products_silver_path = '/content/silver/products'

# Mediante 'spark.read.parquet' leemos la carpeta donde está alojada cada tabla guardada anteriormente
customers_silver_df = spark.read.parquet(customers_silver_path)
order_items_silver_df = spark.read.parquet(order_items_silver_path)
orders_silver_df = spark.read.parquet(orders_silver_path)
products_silver_df = spark.read.parquet(products_silver_path)

<h2>1.1 Comenzamos a trabajar con 'orders', de la capa silver. Podemos hacer un '.show()' para constatar que la información se haya levantado correctamente y un '.dtypes' para comprobar el tipo de dato de cada columna

In [None]:
# order_silver_df.show(5)

In [None]:
# orders_silver_df.dtypes

[('ORDER_ID', 'int'),
 ('ORDER_TIMESTAMP', 'timestamp'),
 ('CUSTOMER_ID', 'int'),
 ('STORE_NAME', 'string')]

<h2>1.2 Creamos el dataframe 'order_details_df' que contendrá la información de 'orders', pero con la columna 'DATE', utilizando la función 'to_date' de PySpark

In [None]:
# Importamos la función 'to_date' que será necesaria para realizar casteos
from pyspark.sql.functions import to_date

In [None]:
# Mediante el '.select', elegimos las columnas que vamos a usar y además realizamos el casteo de 'ORDER_TIMESTAMP' a 'DATE'
order_details_df = orders_silver_df.select(
    'ORDER_ID',
    to_date('ORDER_TIMESTAMP').alias('DATE'),
    'CUSTOMER_ID',
    'STORE_NAME'
)

<h2>1.3 Hacemos un LEFT JOIN del nuevo dataframe 'order_details' con 'order_items' de la capa silver y seleccionamos únicamente las columnas necesarias

In [None]:
# Hacemos el JOIN, declarando el tipo y la condición unir las tablas. Con '.select' agarramos las columnas necesarias de ambas tablas
order_details_df = order_details_df.join(
    order_items_silver_df,
    order_items_silver_df['ORDER_ID'] == order_details_df['ORDER_ID'],
    'left'
).select(
    order_details_df['ORDER_ID'],
    order_details_df['DATE'],
    order_details_df['STORE_NAME'],
    order_details_df['CUSTOMER_ID'],
    order_items_silver_df['UNIT_PRICE'],
    order_items_silver_df['QUANTITY'])

<h2> 1.4 Creamos una nueva columna llamada 'TOTAL_SALES_AMOUNT', en la que vamos a tener 'UNIT_PRICE' multiplicado por 'QUANTITY'

In [None]:
# Se realizan los cambios sobre el mismo 'order_details_df'
order_details_df = order_details_df.withColumn('TOTAL_SALES_AMOUNT',order_details_df['UNIT_PRICE']*order_details_df['QUANTITY'])

## 1.5 Agrupamos el dataframe 'order_details' tomando la suma del monto total y creamos una nueva columna llamada 'TOTAL_ORDER_AMOUNT' con la cantidad de pedidos.

In [None]:
# Se realizan los cambios sobre el mismo 'order_details_df'
order_details_df = order_details_df.groupBy('ORDER_ID', 'DATE', 'CUSTOMER_ID', 'STORE_NAME') \
    .sum('TOTAL_SALES_AMOUNT')\
    .withColumnRenamed('sum(TOTAL_SALES_AMOUNT)', 'TOTAL_ORDER_AMOUNT')

## 1.6 Redondeamos la columna de 'TOTAL_ORDER_AMOUNT' a únicamente dos decimales

In [None]:
# Importamos la función 'round', que sirve para redondear un valor float a, por ejemplo, dos decimales
from pyspark.sql.functions import round

In [None]:
order_details_df = order_details_df.withColumn('TOTAL_ORDER_AMOUNT', round('TOTAL_ORDER_AMOUNT',2))

## 1.7 Guardamos la tabla modificada en la capa gold como 'ORDER_DETAILS'

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
order_details_df.write.parquet('/content/gold/order_details', mode='overwrite')

## 2. Creamos una tabla agregada llamada 'monthly_sales_df', para mostrar el total de ventas por mes y lo guardamos en la capa gold

## 2.1 Creamos una columna que extraiga el mes y el año de la columna 'DATE'

In [None]:
# Importamos 'date_format', que nos va a servir para especificar el formato de la fecha
from pyspark.sql.functions import date_format

In [None]:
# Especificamos cómo aparecerá la fecha en 'MONTH_YEAR'
sales_with_month = order_details_df.withColumn('MONTH_YEAR', date_format('DATE','yyyy-MM'))

## 2.2 Agrupamos, sumamos, redondeamos, ordenamos y seleccionamos datos relacionados con las ventas mensuales.

In [None]:
# Agrupamos segun 'MONTH_YEAR'
monthly_sales_df = sales_with_month.groupBy('MONTH_YEAR') \
    .sum('TOTAL_ORDER_AMOUNT') \
    .withColumn('TOTAL_SALES', round('sum(TOTAL_ORDER_AMOUNT)', 2)) \
    .sort(sales_with_month['MONTH_YEAR'].desc()) \
    .select('MONTH_YEAR', 'TOTAL_SALES')

## 2.3 Sobreescribimos 'monthly_sales' en la capa Gold, en formato Parquet

In [None]:
# El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
monthly_sales_df.write.parquet('/content/gold/monthly_sales',mode='overwrite')

## 3. Creamos una tabla agregada llamada 'STORE_MONTHLY_SALES', donde vamos a mostrar el total de ventas por mes de cada tienda y lo guardamos en la capa gold

## 3.1 Hacemos lo mismo que en el punto anterior, pero en este caso agrupamos por 'STORE_NAME'.

In [None]:
#  Agrupamos, sumamos, redondeamos, ordenamos y seleccionamos datos relacionados con las ventas mensuales por tienda
store_monthly_sales_df = sales_with_month.groupBy('MONTH_YEAR', 'STORE_NAME') \
    .sum('TOTAL_ORDER_AMOUNT') \
    .withColumn('TOTAL_SALES', round('sum(TOTAL_ORDER_AMOUNT)', 2)) \
    .sort(sales_with_month['MONTH_YEAR'].desc()) \
    .select('MONTH_YEAR', 'STORE_NAME', 'TOTAL_SALES')

## 3.2 Sobreescribimos 'store_monthly_sales' en la capa gold, en formato parquet

In [None]:
# # El comando '.write' va acompañado del formato en que queremos escribir. En este caso .parquet, pero podría haber sido .csv
store_monthly_sales_df.write.parquet('/content/gold/store_monthly_sales')

# <center> Resumen </center>

### <center> En esta notebook, se utilizó la arquitectura Medallion para realizar un ETL completo.

### <center>Con este enfoque de capas de almacenamiento y procesamiento, se logró mantener una estructura ordenada y escalable para el flujo de datos.</center>