In [None]:
pip install pyspark py4j


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=6a88c54c3ce760053fd9a0316330375a318787eb32c477f0e1ade656cad6eea4
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) a local Spark session that uses every
# available core of the machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('nubi_chall')
    .getOrCreate()
)


In [43]:
import os
from os import truncate
from google.colab import drive
from pyspark.sql.types import  StructField, DoubleType, IntegerType, StringType, StructType, LongType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, sum, count, rank, lit, year, month
from functools import reduce

# Mount Google Drive so the Parquet files are reachable under /content/drive.
drive.mount('/content/drive')

# Base path of the Parquet files: one "day=YYYYMMDD" sub-folder per day.
base_path = "/content/drive/MyDrive/compressedData/year=2024/month=07/"

# Expected day folders: days 1 to 7 of July 2024.
# ":02d" zero-pads the day, so the names stay correct if the range is ever
# extended past day 9 (the old "0{dia}" concatenation would produce "day=2024070 10").
diasSemana = [f'day=202407{dia:02d}' for dia in range(1, 8)]

# Day folders actually present under base_path. Sorted because os.listdir returns
# entries in arbitrary order, which would make the union (and show()) order vary.
diasEncontrados = sorted(folder for folder in os.listdir(base_path) if folder.startswith('day='))

# Compare the expected days with the ones that exist.
diasFaltantes = [dia for dia in diasSemana if dia not in diasEncontrados]

# Report the result.
if diasFaltantes:
    print(f"Faltan los siguientes días: {diasFaltantes}")
else:
    print("No faltan días.")

# Expected columns and the type used when a day's files lack them. Missing columns
# are added as *typed* nulls. An all-null string column would otherwise have to be
# reconciled by unionByName with the numeric price/sales/sellerId columns of the
# other days, instead of matching them.
columnas_esperadas = {
    'category': StringType(),
    'currency': StringType(),
    'id': StringType(),
    'price': DoubleType(),
    'sales': LongType(),
    'sellerId': IntegerType(),
    'title': StringType(),
}

# One DataFrame per day folder.
dfs = []
for day in diasEncontrados:
    file_path = os.path.join(base_path, day)

    # Read the day's Parquet data.
    df = spark.read.parquet(file_path)

    # Print the schema to inspect which columns each day actually has.
    df.printSchema()

    for column, data_type in columnas_esperadas.items():
        if column not in df.columns:
            df = df.withColumn(column, lit(None).cast(data_type))

    dfs.append(df)

# Fail with a clear message instead of reduce()'s "empty iterable" TypeError.
if not dfs:
    raise FileNotFoundError(f"No 'day=' folders found under {base_path}")

# Combine all days into one DataFrame, matching columns by name.
df_final = reduce(lambda x, y: x.unionByName(y), dfs)
df_final.show()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Faltan los siguientes días: ['day=20240704']
root
 |-- category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- id: string (nullable = true)
 |-- price: long (nullable = true)
 |-- sales: long (nullable = true)
 |-- sellerId: integer (nullable = true)

root
 |-- currency: string (nullable = true)
 |-- id: string (nullable = true)
 |-- price: long (nullable = true)
 |-- sales: long (nullable = true)
 |-- sellerId: integer (nullable = true)

root
 |-- category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- id: string (nullable = true)
 |-- price: long (nullable = true)
 |-- sales: long (nullable = true)
 |-- title: string (nullable = true)
 |-- sellerId: integer (nullable = true)

root
 |-- category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- id: string (nullable = true)
 |-- price: doubl

In [25]:

# 2. Add a column to the DataFrame read in step 1 with the revenue (GMV) of each
#    sale (each row is one sale): sales * price, or 0 when either value is missing.
monto_venta = col("sales") * col("price")
falta_dato = col("sales").isNull() | col("price").isNull()
df_final = df_final.withColumn("GMV", when(falta_dato, 0).otherwise(monto_venta))

