In [185]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account

In [186]:
storage_credentials = service_account.Credentials.from_service_account_file('../.env/storage_credentials.json')
bigquery_credentials = service_account.Credentials.from_service_account_file('../.env/bigquery_credentials.json')

In [187]:
def download_blob(bucket, blob, destination):
    """Downloads a blob from the bucket.
    - bucket_name: nome do bucket no GCP
    - source_blob_name: nome do arquivo a baixar
    - destination: destino"""

    storage_client = storage.Client(credentials=storage_credentials)

    bucket = storage_client.bucket(bucket)

    blob = bucket.blob(blob)
    blob.download_to_filename(destination)

    print(
        "Download do arquivo {} do bucket {} para diretório {}.".format(
            blob, bucket, destination
        )
    )

In [193]:
os.mkdir('../bronze')
download_blob('tlc-trip-bronze', 'data.parquet', '../bronze/data.parquet')
download_blob('tlc-trip-bronze', 'taxi_zone.csv', '../bronze/taxi_zone.csv')

Download do arquivo <Blob: tlc-trip-bronze, data.parquet, 1722093615910258> do bucket <Bucket: tlc-trip-bronze> para diretório ../bronze/data.parquet.
Download do arquivo <Blob: tlc-trip-bronze, taxi_zone.csv, 1722093597014586> do bucket <Bucket: tlc-trip-bronze> para diretório ../bronze/taxi_zone.csv.


In [189]:
table = pq.read_table('../bronze/data.parquet')
df_taxi_zone = pd.read_csv('../bronze/taxi_zone.csv')

In [190]:
df_data = table.to_pandas()

codigo_tarifa = pd.DataFrame(df_data['RatecodeID']).rename(columns={'RatecodeID': 'id_codigo_tarifa'})
flag = pd.DataFrame(df_data['store_and_fwd_flag']).rename(columns={'store_and_fwd_flag': 'id_flag'})
tipo_pagamento = pd.DataFrame(df_data['payment_type']).rename(columns={'payment_tipe': 'id_tipo_pagamento'})
provedor = pd.DataFrame(df_data['VendorID']).rename(columns={'VendorID': 'id_provedor'})
id_inicio = pd.DataFrame(df_data['PULocationID'])
local_inicio = pd.merge(id_inicio, df_taxi_zone, how='left', left_on='PULocationID', right_on='LocationID')\
                .drop(columns='LocationID')\
                .rename(columns={'PULocationID': 'id_local_inicio', 'Borough': 'bairro', 'Zone': 'zona', 'service_zone': 'zona_servico'})
id_termino = pd.DataFrame(df_data['DOLocationID'])
local_termino = pd.merge(id_termino, df_taxi_zone, how='left', left_on='DOLocationID', right_on='LocationID')\
                .drop(columns='LocationID')\
                .rename(columns={'DOLocationID': 'id_local_termino', 'Borough': 'bairro', 'Zone': 'zona', 'service_zone': 'zona_servico'})
calendario = pd.DataFrame(df_data[['tpep_pickup_datetime', 'tpep_dropoff_datetime']])\
                .rename(columns={'tpep_pickup_datetime': 'inicio_datetime', 'tpep_dropoff_datetime': 'termino_datetime'})
viagens = df_data.drop(columns='total_amount').rename(columns={
    'VendorID': 'id_provedor',
    'tpep_pickup_datetime': 'inicio_datetime',
    'tpep_dropoff_datetime': 'termino_datetime', 
    'passenger_count': 'qtd_passageiros',
    'trip_distance': 'distancia',
    'RatecodeID': 'id_codigo_tarifa',
    'store_and_fwd_flag': 'id_flag',
    'PULocationID': 'id_local_inicio',
    'DOLocationID': 'id_local_termino',
    'payment_type': 'tipo_pagamento',
    'fare_amount': 'vlr_tarifa',
    'extra': 'vlr_extra',
    'mta_tax': 'vlr_tributo_mta',
    'tip_amount': 'vlr_gorgeta',
    'tolls_amount': 'vlr_pedagio',
    'improvement_surcharge': 'vlr_sobretaxa_melhoria',
    'congestion_surcharge': 'vlr_sobretaxa_congestionamento',
    'Airport_fee': 'vlr_tarifa_aeroporto'
})

In [198]:
client = bigquery.Client(credentials=bigquery_credentials, project=bigquery_credentials.project_id)
dataset = bigquery.Dataset(f'{client.project}.silver')
dataset = client.create_dataset(dataset, exists_ok=True)

In [199]:
export = {
    'codigo_tarifa': codigo_tarifa, 
    'flag': flag, 
    'tipo_pagamento': tipo_pagamento, 
    'provedor': provedor, 
    'local_inicio': local_inicio, 
    'local_termino': local_termino, 
    'calendario': calendario, 
    'viagens': viagens
    }

for table_name, table in export.items():
    table_id = f'{client.project}.{dataset.dataset_id}.silver_{table_name}'
    job = client.load_table_from_dataframe(table, table_id)
    job.result()
    print(f'Tabela "{table_name}" carregada com sucesso')

Tabela "codigo_tarifa" carregada com sucesso
Tabela "flag" carregada com sucesso
Tabela "tipo_pagamento" carregada com sucesso
Tabela "provedor" carregada com sucesso
Tabela "local_inicio" carregada com sucesso
Tabela "local_termino" carregada com sucesso
Tabela "calendario" carregada com sucesso
Tabela "viagens" carregada com sucesso
