In [38]:
import pandas as pd

import pyarrow.csv as pv
import pyarrow.parquet as pq
from pyarrow import concat_tables

from datetime import datetime, timedelta
import os, re

In [36]:
def csv_to_parquet(path, delimiter=';', encoding='Latin 1', state=None):
    """Concatenate one state's numbered CSV files in ``path`` into a single Parquet file.

    Files whose names contain ``{state}_<digits>.csv`` (at the end of the name) are
    read with pyarrow, concatenated in sorted file-name order and written to
    ``<path>/dois_turnos.parquet``.

    Parameters
    ----------
    path : str
        Directory containing the CSV files.
    delimiter : str, default ';'
        CSV field delimiter.
    encoding : str, default 'Latin 1'
        Character encoding of the CSV files.
    state : str, optional
        State code that prefixes the numbered file names (e.g. ``'SP'``). When
        omitted, a module-global ``state`` is used, which is what earlier versions
        of this function read implicitly.

    Returns
    -------
    str
        Path of the written Parquet file.

    Raises
    ------
    ValueError
        If no state is given or available, or if no file in ``path`` matches.
    """
    if state is None:
        # Backward compatibility: this function used to read an undeclared global
        # `state`, which raised NameError on a fresh kernel.
        state = globals().get('state')
        if state is None:
            raise ValueError("csv_to_parquet() needs a `state` argument, e.g. state='SP'")

    # Raw string, escaped dot and '$' anchor: the previous pattern's '.' matched any
    # character and also accepted names such as 'SP_1.csv.bak'.
    pattern = re.compile(rf'{re.escape(state)}_\d+\.csv$')
    # os.listdir order is arbitrary; sort so the concatenated row order is reproducible.
    files = sorted(name for name in os.listdir(path) if pattern.search(name))
    if not files:
        raise ValueError(f"no files matching {pattern.pattern!r} in {path!r}")

    parse_options = pv.ParseOptions(delimiter=delimiter)
    read_options = pv.ReadOptions(encoding=encoding)
    tables = [
        pv.read_csv(
            os.path.join(path, name),
            read_options=read_options,
            parse_options=parse_options,
        )
        for name in files
    ]

    output_path = os.path.join(path, 'dois_turnos.parquet')
    pq.write_table(concat_tables(tables), output_path)
    return output_path

In [40]:
def prepare_data(filepath, dayfirst=True):
    """Load election results and merge the generation date/time into ``TS_GERACAO``.

    ``DT_GERACAO`` and ``HH_GERACAO`` are joined with a space, parsed with
    ``pd.to_datetime`` into ``TS_GERACAO``, and the result is restricted to (and
    ordered as) the column list below. The input frame is not modified.

    Parameters
    ----------
    filepath : str, path-like or pandas.DataFrame
        Parquet file to read; an already-loaded DataFrame is also accepted.
    dayfirst : bool, default True
        Forwarded to ``pd.to_datetime``. ``DT_GERACAO`` is text, and pandas'
        default month-first parsing would turn e.g. '07/10/2022' into July 10.
        NOTE(review): assumes DD/MM/YYYY dates (Brazilian convention) — confirm
        against the source files.

    Returns
    -------
    pandas.DataFrame
        Frame with exactly the columns in ``cols``, in that order.
    """
    cols = ['TS_GERACAO', 'NR_TURNO', 'SG_UF', 'CD_MUNICIPIO', 'NM_MUNICIPIO', 'NR_ZONA',
            'NR_SECAO', 'NR_LOCAL_VOTACAO', 'CD_CARGO_PERGUNTA',
            'DS_CARGO_PERGUNTA', 'NR_PARTIDO', 'SG_PARTIDO', 'NM_PARTIDO',
            'DT_BU_RECEBIDO', 'QT_APTOS', 'QT_COMPARECIMENTO', 'QT_ABSTENCOES',
            'CD_TIPO_URNA', 'DS_TIPO_URNA', 'CD_TIPO_VOTAVEL', 'DS_TIPO_VOTAVEL',
            'NR_VOTAVEL', 'NM_VOTAVEL', 'QT_VOTOS']

    raw = filepath if isinstance(filepath, pd.DataFrame) else pd.read_parquet(filepath)

    # HH_GERACAO is cast to str because it may not be loaded as text.
    timestamp_text = raw['DT_GERACAO'] + ' ' + raw['HH_GERACAO'].astype(str)

    # Selecting `cols` also drops DT_GERACAO/HH_GERACAO, so no explicit drop is needed.
    return raw.assign(
        TS_GERACAO=pd.to_datetime(timestamp_text, dayfirst=dayfirst)
    )[cols]

In [90]:
def write_prepared_df_to_parquet(df, path):
    """Save a prepared DataFrame as a Parquet file at ``path``.

    Thin wrapper over ``DataFrame.to_parquet`` using pandas' default engine and
    options; returns None.
    """
    save = df.to_parquet
    save(path)

In [41]:
def connect_to_postgres(
    user='postgres',
    password='postgres',
    host='localhost',
    port='5432',
    dbname='ny_taxi_data'):
    """Create a SQLAlchemy engine for a PostgreSQL database.

    ``user`` and ``password`` are URL-escaped so characters such as '@', ':' or '/'
    cannot corrupt the ``postgresql://`` connection URL.

    NOTE(review): the default credentials and ``dbname='ny_taxi_data'`` look like
    local-development / copy-paste defaults. Prefer passing them explicitly, e.g.
    ``password=os.environ['PGPASSWORD']``, rather than relying on hardcoded values.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine created by ``sqlalchemy.create_engine``.
    """
    # sqlalchemy is not imported in the notebook's import cell, so the bare name
    # raised NameError. Importing it here keeps the other cells usable without it.
    import sqlalchemy
    from urllib.parse import quote_plus

    url = (
        f"postgresql://{quote_plus(str(user))}:{quote_plus(str(password))}"
        f"@{host}:{port}/{dbname}"
    )
    return sqlalchemy.create_engine(url)

In [42]:
def upload_parquet_to_postgres(connection, filepath, table_name, if_exists='append', batch_size=10**5):
    """Upload a Parquet file to a SQL table in batches of ``batch_size`` rows.

    Parameters
    ----------
    connection :
        Connection accepted by ``DataFrame.to_sql(con=...)`` (a SQLAlchemy
        engine/connection or a ``sqlite3.Connection``).
    filepath : str, path-like or pandas.DataFrame
        Parquet file to read; an already-loaded DataFrame is also accepted.
    table_name : str
        Destination table.
    if_exists : {'fail', 'replace', 'append'}, default 'append'
        Applied to the first batch only. Later batches always append, so 'replace'
        replaces the table once (not once per batch), and 'fail' does not trip on
        the table created by the first batch.
    batch_size : int, default 100_000
        Rows per ``to_sql`` call; must be positive.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    df = filepath if isinstance(filepath, pd.DataFrame) else pd.read_parquet(filepath)

    # Ceiling division over the actual batch_size. The old count hard-coded 10**5,
    # silently dropping rows for smaller batches, and sent an extra empty batch on
    # exact multiples. max(1, ...) still sends one (empty) batch for an empty frame,
    # as before, so the table is created/replaced.
    n_chunks = max(1, -(-len(df) // batch_size))

    for i in range(n_chunks):
        to_db = df.iloc[i * batch_size:(i + 1) * batch_size]
        to_db.to_sql(
            table_name,
            con=connection,
            if_exists=if_exists if i == 0 else 'append',
        )
        print(f"sent chunk number {i+1}")