## Ingesting NY Taxi Data to Postgres 

Itens relacionados:
* [Notas](../../anotacoes/1_intro.md)
* [Vídeo 1](https://www.youtube.com/watch?v=2JM-ziJt0WI&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=5)
* [Vídeo 2](https://www.youtube.com/watch?v=3IkfkTwqHx4&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=6)

### Instalação de requisitos

No meu computador eu não possuía o Anaconda para Python, então não tinha o Jupyter Notebook do pacote.
Utilizei o [`pyenv`](https://github.com/pyenv/pyenv) para Linux e com ele já veio o [`pyenv-virtualenv`](https://github.com/pyenv/pyenv-virtualenv) que permite gerenciar ambientes virtuais para Python em sistemas baseados em UNIX. É uma espécie de _wrapper_, que centraliza os ambientes virtuais em um diretório raíz.
Após instalar o `pyenv` com o [`pyenv-installer`](https://github.com/pyenv/pyenv-installer), criei um ambiente virtual e instalei os seguintes pacotes com o `pip`:

`pip install ipykernel pandas sqlalchemy pyarrow psycopg2-binary`

* `ipykernel`: necessário para rodar os arquivos Jupyter Notebook
* `pandas`: biblioteca que fornece ferramentas para análise e manipulação de dados
* `sqlalchemy`: biblioteca com ferramentas e ORM (_Object Relational Mapper_) para utilização de SQL aliado ao Python
* `pyarrow`: plataforma de desenvolvimento para análises _in-memory_
* `psycopg2-binary`: adaptador Python para PostgreSQL

### Iniciando o trabalho de investigação dos dados

In [1]:
import pandas as pd
# pd.__version__

In [2]:
import pyarrow.csv as csv
import pyarrow.parquet as pq

O arquivo com os dados é muito grande e o `pandas` não é capaz de lidar com ele da melhor maneira por questões de memória RAM. 
No curso os dados vêm em CSV, porém em julho/2022 a extensão no site mudou para PARQUET. Para seguir os passos apresentados no curso, foi feita a conversão de PARQUET para CSV usando o `pyarrow`:

In [3]:
table = pq.read_table('./nyc_taxi_data/yellow_tripdata_2021-01.parquet')
options = csv.WriteOptions(include_header=True)
csv.write_csv(
    table, 
    './nyc_taxi_data/yellow_tripdata_2021-01.csv', 
    options)

O arquivo CSV vai ser usado posteriormente na hora de inserir os dados no banco de dados no `PostgreSQL`. 
Optei por criar o `DataFrame` do `pandas` utilizando a `table` obtida pela leitura do arquivo em PARQUET porque ao fazer isso, garanti o tipo de dados `TIMESTAMP` de maneira direta. No curso, ao ler do CSV o tipo das colunas `TIMESTAMP` vão para `TEXT` e ele faz a conversão de forma manual toda vez que lê do arquivo de origem.

Por ora, precisaremos apenas das 100 primeiras linhas:

In [4]:
df = table.to_pandas().head(n=100)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2021-01-01 00:12:41,2021-01-01 00:26:47,1.0,4.13,1.0,N,161,226,1,14.5,0.5,0.5,3.66,0.0,0.3,21.96,2.5,
96,2,2021-01-01 00:23:29,2021-01-01 00:35:03,2.0,4.12,1.0,N,162,74,2,13.5,0.5,0.5,0.00,0.0,0.3,17.30,2.5,
97,2,2021-01-01 00:46:17,2021-01-01 00:54:25,2.0,2.22,1.0,N,144,170,1,9.0,0.5,0.5,2.56,0.0,0.3,15.36,2.5,
98,2,2021-01-01 00:28:16,2021-01-01 00:51:44,1.0,7.11,1.0,N,264,264,2,23.5,0.5,0.5,0.00,0.0,0.3,24.80,0.0,


Vamos agora verificar o **_schema_** necessário. O _schema_ é uma estrutura lógica de dados que, no `PostgreSQL` serve como coleção de tabelas, _views_, funções, restrições (_constraints_), índices, etc. No nosso caso, vamos ver como vai ser a criação da tabela para inserir os dados. O `pandas` consegue nos dar o **DDL** (_Data Definition Language_) em SQL com as instruções necessárias para criar a tabela:

In [5]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


**IMPORTANTE: Precisamos "printar" para que o resultado venha como uma instrução SQL.** 

### Criando a tabela no banco de dados

Apesar de termos as instruções DDL, tudo não passou de uma verificação de qual será a instrução passado para o `PostgreSQL`, não foi feita a criação e nem mesmo a conexão com o banco de dados ainda. Iremos utilizar o `sqlalchemy` para isso:

In [6]:
from sqlalchemy import create_engine

Um **engine** especifica os detalhes do banco de dados em uma **URI** (_Uniform Resource Identifier_):
`database://user:password@host:port/database_name`

In [7]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [8]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f36bc016d10>

**IMPORTANTE: Só vamos conseguir se conectar no banco de dados se o contêiner do `PostgreSQL` já foi criado no `Docker` e estiver rodando.**

Passando o valor da nossa **URI de conexão** para o argumento `con=` conseguimos trazer o DDL específico para o `PostgreSQL`, com todos os tipos de dados suportados e automaticamente reconhecidos.

In [9]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




Agora iremos criar um **iterador** para nos permitir ler o CSV em blocos de 100.000 linhas e enviá-los ao banco de dados, para evitar incorrer em erros ao tentar inserir muitos registros de uma só vez.

In [10]:
df_iter = pd.read_csv(
    './nyc_taxi_data/yellow_tripdata_2021-01.csv',
    iterator=True,
    chunksize=100000)

Como se trata de um iterador, conseguimos usar a função `next()` para acessar seu valor.

In [11]:
df = next(df_iter)
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10.000000,2021-01-01 00:36:12.000000,1,2.10,1,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20.000000,2021-01-01 00:52:19.000000,1,0.20,1,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30.000000,2021-01-01 01:11:06.000000,1,14.70,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48.000000,2021-01-01 00:31:01.000000,0,10.60,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49.000000,2021-01-01 00:48:21.000000,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-01-04 14:04:31.000000,2021-01-04 14:08:52.000000,3,0.70,1,N,234,224,2,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5,
99996,1,2021-01-04 14:18:46.000000,2021-01-04 14:35:45.000000,2,3.30,1,N,234,236,1,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5,
99997,1,2021-01-04 14:42:41.000000,2021-01-04 14:59:22.000000,2,4.70,1,N,236,79,1,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5,
99998,2,2021-01-04 14:39:02.000000,2021-01-04 15:09:37.000000,2,17.95,2,N,132,148,1,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5,


Como lemos do CSV, ao vermos o tipo de dados das colunas através do comando do `pandas` que gera o _schema_, podemos perceber aquele problema com as colunas de tipo `TIMESTAMP`:

In [12]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


Portanto, precisamos converter manualmente essas colunas:

In [13]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


Finalmente vamos criar a tabela no banco de dados. Com o comando `df.head(n=0)` nós conseguimos somente os nomes das colunas. Nós usaremos isso para criar a instrução SQL que vai gerar a tabela.

In [14]:
df.head(n=0).to_sql(
    name='yellow_taxi_data',
    con=engine,
    if_exists='replace'
)

0

Agora, usando o comando `read_sql` do `pandas` e a _query_ abaixo, conseguimos ver as tabelas criadas no banco de dados.

In [15]:
query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog'
AND schemaname != 'information_schema';
"""
pd.read_sql(query, con=engine)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False


Conseguimos ver também informações sobre a tabela criada com a seguinte _query_. Assim, constatamos que os tipos de dados que obtivemos ao consultar o _schema_ que iria ser criado pelo `pandas.io.sql.get_schema()` ao inserir os dados no `PostgreSQL` se concretizaram através do método `pd.DataFrame.to_sql()`.


In [16]:
query_describe = """
SELECT * FROM information_schema.columns
WHERE table_name = 'yellow_taxi_data';
"""
pd.read_sql(query_describe, con=engine)

Unnamed: 0,table_catalog,table_schema,table_name,column_name,ordinal_position,column_default,is_nullable,data_type,character_maximum_length,character_octet_length,...,is_identity,identity_generation,identity_start,identity_increment,identity_maximum,identity_minimum,identity_cycle,is_generated,generation_expression,is_updatable
0,ny_taxi,public,yellow_taxi_data,airport_fee,20,,YES,double precision,,,...,NO,,,,,,NO,NEVER,,YES
1,ny_taxi,public,yellow_taxi_data,VendorID,2,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES
2,ny_taxi,public,yellow_taxi_data,tpep_pickup_datetime,3,,YES,timestamp without time zone,,,...,NO,,,,,,NO,NEVER,,YES
3,ny_taxi,public,yellow_taxi_data,tpep_dropoff_datetime,4,,YES,timestamp without time zone,,,...,NO,,,,,,NO,NEVER,,YES
4,ny_taxi,public,yellow_taxi_data,passenger_count,5,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES
5,ny_taxi,public,yellow_taxi_data,trip_distance,6,,YES,double precision,,,...,NO,,,,,,NO,NEVER,,YES
6,ny_taxi,public,yellow_taxi_data,RatecodeID,7,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES
7,ny_taxi,public,yellow_taxi_data,index,1,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES
8,ny_taxi,public,yellow_taxi_data,PULocationID,9,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES
9,ny_taxi,public,yellow_taxi_data,DOLocationID,10,,YES,bigint,,,...,NO,,,,,,NO,NEVER,,YES


Vamos incluir o bloco de 100.000 registros no banco de dados para verificar o tempo que levará para a inserção.

In [17]:
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 6.93 s, sys: 183 ms, total: 7.11 s
Wall time: 12.4 s


1000

Vendo a quantidade de dados inseridos na tabela no `PostgreSQL`, que deve bater com a quantidade da origem (100.000 registros).

In [18]:
pd.read_sql(sql="SELECT COUNT(1) FROM yellow_taxi_data;", con=engine)

Unnamed: 0,count
0,100000


Apesar de não ser o melhor código possível, é criado um _loop_ para iterar sobre o arquivo CSV, extraindo blocos de 100.000 registros por vez e os armazenando em `DataFrames` na memória, permitindo a inserção aos poucos na tabela do `PostgreSQL`.

In [19]:
from time import time

while True:
    try:
        t_start = time()
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

        t_end = time()

        print(f'Inserido outro bloco, levou {(t_end - t_start):3f} segundos')
    except StopIteration:
        print('Inserção completa')
        break

Inserido outro bloco, levou 11.089297 segundos
Inserido outro bloco, levou 10.915970 segundos
Inserido outro bloco, levou 11.186382 segundos
Inserido outro bloco, levou 12.459067 segundos
Inserido outro bloco, levou 13.972321 segundos
Inserido outro bloco, levou 14.677891 segundos
Inserido outro bloco, levou 12.561882 segundos
Inserido outro bloco, levou 12.326394 segundos
Inserido outro bloco, levou 14.631574 segundos
Inserido outro bloco, levou 11.096790 segundos
Inserido outro bloco, levou 12.374445 segundos


  df = next(df_iter)


Inserido outro bloco, levou 10.077468 segundos
Inserido outro bloco, levou 7.987357 segundos
Inserção completa


Checando se na tabela consta a mesma quantidade de registros que no arquivo de origem.

#### Contagem na ORIGEM

In [20]:
len(table)

1369769

#### Contagem no DESTINO

In [21]:
pd.read_sql(sql="SELECT COUNT(1) FROM yellow_taxi_data;", con=engine)

Unnamed: 0,count
0,1369769
