In [60]:
##PROJETO DE PORTFÓLIO: DATA PIPELINE COM DADOS DE TÁXI DE NYC
#
###Objetivo###
# Construir um pipeline de dados completo que:
# 1. **Extrai** dados de corridas de táxi de NYC de um repositório público.
# 2. **Transforma** os dados brutos, limpando-os e modelando-os em um esquema Estrela (Star Schema).
# 3. **Carrega** os dados modelados em um Data Lake na AWS S3 em formato Parquet.
#
### Tecnologias Utilizadas###
# - Python (com Pandas)
# - Jupyter Notebook
# - AWS S3
# - Formato Parquet

In [None]:
import pandas as pd
import os

print("Bibliotecas importadas com sucesso.")

#PARÂMETROS DO PROJETO 

#URL do arquivo Parquet com os dados de Janeiro de 2024
#https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
DATA_URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet'


AWS_ACCESS_KEY_ID = 'MEU_ID'
AWS_SECRET_ACCESS_KEY = 'MINHA_CHAVE'
AWS_REGION = 'XXXXXXXXXX'  
NOME_DO_BUCKET = 'XXXXXXXX' 

Bibliotecas importadas com sucesso.


In [62]:
print(f"Iniciando a extração de dados da URL: {DATA_URL}")

#O Pandas lê o arquivo Parquet diretamente da internet e o carrega em um DataFrame
df_bruto = pd.read_parquet(DATA_URL)

print("\nExtração concluída!")
print(f"Total de {len(df_bruto)} registros brutos encontrados.")

#Exibe as 5 primeiras linhas para validação
df_bruto.head()

Iniciando a extração de dados da URL: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet

Extração concluída!
Total de 2964624 registros brutos encontrados.


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1.0,1.72,1.0,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1.0,1.8,1.0,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1.0,4.7,1.0,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1.0,1.4,1.0,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1.0,0.8,1.0,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0


In [63]:
#Mostra informações sobre as colunas, tipos de dados e valores nulos
print("Informações do DataFrame:")
df_bruto.info()

#Exibe estatísticas descritivas para as colunas numéricas
print("\nEstatísticas Descritivas:")
df_bruto.describe()

Informações do DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2964624 entries, 0 to 2964623
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int32         
 8   DOLocationID           int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
count,2964624.0,2964624,2964624,2824462.0,2964624.0,2824462.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2964624.0,2824462.0,2824462.0
mean,1.754204,2024-01-17 00:46:36.431092,2024-01-17 01:02:13.208130,1.339281,3.652169,2.069359,166.0179,165.1167,1.161271,18.17506,1.451598,0.4833823,3.33587,0.5270212,0.9756319,26.8015,2.256122,0.1411611
min,1.0,2002-12-31 22:59:39,2002-12-31 23:05:41,0.0,0.0,1.0,1.0,1.0,0.0,-899.0,-7.5,-0.5,-80.0,-80.0,-1.0,-900.0,-2.5,-1.75
25%,2.0,2024-01-09 15:59:19.750000,2024-01-09 16:16:23,1.0,1.0,1.0,132.0,114.0,1.0,8.6,0.0,0.5,1.0,0.0,1.0,15.38,2.5,0.0
50%,2.0,2024-01-17 10:45:37.500000,2024-01-17 11:03:51.500000,1.0,1.68,1.0,162.0,162.0,1.0,12.8,1.0,0.5,2.7,0.0,1.0,20.1,2.5,0.0
75%,2.0,2024-01-24 18:23:52.250000,2024-01-24 18:40:29,1.0,3.11,1.0,234.0,234.0,1.0,20.5,2.5,0.5,4.12,0.0,1.0,28.56,2.5,0.0
max,6.0,2024-02-01 00:01:15,2024-02-02 13:56:52,9.0,312722.3,99.0,265.0,265.0,4.0,5000.0,14.25,4.0,428.0,115.92,1.0,5000.0,2.5,1.75
std,0.4325902,,,0.8502817,225.4626,9.823219,63.62391,69.31535,0.5808686,18.94955,1.804102,0.11776,3.896551,2.12831,0.2183645,23.38558,0.8232747,0.4876239


In [64]:
#URL para o arquivo CSV que mapeia os IDs de zona
LOOKUP_URL = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv'

print(f"Baixando a tabela de tradução de zonas de: {LOOKUP_URL}")

