In [1]:
import pandas as pd
import numpy as np

In [2]:
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 120)
pd.set_option("display.float_format", "{:.2f}".format)
pd.set_option("display.precision", 3)
pd.set_option("display.max_colwidth", None)


## Leitura e primeiras colunas deletadas

In [3]:
entradas = pd.read_csv(r"ENTRADASEBZMORUMBI2025 - Entradas.csv")
entradas = entradas.dropna(subset=("Colaborador"))
entradas.drop(columns=["Unnamed: 49", "Unnamed: 50"], inplace=True)

In [4]:
entradas.columns = [col.lower() for col in entradas.columns]
entradas.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1217 entries, 0 to 1216
Data columns (total 52 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   data                           1217 non-null   object 
 1   id                             1217 non-null   float64
 2   colaborador                    1217 non-null   object 
 3   cliente                        1217 non-null   object 
 4   servico                        1217 non-null   object 
 5   valordoservico                 1217 non-null   object 
 6   clientenovo                    1217 non-null   object 
 7   local                          1217 non-null   object 
 8   produto                        1217 non-null   object 
 9   quantidadeproduto              1217 non-null   object 
 10  valortotaldosproduto           1217 non-null   object 
 11  auxvalortotalcompraprodutos    1217 non-null   object 
 12  auxdescontoprodutos            1217 non-null   object

### Definindo classe e funções de limpeza

In [5]:
from typing import Dict, Union
import pandas as pd
import numpy as np


class DataCleaner:
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()

    def to_date(self, date_column: str, date_format: str = "%d/%m/%Y") -> "DataCleaner":
        """Convert a column to datetime."""
        if date_column in self.df.columns:
            self.df[date_column] = pd.to_datetime(
                self.df[date_column], format=date_format, errors="coerce"
            )
        return self

    def normalize_dollar(self, dollar_column: str) -> "DataCleaner":
        """Normalize dollar values: remove currency symbols, convert to float."""
        if dollar_column in self.df.columns:
            self.df[dollar_column] = (
                self.df[dollar_column]
                .astype(str)
                .str.replace("R$", "")
                .str.replace(",", ".")
                .str.replace(" ", "")
                .str.replace("0.0.0", "0")
            )
            self.df[dollar_column] = pd.to_numeric(
                self.df[dollar_column], errors="coerce"
            )
        return self

    def to_boolean(self, column: str) -> "DataCleaner":
        """Convert 'SIM'/'NÃO' values to boolean."""
        if column in self.df.columns:
            self.df[column] = (
                self.df[column].map({"NÃO": False, "SIM": True}).astype("boolean")
            )
        return self

    def to_null(self, column: str, null_val: str = "VAZIO") -> "DataCleaner":
        """Convert specific string values to NaN."""
        if column in self.df.columns:
            self.df[column] = self.df[column].replace(null_val, np.nan)
        return self

    def to_its_own_dimension(
        self, key: str, value: str, id_column: str = "id"
    ) -> pd.DataFrame:
        """Extract a normalized dimension table from the current DataFrame."""
        if all(col in self.df.columns for col in [id_column, key, value]):
            new_df = (
                self.df[[id_column, key, value]]
                .drop_duplicates()
                .rename(columns={key: "produtos", value: "quantidade"})
                .assign(type=key)
            )
            return new_df
        else:
            raise ValueError("nem todas ou alguma coluna não presente")

    def get_data(self) -> pd.DataFrame:
        """Return the cleaned DataFrame."""
        return self.df.copy()

In [6]:
class feature_engineering(pd.DataFrame):

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def index_values(self, column: str) -> "feature_engineering":
        """
        Creates a mapping of unique values in a column to unique indices.

        Args:
            column (str): The column to index.

        Returns:
            feature_engineering: The updated DataFrame with an indexed column.
        """
        unique_values = self[column].unique()
        dicionario = {value: key for key, value in enumerate(unique_values)}
        self[f"id_{column}"] = self[column].map(dicionario)
        return self

## Limpando dados

In [7]:
entradas = DataCleaner(entradas)
entradas = entradas.to_date("data")

In [8]:
entradas.df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1217 entries, 0 to 1216
Data columns (total 52 columns):
 #   Column                         Non-Null Count  Dtype         
---  ------                         --------------  -----         
 0   data                           1217 non-null   datetime64[ns]
 1   id                             1217 non-null   float64       
 2   colaborador                    1217 non-null   object        
 3   cliente                        1217 non-null   object        
 4   servico                        1217 non-null   object        
 5   valordoservico                 1217 non-null   object        
 6   clientenovo                    1217 non-null   object        
 7   local                          1217 non-null   object        
 8   produto                        1217 non-null   object        
 9   quantidadeproduto              1217 non-null   object        
 10  valortotaldosproduto           1217 non-null   object        
 11  auxvalortotalcomprapro

In [9]:
colunas_valor = (
    [col for col in entradas.df.columns if col.startswith("valor")]
    + [
        "taxamaquina",
        "total(s+p)",
        "total(s+p)*t",
        "total[(s+p)-lp]*t",
        "totals+lp-col",
        "totalcolaborador",
        "colaborador50%",
    ]
    + [col for col in entradas.df.columns if col.startswith("auxvalor")]
    + [col for col in entradas.df.columns if col.startswith("auxdesconto")]
)
for col in colunas_valor:
    entradas = entradas.normalize_dollar(col)

In [10]:
entradas = entradas.to_boolean("clientenovo")

colunas = ["produto", "doces", "salgados", "bebidas"]
for col in colunas:
    entradas = entradas.to_null(col)

In [11]:
np.where(
    (entradas.df["produto"] == "VAZIO")
    | (entradas.df["doces"] == "VAZIO")
    | (entradas.df["salgados"] == "VAZIO")
    | (entradas.df["bebidas"] == "VAZIO"),
    1,
    0,
).sum()

np.int64(0)

## Criando dimensão de produto

In [12]:
produtos = pd.read_csv(r"ENTRADASEBZMORUMBI2025 - Produtos.csv")
produtos.columns = [col.lower() for col in produtos.columns]

In [13]:
dim_servico = produtos[["serviço", "valor do serviço"]].copy()

dim_produto = produtos[["produto", "valor de venda", "valor de compra", "lucro"]].copy()



dim_doce = produtos[["doce", "valor de venda.1", "valor de compra.1", "lucro.1"]].copy()


dim_salgado = produtos[
    ["salgado", "valor de venda.2", "valor de compra.2", "lucro.2"]
].copy()



dim_bebida = produtos[
    ["bebida", "valor de venda.3", "valor de compra.3", "lucro.3"]
].copy()



dim_funcionario = produtos[["funcionario", "porcentagem"]].copy()


dim_taxa = produtos[["funcao", "taxa"]].copy()

In [14]:
dimensoes_produtos = [dim_produto, dim_doce, dim_salgado, dim_bebida]

In [15]:
dim_servico.loc[:, ("tipo")] = "servico"
dim_produto.loc[:, ("tipo")] = "barbearia"
dim_doce.loc[:, ("tipo")] = "doce"
dim_salgado.loc[:, ("tipo")] = "salgado"
dim_bebida.loc[:, ("tipo")] = "bebida"
dim_funcionario.loc[:, ("tipo")] = "funcionario"
dim_taxa.loc[:, ("tipo")] = "taxa"

In [16]:
dataframes = [
    dim_servico,
    dim_produto,
    dim_doce,
    dim_salgado,
    dim_bebida,
    dim_funcionario,
    dim_taxa,
]
for data in dataframes:
    data.dropna(inplace=True)

In [17]:
novas_colunas = ["produto", "vl_venda", "vl_compra", "lucro", "tipo"]

dim_produto.columns = novas_colunas
dim_bebida.columns = novas_colunas
dim_doce.columns = novas_colunas
dim_salgado.columns = novas_colunas

dim_servico.rename(columns={"serviço": "servico", "valor do serviço": "vl_venda"})

Unnamed: 0,servico,vl_venda,tipo
0,Corte,"R$ 60,00",servico
1,Barba,"R$ 50,00",servico
2,Perfil,"R$ 20,00",servico
3,Sobrancelha,"R$ 20,00",servico
4,Higienização Facial,"R$ 50,00",servico
...,...,...,...
110,Corte+Barba+Sobrancelha+Platinado,"R$ 360,00",servico
111,Corte+Sobrancelha+Platinado,"R$ 320,00",servico
112,Perfil+Barba+Sobrancelha,"R$ 90,00",servico
113,Corte+Higienização Facial+Alisamento Sódio,"R$ 170,00",servico


In [18]:
dataframes = [dim_produto, dim_bebida, dim_doce, dim_salgado]

for df in dataframes:
    assert list(df.columns) == novas_colunas, "colunas não estão iguais"

In [19]:
dim_produto = DataCleaner(dim_produto).to_null("produto", "VAZIO").df

dim_salgado = DataCleaner(dim_salgado).to_null("produto", "VAZIO").df
dim_doce = DataCleaner(dim_doce).to_null("produto", "VAZIO").df
dim_bebida = DataCleaner(dim_bebida).to_null("produto", "VAZIO").df

In [20]:
dataframes = [dim_produto, dim_bebida, dim_doce, dim_salgado]

for data in dataframes:
    count = data["produto"].value_counts().get("VAZIO", 0)
    print(count)

0
0
0
0


In [21]:
def quick_clean(data):
    if isinstance(data, pd.DataFrame):
        data_cleaner = DataCleaner(data)
    elif isinstance(data, DataCleaner):
        data_cleaner = data
    else:
        raise TypeError("data precisa ser um DataFrame ou um DataCleaner")
    return (
        data_cleaner.normalize_dollar("vl_compra")
        .normalize_dollar("vl_venda")
        .normalize_dollar("lucro")
        .df
    )


dim_produto = quick_clean(dim_produto)
dim_salgado = quick_clean(dim_salgado)
dim_doce = quick_clean(dim_doce)
dim_bebida = quick_clean(dim_bebida)

In [22]:
dataframes = [
    dim_servico,
    dim_produto,
    dim_doce,
    dim_salgado,
    dim_bebida,
    dim_funcionario,
    dim_taxa,
]

for df in dataframes:
    df.dropna(inplace=True)

In [23]:
def to_string(dataframe, coluna):

    dataframe[coluna] = dataframe[coluna].astype(str)

    return dataframe[coluna].apply(lambda x: type(x)).value_counts()

In [24]:
assert to_string(dim_produto, "produto").iloc[0] > 0
assert to_string(dim_bebida, "produto").iloc[0] > 0
assert to_string(dim_doce, "produto").iloc[0] > 0
assert to_string(dim_salgado, "produto").iloc[0] > 0

#### Sobre as tabelas produtos :

A planilha produtos possui em si dois tipos de dimensões: as dimensões de acordo com o tipo ("bebidas","salgados","doces") e a tabela "produtos" que possui a "junção" desses outros produtos mais alguns relacionados ao salão (produtos cosméticos)

In [28]:
dim_produtos_independetes = pd.concat([dim_bebida, dim_doce, dim_salgado])


produtos_merged = dim_produto.merge(
    dim_produtos_independetes, on="produto", how="left", suffixes=(None, "_y")
)

produtos_merged.head(10)

Unnamed: 0,produto,vl_venda,vl_compra,lucro,tipo,vl_venda_y,vl_compra_y,lucro_y,tipo_y
0,Balm Midtown B.URB,65.0,37.0,28.0,barbearia,,,,
1,Óleo para Barba Venice B.URB,65.0,45.0,20.0,barbearia,,,,
2,Pomada Queens B.URB,75.0,46.0,29.0,barbearia,,,,
3,Pomada Brooklyn B.URB,75.0,51.0,24.0,barbearia,,,,
4,Minoxidil Kirkland,110.0,55.0,55.0,barbearia,,,,
5,PENTE MASSAGEADOR,40.0,25.0,15.0,barbearia,,,,
6,Máscara Black Red Nek,65.0,40.3,24.7,barbearia,,,,
7,Esfoliante Facial B.URB,65.0,43.64,21.36,barbearia,,,,
8,Kit de Higienização Facial,125.0,75.0,50.0,barbearia,,,,
9,Leave-in B.URB,65.0,40.0,25.0,barbearia,,,,


In [None]:
produtos_merged = produtos_merged.drop(produtos_merged.columns[4:8], axis=1)

In [None]:
produtos_merged["tipo"] = produtos_merged["tipo_y"].apply(
    lambda x: "barbearia" if pd.isna(x) else x
)
produtos_merged.drop(column="tipo_y")
produtos_merged = feature_engineering(produtos_merged)
produtos_merged = produtos_merged.index_values("produto")
produto_map = produtos_merged.set_index("produto")["id_produto"].to_dict()

KeyError: 'tipo_y'

In [None]:
dim_p_b

In [None]:
passage: Dict[str, pd.DataFrame] = {}

for key, value in combine_columns.items():
    result_dataframe = data_cleaning.to_its_own_dimension(entradas, key, value)

result_dataframe.dropna(subset="produtos", inplace=True)
result_dataframe = result_dataframe.assign(
    produtos=result_dataframe["produtos"].str.split(",")
).explode("produtos")
result_dataframe["quantidade"] = 1
result_dataframe["produtos"].unique()
fato_produtos = result_dataframe
fato_produtos.rename(columns={"ID": "id_servico"}, inplace=True)
fato_produtos["id_servico"] = fato_produtos["id_servico"].astype(int)
fato_produtos = fato_produtos.merge(
    dim_p_b, left_on="produtos", right_on="produto", how="left"
)

#### Limpeza produtos

## Criando tabela Fato


In [None]:
tabela_fato = entradas[
    [
        "data",
        "ID",
        "Colaborador",
        "Cliente",
        "Servico",
        "ValordoServico",
        "ClienteNovo",
        "Local",
        "Produto",
        "QuantidadeProduto",
        "ValorTotaldosProduto",
        "Doces",
        "QuantidadeDoces",
        "ValorTotaldosDoces",
        "Salgados",
        "QuantidadeSalgados",
        "Bebidas",
        "QuantidadeBebidas",
        "ValorTotaldasBebidas",
        "FormadePagamento",
        "TaxaMaquina",
        "Total(S+P)",
        "Total(S+P)*T",
        "Total[(S+P)-LP]*T",
        "TotalS+LP-Col",
        "TotalColaborador",
        "Colaborador50%",
    ]
]

tabela_fato = tabela_fato.rename(
    columns={
        "data": "data",
        "ID": "id",
        "Colaborador": "colaborador",
        "Cliente": "cliente",
        "Servico": "servico",
        "ValordoServico": "vl_servico",
        "ClienteNovo": "cliente_novo",
        "Local": "fidelizado",
        "Produto": "produto",
        "QuantidadeProduto": "qtd_produto",
        "ValorTotaldosProduto": "vl_total_produto",
        "Doces": "doce",
        "QuantidadeDoces": "qtd_doce",
        "ValorTotaldosDoces": "vl_total_doces",
        "Salgados": "salgado",
        "QuantidadeSalgados": "qtd_salgado",
        "ValorTotaldosSalgados": "vl_total_salgado",
        "Bebidas": "bebida",
        "QuantidadeBebidas": "qtd_bebida",
        "ValorTotaldasBebidas": "vl_total_bebida",
        "FormadePagamento": "forma_de_pagamento",
        "TaxaMaquina": "tx_maquina",
        "Total(S+P)": "total_sp",
        "Total(S+P)*T": "total_spt",
        "Total[(S+P)-LP]*T": "total_splt",
        "TotalS+LP-Col": "total_slp_col",
        "TotalColaborador": "total_colaborador",
        "Colaborador50%": "colaborador_50",
    }
)

## Criação da tabela 

In [None]:
import mysql.connector


def create_table(cursor):
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS tabela_fato (
            data DATETIME,
            id DOUBLE,
            colaborador TEXT,
            cliente TEXT,
            servico TEXT,
            vl_servico DOUBLE,
            cliente_novo BOOLEAN,
            fidelizado TEXT,
            produto TEXT,
            qtd_produto TEXT,
            vl_total_produto DOUBLE,
            doce TEXT,
            qtd_doce DOUBLE,
            vl_total_doces DOUBLE,
            salgado TEXT,
            qtd_salgado DOUBLE,
            bebida TEXT,
            qtd_bebida TEXT,
            vl_total_bebida DOUBLE,
            forma_de_pagamento TEXT,
            tx_maquina DOUBLE,
            total_sp DOUBLE,
            total_spt DOUBLE,
            total_splt DOUBLE,
            total_slp_col DOUBLE,
            total_colaborador DOUBLE,
            colaborador_50 DOUBLE
        )
    """
    )


def insert_data(cursor, entradas):
    for _, row in entradas.iterrows():
        cursor.execute(
            """
            INSERT INTO tabela_fato VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """,
            tuple(row),
        )


def main(entradas):
    conn = mysql.connector.connect(
        host="database-atinova.ct6oomqu6y49.sa-east-1.rds.amazonaws.com",
        user="integrantes",
        password="grupoPI2025",
        database="barbearia",  # Substituir pelo nome correto do banco de dados
        port=3306,
    )
    cursor = conn.cursor()

    create_table(cursor)

    entradas.fillna("", inplace=True)  # Substituir NaN por strings vazias
    insert_data(cursor, entradas)

    conn.commit()
    cursor.close()
    conn.close()


# Chame a função passando seu DataFrame diretamente
main(tabela_fato)