In [8]:
import os
import boto3
import os
import pandas as pd
from io import StringIO



Camada *Standardized Zone* 

In [9]:

bucket_name = 'datalake-dadosfera'  
file_key = 'landingzone/car_sales_data.csv'  

s3 = boto3.client('s3')

csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
body = csv_obj['Body']
csv_string = body.read().decode('utf-8')

# Converter o CSV em um DataFrame do pandas
csv_data = StringIO(csv_string)
df = pd.read_csv(csv_data)

# renomear as colunas
df.rename(columns={'Date': 'Data_Venda', 'Salesperson': 'Vendedor_Responsavel', 'Customer Name': 'Nome_Cliente',
                    'Car Make': 'Marca_Veiculo', 'Car Model':'Modelo_Veiculo', 'Car Year': 'Ano_Fabricacao', 'Sale Price':'Valor_Venda',
                     'Commission Rate': 'Taxa_Comissao', 'Commission Earned': 'Comissao_Recebida' }, inplace=True)

# Converter a coluna 'Data' para datetime
df['Ano_Fabricacao'] = pd.to_datetime(df['Ano_Fabricacao'], format='%Y')  
# arredondar a coluna 'Taxa Comissao' para 2 casas decimais
df['Taxa_Comissao'] = df['Taxa_Comissao'].round(2)

print(df.head())


   Data_Venda Vendedor_Responsavel    Nome_Cliente Marca_Veiculo  \
0  2022-08-01      Monica Moore MD     Mary Butler        Nissan   
1  2023-03-15         Roberto Rose  Richard Pierce        Nissan   
2  2023-04-29         Ashley Ramos    Sandra Moore          Ford   
3  2022-09-04       Patrick Harris    Johnny Scott          Ford   
4  2022-06-16           Eric Lopez   Vanessa Jones         Honda   

  Modelo_Veiculo Ano_Fabricacao  Valor_Venda  Taxa_Comissao  Comissao_Recebida  
0         Altima     2018-01-01        15983           0.07            1126.73  
1          F-150     2016-01-01        38474           0.13            5172.40  
2          Civic     2016-01-01        33340           0.11            3818.63  
3         Altima     2013-01-01        41937           0.09            3866.20  
4      Silverado     2022-01-01        20256           0.11            2298.85  


Camada *Staging Zone* 


In [10]:
bucket_name = 'datalake-dadosfera'
file_key = 'standardizedzone/car_sales_data_standardized.csv'

csv_buffer = StringIO()
s3 = boto3.client('s3')

csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
body = csv_obj['Body']
csv_string = body.read().decode('utf-8')

# Converter o CSV em um DataFrame do pandas
csv_data = StringIO(csv_string)
df = pd.read_csv(csv_data)


# Remover linhas onde a data de fabricação é posterior à data de venda
df = df[df['Ano_Fabricacao'] <= df['Data_Venda']]
# Remover linhas onde o valor da venda é menor ou igual a zero
df = df[df['Valor_Venda'] > 0]

# Remover espaços extras e padronizar para maiúsculas
df['Vendedor_Responsavel'] = df['Vendedor_Responsavel'].str.strip().str.upper()
df['Marca_Veiculo'] = df['Marca_Veiculo'].str.strip().str.upper()

# Remover linhas onde a data de fabricação é posterior à data de venda
df = df[df['Ano_Fabricacao'] <= df['Data_Venda']]



#print(df.head())

Número de linhas: 150000
Número de linhas: 150000


Camada *Curated Zone*
- Agregação de Vendas por Vendedor
- Média do Valor de Vendas por Mês
- Tabelas Fato e Dimensão
- Relatório Final com Métricas

In [None]:
bucket_name = 'datalake-dadosfera'
file_key = 'stagingzone/car_sales_data_staging.csv'

csv_buffer = StringIO()
s3 = boto3.client('s3')

csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
body = csv_obj['Body']
csv_string = body.read().decode('utf-8')

csv_data = StringIO(csv_string)
df = pd.read_csv(csv_data)

#Agregação de Vendas por Vendedor
vendas_por_vendedor = df.groupby('Vendedor_Responsavel').agg(
    total_vendas=('Valor_Venda', 'sum'),
    total_comissao=('Comissao_Recebida', 'sum'),
    carros_vendidos=('Valor_Venda', 'count')
).reset_index()

#print(vendas_por_vendedor)






In [6]:
# Média do Valor de Vendas por Mês


df['Data_Venda'] = pd.to_datetime(df['Data_Venda'])
# Extraindo o mês e ano da data de venda
df['AnoMes'] = df['Data_Venda'].dt.to_period('M')

# Agrupando por mês e calculando a média de vendas
media_vendas_por_mes = df.groupby('AnoMes').agg(
    media_venda=('Valor_Venda', 'mean')
).reset_index()

print(media_vendas_por_mes)

     AnoMes   media_venda
0   2022-05  30042.724847
1   2022-06  30123.419114
2   2022-07  29955.084184
3   2022-08  30257.891734
4   2022-09  29889.935961
5   2022-10  29948.123452
6   2022-11  29952.107961
7   2022-12  30124.185752
8   2023-01  29991.546258
9   2023-02  30077.281131
10  2023-03  30022.046139
11  2023-04  30074.486109
12  2023-05  30018.805556