#Carrega o arquivo de lookup em um novo DataFrame
df_zonas = pd.read_csv(LOOKUP_URL)

print("Tabela de zonas carregada com sucesso!")
print(f"Total de {len(df_zonas)} zonas encontradas.")

#Renomeia as colunas para um padrão mais limpo e claro
df_zonas.rename(columns={
    'LocationID': 'id_localizacao',
    'Borough': 'distrito',
    'Zone': 'zona',
    'service_zone': 'zona_servico'
}, inplace=True)

#Exibe as 5 primeiras linhas para verificação
df_zonas.head()

Baixando a tabela de tradução de zonas de: https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Tabela de zonas carregada com sucesso!
Total de 265 zonas encontradas.


Unnamed: 0,id_localizacao,distrito,zona,zona_servico
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [65]:
print("Iniciando a criação das tabelas de dimensão...")

### 1 - dim_tempo ###
#Extrai as datas de embarque e desembarque, remove duplicatas e cria uma dimensão de tempo única
df_tempo = df_bruto[['tpep_pickup_datetime', 'tpep_dropoff_datetime']].copy()

#Converte para objetos de data/hora, caso não estejam
df_tempo['tpep_pickup_datetime'] = pd.to_datetime(df_tempo['tpep_pickup_datetime'])
df_tempo['tpep_dropoff_datetime'] = pd.to_datetime(df_tempo['tpep_dropoff_datetime'])

#Empilha as duas colunas para ter uma lista única de todas as datas
datas_unicas = pd.concat([df_tempo['tpep_pickup_datetime'], df_tempo['tpep_dropoff_datetime']]).unique()
dim_tempo = pd.DataFrame(datas_unicas, columns=['datetime_completo'])
dim_tempo.dropna(inplace=True) # Remove valores nulos

#Cria a chave primária da dimensão
dim_tempo.reset_index(drop=True, inplace=True)
dim_tempo['id_tempo'] = dim_tempo.index + 1

#Cria atributos descritivos do tempo
dim_tempo['ano'] = dim_tempo['datetime_completo'].dt.year
dim_tempo['mes'] = dim_tempo['datetime_completo'].dt.month
dim_tempo['dia'] = dim_tempo['datetime_completo'].dt.day
dim_tempo['hora'] = dim_tempo['datetime_completo'].dt.hour
dim_tempo['dia_da_semana'] = dim_tempo['datetime_completo'].dt.day_name()

#Enriquecendo a dim_tempo com contexto de negócio
print("Enriquecendo a Dimensão Tempo...")

#Indicador de Fim de Semana (Sábado=5, Domingo=6)
dim_tempo['fim_de_semana'] = dim_tempo['datetime_completo'].dt.weekday.isin([5, 6])

#Período do Dia
def get_periodo_dia(hora):
    if 5 <= hora < 12:
        return 'Manhã'
    elif 12 <= hora < 18:
        return 'Tarde'
    elif 18 <= hora < 22:
        return 'Noite'
    else:
        return 'Madrugada'

dim_tempo['periodo_do_dia'] = dim_tempo['hora'].apply(get_periodo_dia)

#Indicador de Horário de Pico (Rush Hour) 7-9h e 17-19h em dias de semana
dim_tempo['horario_de_pico'] = (
    (~dim_tempo['fim_de_semana']) & 
    ((dim_tempo['hora'].between(7, 9)) | (dim_tempo['hora'].between(17, 19)))
)

display(dim_tempo.head())


### 2 - dim_localizacao ###
dim_localizacao = df_zonas.copy()
print("\nDimensão Localização ENRIQUECIDA criada.")
display(dim_localizacao.head())

### 3 - dim_pagamento ###
#Mapeia os códigos de tipo de pagamento para seus significados
pagamento_map = {
    1: 'Cartão de crédito',
    2: 'Dinheiro',
    3: 'Sem cobrança',
    4: 'Disputa',
    5: 'Desconhecido',
    6: 'Viagem anulada'
}
dim_pagamento = pd.DataFrame(df_bruto['payment_type'].unique(), columns=['id_pagamento'])
dim_pagamento.dropna(inplace=True)
dim_pagamento['desc_pagamento'] = dim_pagamento['id_pagamento'].map(pagamento_map)

print("\nDimensão Pagamento criada.")
display(dim_pagamento.head())

