In [1]:
from produto import Produto
from caixa import Caixa
from estoque import Estoque
from onda import Onda

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Configuração inicial do PySpark
conf = SparkConf()
conf.setAppName("MeuAplicativoPySpark")
conf.setMaster("local[*]")  # Usar todos os núcleos locais para execução

# Criação da SparkSession
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

In [2]:
# Verificar se a sessão Spark está funcionando
print("SparkSession Criada!")
spark.sparkContext.setLogLevel("WARN")  # Define o nível de log para evitar muitas mensagens de depuração

# Leitura de dois arquivos CSV e salvando em variáveis
caixas_df = spark.read.csv("../data/caixas.csv", header=True, inferSchema=True)
estoque_df = spark.read.csv("../data/estoque.csv", header=True, inferSchema=True)

# Mostrando o conteúdo dos DataFrames
caixas_df.show(1)
estoque_df.show(1)


SparkSession Criada!
+-------+--------+-----+-------------+-----+
|ONDA_ID|CAIXA_ID|PECAS|  CLASSE_ONDA|  SKU|
+-------+--------+-----+-------------+-----+
|      4|      12|    1|CLASSE_ONDA_1|SKU_1|
+-------+--------+-----+-------------+-----+
only showing top 1 row

+-----+--------+---------+-----+
|ANDAR|CORREDOR|      SKU|PECAS|
+-----+--------+---------+-----+
|    0|       2|SKU_17028|  193|
+-----+--------+---------+-----+
only showing top 1 row



In [3]:
# Criando uma visão temporária para utilizar SQL puro
caixas_df.createOrReplaceTempView("caixas")
estoque_df.createOrReplaceTempView("estoque")

In [4]:

# Agregando as quantidades de cada SKU por classe de onda e caixa
agregado_caixa = spark.sql("""
    SELECT
        CLASSE_ONDA,
        CAIXA_ID,
        SKU,
        PECAS
    FROM caixas

""")
agregado_caixa.createOrReplaceTempView("agregado_caixa")

# Agregando as quantidades de cada SKU por andar
agregado_estoque = spark.sql("""
    SELECT
        ANDAR,
        SKU,
        SUM(PECAS) AS TOTAL_PECAS_DISPONIVEIS
    FROM estoque
    GROUP BY
        ANDAR, SKU
""")
agregado_estoque.createOrReplaceTempView("agregado_estoque")

# Verificando quantas peças são necessárias em cada caixa e quantas temos no andar
dados_necessarios_disponiveis = spark.sql("""
    SELECT
        ac.CLASSE_ONDA,
        ac.CAIXA_ID,
        ae.ANDAR,
        ac.SKU,
        ac.PECAS AS TOTAL_PECAS_NECESSARIAS,
        ae.TOTAL_PECAS_DISPONIVEIS,
        (ae.TOTAL_PECAS_DISPONIVEIS - ac.PECAS) AS DIFF
        
    FROM agregado_caixa ac
    LEFT JOIN agregado_estoque ae
    ON ac.SKU = ae.SKU
    ORDER BY
        ac.CLASSE_ONDA, ac.CAIXA_ID, ae.ANDAR
""")
dados_necessarios_disponiveis.show()

+-------------+--------+-----+------+-----------------------+-----------------------+----+
|  CLASSE_ONDA|CAIXA_ID|ANDAR|   SKU|TOTAL_PECAS_NECESSARIAS|TOTAL_PECAS_DISPONIVEIS|DIFF|
+-------------+--------+-----+------+-----------------------+-----------------------+----+
|CLASSE_ONDA_1|      12|    0| SKU_2|                      1|                     84|  83|
|CLASSE_ONDA_1|      12|    0| SKU_3|                      1|                    852| 851|
|CLASSE_ONDA_1|      12|    0| SKU_4|                      1|                    670| 669|
|CLASSE_ONDA_1|      12|    0| SKU_6|                      4|                    233| 229|
|CLASSE_ONDA_1|      12|    0| SKU_9|                      4|                    152| 148|
|CLASSE_ONDA_1|      12|    0|SKU_11|                      1|                      1|   0|
|CLASSE_ONDA_1|      12|    0|SKU_12|                      2|                    132| 130|
|CLASSE_ONDA_1|      12|    0|SKU_14|                      8|                    271| 263|

In [5]:
# Filtrando os casos onde o DIFF > 0
caixas_easy = dados_necessarios_disponiveis.filter("DIFF >= 0")
caixas_easy.count()

23299

In [6]:
caixas_hard = dados_necessarios_disponiveis.filter("DIFF < 0")
caixas_hard.count()

218

In [7]:
caixas = {}

for row in caixas_df.collect():
    onda_id = row['ONDA_ID']
    caixa_id = row['CAIXA_ID']
    classe_onda = row['CLASSE_ONDA']
    sku = row['SKU']
    qtd = row['PECAS']
    
    # Criar ou buscar a caixa existente
    if caixa_id not in caixas:
        caixa = Caixa(caixa_id, classe_onda, onda_id)
        caixas[caixa_id] = caixa
    else:
        caixa = caixas[caixa_id]
    
    # Adicionar o produto à caixa
    produto = Produto(sku, qtd)
    caixa.add_produto(produto)

print(len(caixas))
print(caixas[12].get_total_itens())

2072
68


In [9]:
estoques = {}
for row in estoque_df.collect():

    andar = row['ANDAR']
    corredor = row['CORREDOR']
    sku = row['SKU']
    sku = int(sku.split('_')[1])
    qtd = row['PECAS']
    
    # Criar ou buscar o estoque existente
    if (andar, corredor) not in estoques:
        estoque = Estoque(andar, corredor)
        estoques[(andar, corredor)] = estoque
    
    else:
        estoque = estoques[(andar, corredor)]

    # Adicionar o produto ao estoque
    produto = Produto(sku, qtd)
    estoque.add_produto(produto)

print(len(estoques))
print(estoques[(0, 2)].get_total_itens())
        

502
1615
