In [21]:
from kedro.io import MemoryDataSet
from typing import Dict
import pandas as pd 

In [22]:
def download_file() -> MemoryDataSet:
    import wget
    file_url = 'http://www.anp.gov.br/arquivos/dados-estatisticos/vendas-combustiveis/vendas-combustiveis-m3.xls'
    path_file = f'./data/01_raw/vendas-combustiveis-m3.xls'
    wget.download(file_url, path_file)
    return MemoryDataSet(data=[])

In [24]:
def convert_xls_ods(data: MemoryDataSet) -> MemoryDataSet:
    import os
    os.chdir(f'./data/01_raw/')
    stream = os.popen('soffice --headless --convert-to ods vendas-combustiveis-m3.xls')
    output = stream.read()
    print(output)
    return data.save(data=['convertido'])

In [25]:
def partition_by_date(df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    parts = {}
    
    for date in df['year_month'].unique():
        parts[f'date=={date}'] = df[df['year_month'] == date]
    return parts

In [26]:
def cleaning(data: MemoryDataSet) -> None:
    from pandas_ods_reader import read_ods
    import pandas as pd
    from kedro.extras.datasets.pandas import ParquetDataSet 
    from datetime import datetime
    pd.set_option('display.precision', 2)
    UF = {
        'Acre': 'AC',
        'Alagoas': 'AL',
        'Amazonas': 'AM',
        'Amapá': 'AP',
        'Bahia': 'BA',
        'Ceará': 'CE',
        'Distrito Federal': 'DF',
        'Espírito Santo': 'ES',
        'Goiás': 'GO',
        'Maranhão': 'MA',
        'Minas Gerais': 'MG',
        'Mato Grosso do Sul': 'MS',
        'Mato Grosso': 'MT',
        'Pará': 'PA',
        'Paraíba': 'PB',
        'Pernambuco': 'PE',
        'Piauí': 'PI',
        'Paraná': 'PR',
        'Rio de Janeiro': 'RJ',
        'Rio Grande do Norte': 'RN',
        'Rondônia': 'RO',
        'Roraima': 'RR',
        'Rio Grande do Sul': 'RS',
        'Santa Catarina': 'SC',
        'Sergipe': 'SE',
        'São Paulo': 'SP',
        'Tocantins': 'TO'
    }

    UF = dict((k.upper(), v.upper()) for k, v in UF.items()) 

    months = [
        'Jan',
        'Fev',
        'Mar',
        'Abr',
        'Mai',
        'Jun',
        'Jul',
        'Ago',
        'Set',
        'Out',
        'Nov',
        'Dez'
    ]

    derivados = ['ETANOL HIDRATADO', 'GASOLINA C', 'GASOLINA DE AVIAÇÃO', 'GLP', 'ÓLEO COMBUSTÍVEL', 'ÓLEO DIESEL', 'QUEROSENE DE AVIAÇÃO', 'QUEROSENE ILUMINANTE']
    diesel = ['ÓLEO DIESEL (OUTROS )', 'ÓLEO DIESEL MARÍTIMO', 'ÓLEO DIESEL S-10', 'ÓLEO DIESEL S-500', 'ÓLEO DIESEL S-1800']

    #Carregando arquivos ods
    df_2 = read_ods(f'./data/01_raw/vendas-combustiveis-m3.ods', 2)
    df_3 = read_ods(f'./data/01_raw/vendas-combustiveis-m3.ods', 3)

    #concatenando e montando dataset
    dataset = pd.concat([df_2, df_3])

    final = list()
    for row in dataset.fillna(0).to_dict(orient='records'):
        _header = {
            'uf': UF[row['ESTADO']],
            'product': row['COMBUSTÍVEL'].replace(' (m3)', ''),
            'unit': 'm3',
        }
        if _header['product'] in derivados + diesel:
            for index, month in enumerate(months):
                final.append({
                  **{
                    'year_month': datetime.strptime(f"{int(row['ANO'])}-{index + 1}", '%Y-%m'),
                    'volume': row[month],
                    'created_at': datetime.now()
                }
                , **_header, 
                })
    final = pd.DataFrame(final)[['year_month', 'uf', 'product', 'unit', 'volume', 'table', 'created_at']]

    parquetDataset = ParquetDataSet(filepath=f'{context.project_path}/data/02_intermediate/amostra_final.parquet')
    parquetDataset.save(final)
    return None

In [4]:
download_file()

In [5]:
convert_xls_ods()

convert /Users/victorapolonio/Code/raizen_etl/data/01_raw/vendas-combustiveis-m3.xls -> /Users/victorapolonio/Code/raizen_etl/data/01_raw/vendas-combustiveis-m3.ods using filter : calc8



In [8]:
cleaning()