### 4 - dim_tarifa ###
#Mapeia os códigos de tarifa para suas descrições
tarifa_map = {
    1: 'Tarifa padrão',
    2: 'JFK',
    3: 'Newark',
    4: 'Nassau ou Westchester',
    5: 'Tarifa negociada',
    6: 'Viagem compartilhada'
}
dim_tarifa = pd.DataFrame(df_bruto['RatecodeID'].unique(), columns=['id_tarifa'])
dim_tarifa.dropna(inplace=True)

#Converte ID para inteiro para o merge funcionar corretamente
dim_tarifa['id_tarifa'] = dim_tarifa['id_tarifa'].astype(int)
dim_tarifa['desc_tarifa'] = dim_tarifa['id_tarifa'].map(tarifa_map)
print("\nDimensão Tarifa criada.")
display(dim_tarifa.head())

### 5 - dim_fornecedor ###
#Mapeia os códigos de fornecedor para seus nomes
fornecedor_map = {
    1: 'Creative Mobile Technologies, LLC',
    2: 'VeriFone Inc.'
}
dim_fornecedor = pd.DataFrame(df_bruto['VendorID'].unique(), columns=['id_fornecedor'])
dim_fornecedor.dropna(inplace=True)

#Converte ID para inteiro
dim_fornecedor['id_fornecedor'] = dim_fornecedor['id_fornecedor'].astype(int)
dim_fornecedor['desc_fornecedor'] = dim_fornecedor['id_fornecedor'].map(fornecedor_map)
print("\nDimensão Fornecedor criada.")
display(dim_fornecedor.head())

Iniciando a criação das tabelas de dimensão...
Enriquecendo a Dimensão Tempo...


Unnamed: 0,datetime_completo,id_tempo,ano,mes,dia,hora,dia_da_semana,fim_de_semana,periodo_do_dia,horario_de_pico
0,2024-01-01 00:57:55,1,2024,1,1,0,Monday,False,Madrugada,False
1,2024-01-01 00:03:00,2,2024,1,1,0,Monday,False,Madrugada,False
2,2024-01-01 00:17:06,3,2024,1,1,0,Monday,False,Madrugada,False
3,2024-01-01 00:36:38,4,2024,1,1,0,Monday,False,Madrugada,False
4,2024-01-01 00:46:51,5,2024,1,1,0,Monday,False,Madrugada,False



Dimensão Localização ENRIQUECIDA criada.


Unnamed: 0,id_localizacao,distrito,zona,zona_servico
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone



Dimensão Pagamento criada.


Unnamed: 0,id_pagamento,desc_pagamento
0,2,Dinheiro
1,1,Cartão de crédito
2,4,Disputa
3,3,Sem cobrança
4,0,



Dimensão Tarifa criada.


Unnamed: 0,id_tarifa,desc_tarifa
0,1,Tarifa padrão
1,5,Tarifa negociada
2,2,JFK
3,4,Nassau ou Westchester
4,99,



Dimensão Fornecedor criada.


Unnamed: 0,id_fornecedor,desc_fornecedor
0,2,VeriFone Inc.
1,1,"Creative Mobile Technologies, LLC"
2,6,


In [66]:
print("Iniciando a criação da Tabela Fato...")

df_fato = df_bruto.copy()

#Faz o merge (junção) para buscar as chaves da dimensão Tempo
#Renomeamos as colunas para evitar conflitos no merge
dim_tempo_pickup = dim_tempo.rename(columns={'datetime_completo': 'tpep_pickup_datetime', 'id_tempo': 'id_tempo_pickup'})
dim_tempo_dropoff = dim_tempo.rename(columns={'datetime_completo': 'tpep_dropoff_datetime', 'id_tempo': 'id_tempo_dropoff'})

df_fato = pd.merge(df_fato, dim_tempo_pickup[['tpep_pickup_datetime', 'id_tempo_pickup']], on='tpep_pickup_datetime', how='left')
df_fato = pd.merge(df_fato, dim_tempo_dropoff[['tpep_dropoff_datetime', 'id_tempo_dropoff']], on='tpep_dropoff_datetime', how='left')

#Seleciona apenas as chaves estrangeiras e as métricas para a tabela fato
fato_corridas = df_fato[[
    'id_tempo_pickup',
    'id_tempo_dropoff',
    'PULocationID',
    'DOLocationID',
    'payment_type',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount'
]].copy()