df_final.show(30)
df_final.printSchema()

+-----------+--------+---------+-------+-----+---------+--------------------+-------+
|   category|currency|       id|  price|sales| sellerId|               title|    GMV|
+-----------+--------+---------+-------+-----+---------+--------------------+-------+
|Electronics|     ARS|MLA123456|  160.0|    5|999888777|                NULL|  800.0|
|Electronics|     ARS|MLA654321|   60.0|    1|999888777|                NULL|   60.0|
|Electronics|     ARS|MLA333333|   50.0|   16|444555666|                NULL|  800.0|
|Electronics|     ARS|MLA121212|   10.0|    6|444555666|                NULL|   60.0|
|      Tools|     ARS|MLA999000|  150.0|    2|999888777|                NULL|  300.0|
|      Shoes|     ARS|MLA991199|   90.0|    2|999888777|                NULL|  180.0|
|      Shoes|     ARS|MLA991188|   60.0|    3|999888777|                NULL|  180.0|
|      Shoes|     ARS|MLA555555|   30.0|    5|444555666|                NULL|  150.0|
|      Shoes|     ARS|MLA676767|   20.0|    8|44455566

In [26]:
# 3.
# Note: `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.

# a) Total revenue of each seller: sum of GMV grouped by sellerId.
total_por_seller = sum("GMV").alias("total_revenue")
dfFacturacionTotal = df_final.groupBy("sellerId").agg(total_por_seller)

dfFacturacionTotal.show()

# b) Units sold per item: sum of sales grouped by item id.
unidades_por_item = sum("sales").alias("total_units_sold")
dfUnidadesVendidas = df_final.groupBy("id").agg(unidades_por_item)

dfUnidadesVendidas.show()

+---------+-------------+
| sellerId|total_revenue|
+---------+-------------+
|999888777|     126401.0|
|444555666|       3924.0|
|111222333|       9075.0|
+---------+-------------+

+---------+----------------+
|       id|total_units_sold|
+---------+----------------+
|MLA333333|              36|
|MLA123456|              10|
|MLA991199|               2|
|MLA991188|               3|
|MLA654321|               2|
|MLA121212|               6|
|MLA999000|               2|
|MLA111111|             121|
|MLA555555|               5|
|MLA676767|               8|
|MLA333444|               3|
|MLA888999|              11|
|MLA777000|              11|
|MLA222333|              21|
|MLA222111|              16|
+---------+----------------+



In [None]:
# 4. Assign a ranking to each seller based on the totals computed in step 3a. Each
#    seller gets a position according to total revenue, position 1 being the
#    highest revenue. Save the ranking as a single CSV file.

# Window ordered by total_revenue, highest first. It has no partitionBy, so Spark
# evaluates it in a single partition; that is fine here because dfFacturacionTotal
# has one row per seller (it comes from groupBy("sellerId")).
ventana_ranking = Window.orderBy(col("total_revenue").desc())

dfFacturacionTotal_ranked = dfFacturacionTotal.withColumn("rank", rank().over(ventana_ranking))

# Keep only sellerId and rank, ordered by position (ties broken by sellerId) so the
# CSV and the displayed table come out in a deterministic, top-to-bottom order.
dfRankingVentas = (
    dfFacturacionTotal_ranked
    .select("sellerId", "rank")
    .orderBy("rank", "sellerId")
)

# Save the ranking as CSV. coalesce(1) produces a single part file inside the
# "dfRankingVentas.csv" output directory. mode("overwrite") lets the cell be re-run:
# the default save mode raises when the output directory already exists.
# (Downloading the file to the local machine was also considered, but it does not
# scale well for large volumes.)
dfRankingVentas.coalesce(1).write.mode("overwrite").csv("dfRankingVentas.csv", header=True)

dfRankingVentas.show()

In [None]:
import re  # needed by extraigoRuta; `re` is not imported anywhere else in the notebook


