Podemos pensar que trabalhos de ingestão são uma forma particular de ETL. E para tornar as coisas ainda mais simples, na pratica, faremos um programa em python com a intenção de:
- extrair dados de alguma fonte
- realizar algum processamento necessário nos dados e por fim (de acordo como objetivo dessa aula do modulo)
- fazer o upload desses dados no nosso container servidor de PostgreSQL.  

In [4]:
# dependencias
import pandas as pd
from sqlalchemy import create_engine, text


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


A etapa de extração não chega a ser um grande desafio, pois, vamos utilizar os arquivos .csv hospedados no github do zoompcamp.

https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/yellow

Nesse sentido, o método read_csv do pandas vai ser muito conveniente para nós por ser capaz de ler hospedados online a partir da URL e também lidar com a descompressão. 

In [5]:
# Vamos ler as primeiras 1000 linhas para conhecer o dataset e inferir os tipos de dados

df = pd.read_csv(
    filepath_or_buffer="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz",
    compression="gzip",
    nrows=1000
)

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 18 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   VendorID               1000 non-null   int64  
 1   tpep_pickup_datetime   1000 non-null   object 
 2   tpep_dropoff_datetime  1000 non-null   object 
 3   passenger_count        1000 non-null   int64  
 4   trip_distance          1000 non-null   float64
 5   RatecodeID             1000 non-null   int64  
 6   store_and_fwd_flag     1000 non-null   object 
 7   PULocationID           1000 non-null   int64  
 8   DOLocationID           1000 non-null   int64  
 9   payment_type           1000 non-null   int64  
 10  fare_amount            1000 non-null   float64
 11  extra                  1000 non-null   float64
 12  mta_tax                1000 non-null   float64
 13  tip_amount             1000 non-null   float64
 14  tolls_amount           1000 non-null   float64
 15  impro

In [6]:
dbengine = create_engine('postgresql://postgres:admin@localhost:5432')

In [13]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=dbengine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TEXT, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [14]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [15]:
# inputar o schema da tabela no banco de dados
df.head(n=0).to_sql(name='yellow_taxi_data', con=dbengine, if_exists='replace')

0

In [None]:
iter_chunk = 100000

df_iter = pd.read_csv(
    filepath_or_buffer="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz",
    compression="gzip",
    iterator=True,
    chunksize=iter_chunk
)

# Para fins de demonstração, vamos carregar apenas 1 milhão de linhas (10 chunks de 100 mil)
for i in range(10):
    try:
        df_chunk = next(df_iter)
        df_chunk.tpep_pickup_datetime = pd.to_datetime(df_chunk.tpep_pickup_datetime)
        df_chunk.tpep_dropoff_datetime = pd.to_datetime(df_chunk.tpep_dropoff_datetime)
        df_chunk.to_sql(name='yellow_taxi_data', con=dbengine, if_exists='append', index=False)
        print(f"> {i}00.000 linhas carregadas para a tabela yellow_taxi_data")
    except Exception as e:
        print(f"Processamento finalizado com erro: {e}")
    finally:
        print(f"> {i}00.000 linhas carregadas para a tabela yellow_taxi_data")