#Renomeia as colunas de ID para o padrão de chave estrangeira
fato_corridas.rename(columns={
    'PULocationID': 'id_local_pickup',
    'DOLocationID': 'id_local_dropoff',
    'payment_type': 'id_pagamento'
}, inplace=True)

#Remove registros onde alguma chave estrangeira seja nula para manter a integridade
fato_corridas.dropna(inplace=True)

#Converte as chaves para inteiros
for col in ['id_tempo_pickup', 'id_tempo_dropoff', 'id_local_pickup', 'id_local_dropoff', 'id_pagamento']:
    fato_corridas[col] = fato_corridas[col].astype(int)

print("Tabela Fato criada com sucesso!")
display(fato_corridas.head())

Iniciando a criação da Tabela Fato...
Tabela Fato criada com sucesso!


Unnamed: 0,id_tempo_pickup,id_tempo_dropoff,id_local_pickup,id_local_dropoff,id_pagamento,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,4572,186,79,2,1.0,1.72,17.7,1.0,0.5,0.0,0.0,1.0,22.7
1,2,2671,140,236,1,1.0,1.8,10.0,3.5,0.5,3.75,0.0,1.0,18.75
2,3,2375,236,79,1,1.0,4.7,23.3,3.5,0.5,3.0,0.0,1.0,31.3
3,4,2632,79,211,1,1.0,1.4,10.0,3.5,0.5,2.0,0.0,1.0,17.0
4,5,924,211,148,1,1.0,0.8,7.9,3.5,0.5,3.2,0.0,1.0,16.1


In [67]:
print("Enriquecendo a Tabela Fato com métricas derivadas...")

df_fato_enriquecido = df_fato.copy()

### 1 - Duração da Viagem (em minutos) ##
#Calcula a diferença entre o desembarque e o embarque, que resulta em um 'Timedelta'
duracao = df_fato_enriquecido['tpep_dropoff_datetime'] - df_fato_enriquecido['tpep_pickup_datetime']

#Converte o Timedelta para o total de minutos
df_fato_enriquecido['duracao_minutos'] = duracao.dt.total_seconds() / 60

### 2 - Velocidade Média (vamos usar milhas por hora, pois a distância está em milhas) ##
#Converte a duração para horas
df_fato_enriquecido['duracao_horas'] = df_fato_enriquecido['duracao_minutos'] / 60

#Evita divisão por zero para viagens com duração zero
#Se a duração for 0, a velocidade também será 0
df_fato_enriquecido['velocidade_media_mph'] = df_fato_enriquecido['trip_distance'] / df_fato_enriquecido['duracao_horas']

#Substitui valores infinitos (caso de distância > 0 e tempo = 0) por 0
df_fato_enriquecido['velocidade_media_mph'].replace([float('inf'), -float('inf')], 0, inplace=True)

### 3 - Percentual de Gorjeta ###
#Calcula a gorjeta como uma porcentagem do valor da tarifa (fare_amount)
#Evita divisão por zero se a tarifa for 0
df_fato_enriquecido['percentual_gorjeta'] = 0.0 

#Calcula o percentual apenas onde fare_amount > 0
mask_tarifa_valida = df_fato_enriquecido['fare_amount'] > 0
df_fato_enriquecido.loc[mask_tarifa_valida, 'percentual_gorjeta'] = (df_fato_enriquecido['tip_amount'] / df_fato_enriquecido['fare_amount']) * 100

### 4 - Custo por Distância (custo total por milha) ###
df_fato_enriquecido['custo_por_milha'] = 0.0 

#Calcula o custo apenas onde a distância > 0
mask_distancia_valida = df_fato_enriquecido['trip_distance'] > 0
df_fato_enriquecido.loc[mask_distancia_valida, 'custo_por_milha'] = df_fato_enriquecido['total_amount'] / df_fato_enriquecido['trip_distance']


print("Métricas derivadas criadas com sucesso!")
df_fato_enriquecido[['duracao_minutos', 'velocidade_media_mph', 'percentual_gorjeta', 'custo_por_milha']].describe()

Enriquecendo a Tabela Fato com métricas derivadas...


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_fato_enriquecido['velocidade_media_mph'].replace([float('inf'), -float('inf')], 0, inplace=True)


Métricas derivadas criadas com sucesso!