In [7]:
# Criando a tabela fato (Fato_Vendas)
fato_vendas = df[['Data_Venda', 'Vendedor_Responsavel', 'Valor_Venda', 'Comissao_Recebida']]

# Criando a tabela dimensão (Dim_Vendedor)
dim_vendedor = df[['Vendedor_Responsavel']].drop_duplicates().reset_index(drop=True)

print("Fato Vendas:")
print(fato_vendas)

print("\nDimensão Vendedor:")
print(dim_vendedor)


Fato Vendas:
       Data_Venda Vendedor_Responsavel  Valor_Venda  Comissao_Recebida
0      2022-08-01      MONICA MOORE MD        15983            1126.73
1      2023-03-15         ROBERTO ROSE        38474            5172.40
2      2023-04-29         ASHLEY RAMOS        33340            3818.63
3      2022-09-04       PATRICK HARRIS        41937            3866.20
4      2022-06-16           ERIC LOPEZ        20256            2298.85
...           ...                  ...          ...                ...
149995 2022-09-05     ALYSSA RODRIGUEZ        15294            1966.16
149996 2023-04-10     MATTHEW PETERSON        12407            1165.06
149997 2022-11-27      STEVEN MARSHALL        30034            2225.00
149998 2023-02-28          APRIL RAMOS        47666            5716.69
149999 2023-02-04       KRISTEN MOONEY        17369            1431.25

[150000 rows x 4 columns]

Dimensão Vendedor:
      Vendedor_Responsavel
0          MONICA MOORE MD
1             ROBERTO ROSE
2      

In [8]:
# Criando o relatório final
relatorio_vendedores = df.groupby('Vendedor_Responsavel').agg(
    total_vendas=('Valor_Venda', 'sum'),
    media_comissao=('Comissao_Recebida', 'mean'),
    carros_vendidos=('Valor_Venda', 'count')
).reset_index()

# Ordenando pelo total de vendas
relatorio_vendedores = relatorio_vendedores.sort_values(by='total_vendas', ascending=False)

print(relatorio_vendedores)


      Vendedor_Responsavel  total_vendas  media_comissao  carros_vendidos
66335        MICHAEL SMITH       2526736     3134.717805               82
65978      MICHAEL JOHNSON       2012029     2984.610484               62
65982        MICHAEL JONES       1755948     3417.822593               54
38691          JAMES SMITH       1658765     3179.357037               54
78684         ROBERT SMITH       1565037     3107.586122               49
...                    ...           ...             ...              ...
17463           CHLOE POPE         10001      961.890000                1
21921   CYNTHIA WILLIAMSON         10001     1289.930000                1
60743   MARCUS CHRISTENSEN         10000      980.270000                1
17466       CHLOE ROBINSON         10000     1357.090000                1
41015       JEFFREY HODGES         10000      956.450000                1

[97136 rows x 4 columns]


In [12]:
    #Agregação de Vendas por Marca
vendas_por_marca = df.groupby('Marca_Veiculo').agg(
        total_vendas=('Valor_Venda', 'sum'),
        carros_vendidos=('Valor_Venda', 'count')
    ).reset_index()

print(vendas_por_marca)

  Marca_Veiculo  total_vendas  carros_vendidos
0     CHEVROLET     901038237            30009
1          FORD     905339736            30122
2         HONDA     894982326            29864
3        NISSAN     901418652            30008
4        TOYOTA     902963173            29997


In [None]:

# Função para baixar dados da Curated Zone
def download_data_from_s3(**kwargs):
    bucket_name = 'datalake-dadosfera'
    file_keys = [
        'curatedzone/dim_vendedor.csv',
        'curatedzone/fato_vendas.csv',
        'curatedzone/venda_total.csv',
        'curatedzone/vendas_por_marca.csv',
        'curatedzone/vendas_por_vendedor.csv'
    ]
    
    # Cria diretório temporário se não existir
    os.makedirs('/tmp/curatedzone', exist_ok=True)
    
    for file_key in file_keys:
        file_name = file_key.split('/')[-1]  # Extrai o nome do arquivo
        local_path = f'/tmp/curatedzone/{file_name}'  # Caminho local para salvar
        s3.download_file(bucket_name, file_key, local_path)

# Função para rodar validação com Great Expectations
def run_data_validation(**kwargs):
    context = ge.data_context.DataContext()
    
    # Valida cada arquivo baixado
    file_paths = [
        '/tmp/curatedzone/dim_vendedor.csv',
        '/tmp/curatedzone/fato_vendas.csv',
        '/tmp/curatedzone/venda_total.csv',
        '/tmp/curatedzone/vendas_por_marca.csv',
        '/tmp/curatedzone/vendas_por_vendedor.csv'
    ]
    
    for file_path in file_paths:
        df = ge.read_csv(file_path)
        
        # Ajuste o nome do suite de expectativas conforme necessário
        result = df.validate(expectation_suite_name='expectations.default')
        
        # Salva ou exibe os resultados (ajustar para salvar no S3, se necessário)
        context.build_data_docs()
        context.open_data_docs()