def extraigoRuta(path):
    """Extract the year, month and sellerId encoded in a partitioned Parquet path.

    The path is expected to contain the folder layout
    ``year=YYYY/month=MM/day=YYYYMMDD/sellerId=N``. The day is matched but not returned.

    Args:
        path: file path (string) to search.

    Returns:
        ``(year, month, sellerId)`` as ints, or ``(None, None, None)`` when the
        layout is not found in ``path``.
    """
    # Adjust the regular expression to match your folder naming format.
    pattern = r"year=(\d{4})/month=(\d{2})/day=(\d{8})/sellerId=(\d+)"
    match = re.search(pattern, path)
    if match is None:
        return None, None, None
    # Group 3 (day) is intentionally skipped.
    return int(match.group(1)), int(match.group(2)), int(match.group(4))

# Re-read every Parquet file under base_path individually, tag it with the year,
# month and sellerId parsed from its path, and write everything back partitioned
# by year / month / sellerId.

# Expected data columns (sellerId comes from the path) and the type used when a
# file lacks them. Typed nulls keep price/sales numeric across the union instead of
# forcing a reconciliation with an all-null string column.
columnas_esperadas_archivo = {
    'category': StringType(),
    'currency': StringType(),
    'id': StringType(),
    'price': DoubleType(),
    'sales': LongType(),
    'title': StringType(),
}

# One DataFrame per Parquet file.
dfs_archivos = []

for root, _dirs, files in os.walk(base_path):
    for file in files:
        if not file.endswith(".parquet"):
            continue
        file_path = os.path.join(root, file)
        df = spark.read.parquet(file_path)

        # Extract year, month and sellerId from the path. The names anio/mes are
        # deliberately not `year`/`month`: those are the pyspark.sql.functions
        # imported at the top, and rebinding them here would shadow the functions
        # for every later cell.
        anio, mes, seller_id = extraigoRuta(file_path)
        df = (
            df.withColumn("year", lit(anio))
              .withColumn("month", lit(mes))
              .withColumn("sellerId", lit(seller_id))
        )

        # Add missing columns as typed nulls.
        for column, data_type in columnas_esperadas_archivo.items():
            if column not in df.columns:
                df = df.withColumn(column, lit(None).cast(data_type))
        dfs_archivos.append(df)

# Fail with a clear message instead of reduce()'s "empty iterable" TypeError.
if not dfs_archivos:
    raise FileNotFoundError(f"No .parquet files found under {base_path}")

# Combined into its own variable: reassigning df_final here would drop the GMV
# column added in step 2, which the Azure upload cell reads from df_final.
df_particionado = reduce(lambda x, y: x.unionByName(y), dfs_archivos)

# mode("overwrite") so re-running the notebook does not fail on the existing output.
df_particionado.write.mode("overwrite").partitionBy("year", "month", "sellerId").parquet("combined_data.parquet")


In [40]:
# Extra cell: read back and display the Parquet output partitioned by year, month
# and seller. Uses the same relative path as the writer cell, so both resolve
# against the same working directory (on Colab, /content/combined_data.parquet).
dfParquet = spark.read.parquet("combined_data.parquet")

# Sort by sellerId, highest first.
dfParquetOrdenado = dfParquet.orderBy(col("sellerId").desc())

dfParquetOrdenado.show(30)

+-----------+--------+---------+-------+-----+--------------------+----+-----+---------+
|   category|currency|       id|  price|sales|               title|year|month| sellerId|
+-----------+--------+---------+-------+-----+--------------------+----+-----+---------+
|      Shoes|     ARS|MLA777000| 9001.0|    1|Nike jordan super...|2024|    7|999888777|
|       NULL|     ARS|MLA123456|  150.0|    2|                NULL|2024|    7|999888777|
|Electronics|     ARS|MLA888999| 1009.0|    7|samsun phone 100%...|2024|    7|999888777|
|       NULL|     ARS|MLA654321|   30.0|    1|                NULL|2024|    7|999888777|
|      Shoes|     ARS|MLA777000| 9001.0|    8|Nike jordan super...|2024|    7|999888777|
|       NULL|     ARS|MLA123456|  150.0|    3|                NULL|2024|    7|999888777|
|Electronics|     ARS|MLA888999| 1009.0|    3|samsun phone 100%...|2024|    7|999888777|
|      Shoes|     ARS|MLA777000| 9001.0|    2|Nike jordan super...|2024|    7|999888777|
|Electronics|     ARS