Unnamed: 0,duracao_minutos,velocidade_media_mph,percentual_gorjeta,custo_por_milha
count,2964624.0,2963866.0,2964624.0,2964624.0
mean,15.61295,13.71395,22.80727,15.88416
std,34.85105,1087.522,2642.288,173.3914
min,-13.56667,-28296.0,0.0,-50100.0
25%,7.15,7.238606,5.882353,8.055556
50%,11.63333,9.570732,24.0404,11.31915
75%,18.68333,13.02632,29.34579,15.47368
max,9455.4,1443334.0,4280000.0,50100.0


In [68]:
print("Iniciando a criação da Tabela Fato final...")

#Seleciona as colunas de chaves e as novas métricas do dataframe enriquecido
fato_corridas = df_fato_enriquecido[[
    'tpep_pickup_datetime',  #Temporário para merge
    'tpep_dropoff_datetime', #Temporário para merge
    'PULocationID',
    'DOLocationID',
    'payment_type',
    'RatecodeID', #Novo
    'VendorID',   #Novo
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'duracao_minutos',        #Nova métrica
    'velocidade_media_mph',   #Nova métrica
    'percentual_gorjeta',     #Nova métrica
    'custo_por_milha'         #Nova métrica
]].copy()

#Faz o merge para buscar as chaves da dimensão Tempo
fato_corridas = pd.merge(fato_corridas, dim_tempo[['datetime_completo', 'id_tempo']].rename(columns={'datetime_completo': 'tpep_pickup_datetime'}), on='tpep_pickup_datetime', how='left')
fato_corridas.rename(columns={'id_tempo': 'id_tempo_pickup'}, inplace=True)
fato_corridas = pd.merge(fato_corridas, dim_tempo[['datetime_completo', 'id_tempo']].rename(columns={'datetime_completo': 'tpep_dropoff_datetime'}), on='tpep_dropoff_datetime', how='left')
fato_corridas.rename(columns={'id_tempo': 'id_tempo_dropoff'}, inplace=True)

#Limpa colunas temporárias
fato_corridas.drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], axis=1, inplace=True)

#Renomeia as colunas de ID para o padrão de chave estrangeira
fato_corridas.rename(columns={
    'PULocationID': 'id_local_pickup',
    'DOLocationID': 'id_local_dropoff',
    'payment_type': 'id_pagamento',
    'RatecodeID': 'id_tarifa',
    'VendorID': 'id_fornecedor'
}, inplace=True)

#Remove registros onde alguma chave estrangeira seja nula
fato_corridas.dropna(inplace=True)

print("Tabela Fato final criada com sucesso!")
display(fato_corridas.head())

Iniciando a criação da Tabela Fato final...
Tabela Fato final criada com sucesso!


Unnamed: 0,id_local_pickup,id_local_dropoff,id_pagamento,id_tarifa,id_fornecedor,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,duracao_minutos,velocidade_media_mph,percentual_gorjeta,custo_por_milha,id_tempo_pickup,id_tempo_dropoff
0,186,79,2,1.0,2,1.0,1.72,17.7,1.0,0.5,0.0,0.0,1.0,22.7,19.8,5.212121,0.0,13.197674,1,4572
1,140,236,1,1.0,1,1.0,1.8,10.0,3.5,0.5,3.75,0.0,1.0,18.75,6.6,16.363636,37.5,10.416667,2,2671
2,236,79,1,1.0,1,1.0,4.7,23.3,3.5,0.5,3.0,0.0,1.0,31.3,17.916667,15.739535,12.875536,6.659574,3,2375
3,79,211,1,1.0,1,1.0,1.4,10.0,3.5,0.5,2.0,0.0,1.0,17.0,8.3,10.120482,20.0,12.142857,4,2632
4,211,148,1,1.0,1,1.0,0.8,7.9,3.5,0.5,3.2,0.0,1.0,16.1,6.1,7.868852,40.506329,20.125,5,924


In [None]:
# #Configurando as credenciais da AWS para esta sessão
# os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
# os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
# os.environ['AWS_DEFAULT_REGION'] = AWS_REGION

# #Função para carregar um DataFrame para uma pasta no S3 em formato Parquet
# def carregar_para_s3(df, pasta_tabela, nome_bucket):
#     print(f"Carregando a tabela '{pasta_tabela}' para o bucket '{nome_bucket}'...")
#     caminho_s3 = f"s3://{nome_bucket}/{pasta_tabela}/"
    