In [None]:
!pip install azure-data-tables
!pip install azure-storage-blob
!pip install python-dotenv #configurar variable de entorno



In [None]:
from azure.core.exceptions import ResourceExistsError  # raised by create_entity on duplicate keys
from azure.data.tables import TableServiceClient
import os
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())  # read the local .env file

# Fail early with a clear message: os.getenv returns None for an unset variable,
# which would otherwise end up as the literal text "None" in the connection string.
variables_faltantes = [
    nombre for nombre in ("AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_KEY") if not os.getenv(nombre)
]
if variables_faltantes:
    raise RuntimeError(f"Missing environment variables: {variables_faltantes}")

# Connection to the Azure Table Storage account. The account name and key are
# interpolated from environment variables by the f-strings (single braces), so no
# credential is hardcoded in the notebook.
connection_string = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={os.getenv('AZURE_STORAGE_ACCOUNT')};"
    f"AccountKey={os.getenv('AZURE_STORAGE_KEY')};"
    "EndpointSuffix=core.windows.net"
)
table_service_client = TableServiceClient.from_connection_string(conn_str=connection_string)

# Client for the target table; the table is created first if it does not exist yet,
# so the inserts below always have a target.
table_client = table_service_client.create_table_if_not_exists(table_name="FacturacionVentas")

# Upload one entity per row of df_final. df_final must be the frame enriched with
# the GMV column in step 2 (row['GMV'] is read below).
# toLocalIterator streams the rows partition by partition instead of collect()-ing
# them all onto the driver at once.
# NOTE(review): RowKey is the item id, but the same (sellerId, id) pair appears in
# several rows (repeated sales of one item), so those keys collide and every repeat
# is skipped as "already exists" (see the output below). TODO: build a per-sale
# unique RowKey if every sale must be stored.
for row in df_final.toLocalIterator():
    entity = {
        'PartitionKey': str(row['sellerId']),
        'RowKey': str(row['id']),
        'price': row['price'],
        'sales': row['sales'],
        'currency': row['currency'],
        'category': row['category'],
        'title': row['title'],
        'GMV': row['GMV']
    }
    try:
        table_client.create_entity(entity=entity)
    except ResourceExistsError:
        # The entity already exists: report it and keep going. To overwrite it
        # instead, use table_client.upsert_entity(entity=entity).
        print(f"Entity with PartitionKey '{entity['PartitionKey']}' and RowKey '{entity['RowKey']}' already exists.")

# List every entity in the table (this can be a large volume of data).
# list_entities is the SDK call for an unfiltered listing.
for entity in table_client.list_entities():
    print(entity)

# Entities of one specific seller (PartitionKey).
for entity in table_client.query_entities(query_filter="PartitionKey eq '999888777'"):
    print(entity)

# A more complex filter: one seller, price greater than 10.
for entity in table_client.query_entities(query_filter="PartitionKey eq '999888777' and price gt 10"):
    print(entity)


Entity with PartitionKey '999888777' and RowKey 'MLA123456' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA654321' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA333333' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA333444' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA123456' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA654321' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA333333' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA121212' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA999000' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA991199' already exists.
Entity with PartitionKey '999888777' and RowKey 'MLA991188' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA555555' already exists.
Entity with PartitionKey '444555666' and RowKey 'MLA676767' already exists.
Entity with 