#     try:
#         df.to_parquet(caminho_s3, engine='pyarrow', index=False)
#         print(f"Dados de '{pasta_tabela}' carregados com sucesso em {caminho_s3}")
#     except Exception as e:
#         print(f"ERRO ao carregar a tabela '{pasta_tabela}': {e}")

# #Dicionário com os DataFrames finais e os nomes das pastas no S3
# tabelas_para_carregar = {
#     'dim_tempo': dim_tempo,
#     'dim_localizacao': dim_localizacao,
#     'dim_pagamento': dim_pagamento,
#     'dim_tarifa': dim_tarifa,           
#     'dim_fornecedor': dim_fornecedor,   
#     'fato_corridas': fato_corridas
# }

# for pasta, df in tabelas_para_carregar.items():
#     carregar_para_s3(df, pasta, NOME_DO_BUCKET)

# print("\n--- PIPELINE OF GODS FINALIZADO! ---")
# Célula 7: Função de Upload e Execução do Carregamento (VERSÃO ATUALIZADA E ROBUSTA)


os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY

def carregar_para_s3(df, pasta_tabela, nome_bucket, regiao_aws):
    """
    Função para carregar um DataFrame para uma pasta no S3 em formato Parquet,
    forçando a região correta.
    """
    print(f"Carregando a tabela '{pasta_tabela}' para o bucket '{nome_bucket}' na região '{regiao_aws}'...")
    caminho_s3 = f"s3://{nome_bucket}/{pasta_tabela}/data.parquet"
    
    #MUDANÇA PARA RESOLVER PROBLEMA DA REGIÃO
    #Criando um dicionário de 'opções de armazenamento' para passar ao Pandas.
    #Isso força o uso da região e das credenciais corretas, ignorando outras configurações.
    opcoes_de_armazenamento = {
        'key': AWS_ACCESS_KEY_ID,
        'secret': AWS_SECRET_ACCESS_KEY,
        'client_kwargs': {
            'region_name': regiao_aws
        }
    }
    
    try:
        #Passando as opções de armazenamento para o método to_parquet
        df.to_parquet(
            caminho_s3, 
            engine='pyarrow', 
            index=False,
            storage_options=opcoes_de_armazenamento #MAGIAAA
        )
        print(f"Dados de '{pasta_tabela}' carregados com sucesso em {caminho_s3}")
    except Exception as e:
        #Imprime o erro completo para um diagnóstico mais claro
        print(f"ERRO ao carregar a tabela '{pasta_tabela}':")
        print(e)

#Dicionário com os DataFrames finais e os nomes das pastas no S3
tabelas_para_carregar = {
    'dim_tempo': dim_tempo,
    'dim_localizacao': dim_localizacao,
    'dim_pagamento': dim_pagamento,
    'dim_tarifa': dim_tarifa,
    'dim_fornecedor': dim_fornecedor,
    'fato_corridas': fato_corridas
}

for pasta, df in tabelas_para_carregar.items():
    #variável AWS_REGION para a função
    carregar_para_s3(df, pasta, NOME_DO_BUCKET, AWS_REGION)

print("\n--- PIPELINE FINALIZADO! ---")

Carregando a tabela 'dim_tempo' para o bucket 'meu-portfolio-pipeline-dados' na região 'us-east-2'...
Dados de 'dim_tempo' carregados com sucesso em s3://meu-portfolio-pipeline-dados/dim_tempo/data.parquet
Carregando a tabela 'dim_localizacao' para o bucket 'meu-portfolio-pipeline-dados' na região 'us-east-2'...
Dados de 'dim_localizacao' carregados com sucesso em s3://meu-portfolio-pipeline-dados/dim_localizacao/data.parquet
Carregando a tabela 'dim_pagamento' para o bucket 'meu-portfolio-pipeline-dados' na região 'us-east-2'...
Dados de 'dim_pagamento' carregados com sucesso em s3://meu-portfolio-pipeline-dados/dim_pagamento/data.parquet
Carregando a tabela 'dim_tarifa' para o bucket 'meu-portfolio-pipeline-dados' na região 'us-east-2'...
Dados de 'dim_tarifa' carregados com sucesso em s3://meu-portfolio-pipeline-dados/dim_tarifa/data.parquet
Carregando a tabela 'dim_fornecedor' para o bucket 'meu-portfolio-pipeline-dados' na região 'us-east-2'...
Dados de 'dim_fornecedor' carregados