**Leitura de arquivos - Dados da ANP**

**Definição do problema:**
Leitura de arquivos excel e estruturação dos dados

**Metas**:
Geração de duas tabelas
1. Vendas de combustíveis derivados de petróleo por UF e produto
2. Vendas de diesel por UF e tipo


A tabela deve ter o seguinte formato (schema):
* year_month: date
* uf: string
* product: string
* unit: string
* volume: double
* created_at: timestamp


In [2]:
#libs necessarias para leitura de arquivos a partir do google drive
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:
#importacao de libs usadas
import pandas as pd
import numpy as np
import datetime
from pandas.tseries.offsets import MonthEnd

In [5]:
#caminho das pastas dos arquivos
path_folder_tb1 = '/content/dados/tb1_vd_comb_uf_prod/'
path_folder_tb2 = '/content/dados/tb2_vd_diesel_uf_tipo/'

#nome dos arquivos
path_files_tb1 = ['ETANOL HIDRATADO (m3).xlsx', 'GASOLINA C (m3).xlsx', 'GASOLINA DE AVIAÇÃO (m3).xlsx', 'GLP (m3).xlsx','ÓLEO COMBUSTÍVEL (m3).xlsx',
                  'ÓLEO DIESEL (m3).xlsx','QUEROSENE DE AVIAÇÃO (m3).xlsx','QUEROSENE ILUMINANTE (m3).xlsx' ]

path_files_tb2 = ['ÓLEO DIESEL (OUTROS ) (m3).xlsx','ÓLEO DIESEL MARÍTIMO (m3).xlsx','ÓLEO DIESEL S-10 (m3).xlsx','ÓLEO DIESEL S-500 (m3).xlsx','ÓLEO DIESEL S-1800 (m3).xlsx']

list_sheet_names = ['ACRE', 'ALAGOAS','AMAPÁ','AMAZONAS','BAHIA','CEARÁ','DISTRITO FEDERAL','ESPÍRITO SANTO','GOIÁS','MARANHÃO','MATO GROSSO','MATO GROSSO DO SUL','MINAS GERAIS','PARÁ','PARAÍBA',
                    'PARANÁ','PERNAMBUCO','PIAUÍ','RIO DE JANEIRO','RIO GRANDE DO NORTE','RIO GRANDE DO SUL','RONDÔNIA','RORAIMA','SANTA CATARINA','SÃO PAULO','SERGIPE','TOCANTINS']

replace_values = {'Janeiro' : 'Jan', 'Fevereiro' : 'Feb', 'Março' : 'Mar', 'Abril': 'Apr', 'Maio': 'May', 'Junho': 'Jun', 'Julho': 'Jul', 'Agosto': 'Aug', 'Setembro': 'Sep', 'Outubro': 'Oct',
                  'Novembro': 'Nov', 'Dezembro': 'Dec'}

In [6]:
'''
Definicao de funcao para tratamento dos arquivos de entrada, paramentros:
path_folder: caminho da pasta
lst_path_file: lista com os nomes dos arquivos
lst_sheet: lista com os nomes das planilhas a serem lidas
replace_values: dicionario para replace de colunas
'''

def creatDF(path_folder, lst_path_file, lst_sheet, replace_values=replace_values):
    all_data = pd.DataFrame()
    for i in lst_path_file: #percorre a lista de arquivos
        path = path_folder + i

        for j in lst_sheet: #percorre a lista de planilhas
          df = pd.read_excel(path, j, header=4, nrows=12)
          df.rename(columns={'Dados':'Month'}, inplace=True)
          df = df.replace({"Month": replace_values})  
          df = pd.melt(df.reset_index(), id_vars=['Month'], var_name='Year', value_name='volume').sort_values(['Year','Month'], ascending=False) #transformacao de colunas em linhas
          df = df[df.Year.str.contains("index") == False]
          #criacao de novas colunas
          df['uf'] = j
          df['product'] = i.split('.')[0]
          product = i.split('.')[0]
          df['unit'] = product.split('(')[1].replace(')','') 
          df['created_at'] = datetime.datetime.now()
          df['year_month'] = df['Year'] + '-' + df['Month']

          df.drop(columns=['Month', 'Year'], inplace=True) #exclusao de colunas
          df = df.reindex(columns=['year_month', 'uf', 'product', 'unit', 'volume', 'created_at'])#reordenacao de colunas
          
          df['year_month'] = pd.to_datetime(df['year_month'], format="%Y-%b") + MonthEnd(1)#conversao de tipos
          all_data = all_data.append(df,ignore_index=True)
    return(all_data) #retorna o dataframe com todos os dados



In [11]:
#Leitura dos arquivos

#Vendas de combustiveis por UF e produto
df_vd_comb_uf_prod = creatDF(path_folder_tb1, path_files_tb1, list_sheet_names)

#Vendas de Diesel por UF e tipo
df_vd_diesel_uf_tipo = creatDF(path_folder_tb2, path_files_tb2, list_sheet_names)


In [12]:
#Vendas de combustiveis por UF e produto
display(df_vd_comb_uf_prod)

Unnamed: 0,year_month,uf,product,unit,volume,created_at
0,2016-09-30,ACRE,ETANOL HIDRATADO (m3),m3,710.0,2020-09-08 06:37:14.780999
1,2016-10-31,ACRE,ETANOL HIDRATADO (m3),m3,917.0,2020-09-08 06:37:14.780999
2,2016-11-30,ACRE,ETANOL HIDRATADO (m3),m3,639.8,2020-09-08 06:37:14.780999
3,2016-05-31,ACRE,ETANOL HIDRATADO (m3),m3,473.0,2020-09-08 06:37:14.780999
4,2016-03-31,ACRE,ETANOL HIDRATADO (m3),m3,620.5,2020-09-08 06:37:14.780999
...,...,...,...,...,...,...
2587,2016-01-31,TOCANTINS,QUEROSENE ILUMINANTE (m3),m3,0.0,2020-09-08 06:37:41.312626
2588,2016-02-29,TOCANTINS,QUEROSENE ILUMINANTE (m3),m3,0.0,2020-09-08 06:37:41.312626
2589,2016-12-31,TOCANTINS,QUEROSENE ILUMINANTE (m3),m3,0.0,2020-09-08 06:37:41.312626
2590,2016-08-31,TOCANTINS,QUEROSENE ILUMINANTE (m3),m3,0.0,2020-09-08 06:37:41.312626


In [17]:
print(len(df_vd_comb_uf_prod['uf'].unique())) # 27 valor correto
print(len(df_vd_comb_uf_prod['product'].unique())) #8 valor correto



27
8


In [18]:
#Vendas de Diesel por UF e tipo
display(df_vd_diesel_uf_tipo)

Unnamed: 0,year_month,uf,product,unit,volume,created_at
0,2016-09-30,ACRE,ÓLEO DIESEL (OUTROS ) (m3),OUTROS,0.0,2020-09-08 06:37:41.437822
1,2016-10-31,ACRE,ÓLEO DIESEL (OUTROS ) (m3),OUTROS,0.0,2020-09-08 06:37:41.437822
2,2016-11-30,ACRE,ÓLEO DIESEL (OUTROS ) (m3),OUTROS,0.0,2020-09-08 06:37:41.437822
3,2016-05-31,ACRE,ÓLEO DIESEL (OUTROS ) (m3),OUTROS,0.0,2020-09-08 06:37:41.437822
4,2016-03-31,ACRE,ÓLEO DIESEL (OUTROS ) (m3),OUTROS,0.0,2020-09-08 06:37:41.437822
...,...,...,...,...,...,...
1615,2016-01-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
1616,2016-02-29,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
1617,2016-12-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
1618,2016-08-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825


In [19]:
print(len(df_vd_diesel_uf_tipo['uf'].unique())) # 27 valor correto
print(len(df_vd_diesel_uf_tipo['product'].unique())) # 5 valor correto

27
5


In [20]:
df_vd_diesel_uf_tipo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1620 entries, 0 to 1619
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   year_month  1620 non-null   datetime64[ns]
 1   uf          1620 non-null   object        
 2   product     1620 non-null   object        
 3   unit        1620 non-null   object        
 4   volume      1620 non-null   float64       
 5   created_at  1620 non-null   datetime64[ns]
dtypes: datetime64[ns](2), float64(1), object(3)
memory usage: 76.1+ KB


In [21]:
df_vd_comb_uf_prod.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2592 entries, 0 to 2591
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   year_month  2592 non-null   datetime64[ns]
 1   uf          2592 non-null   object        
 2   product     2592 non-null   object        
 3   unit        2592 non-null   object        
 4   volume      2592 non-null   float64       
 5   created_at  2592 non-null   datetime64[ns]
dtypes: datetime64[ns](2), float64(1), object(3)
memory usage: 121.6+ KB


In [22]:
df_data_files = pd.DataFrame()
#uniao dos dataframes/dados
df_data_files = df_data_files.append(df_vd_comb_uf_prod, ignore_index=True)
df_data_files = df_data_files.append(df_vd_diesel_uf_tipo,ignore_index=True)

In [23]:
display(df_data_files)

Unnamed: 0,year_month,uf,product,unit,volume,created_at
0,2016-09-30,ACRE,ETANOL HIDRATADO (m3),m3,710.0,2020-09-08 06:37:14.780999
1,2016-10-31,ACRE,ETANOL HIDRATADO (m3),m3,917.0,2020-09-08 06:37:14.780999
2,2016-11-30,ACRE,ETANOL HIDRATADO (m3),m3,639.8,2020-09-08 06:37:14.780999
3,2016-05-31,ACRE,ETANOL HIDRATADO (m3),m3,473.0,2020-09-08 06:37:14.780999
4,2016-03-31,ACRE,ETANOL HIDRATADO (m3),m3,620.5,2020-09-08 06:37:14.780999
...,...,...,...,...,...,...
4207,2016-01-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
4208,2016-02-29,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
4209,2016-12-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825
4210,2016-08-31,TOCANTINS,ÓLEO DIESEL S-1800 (m3),m3,0.0,2020-09-08 06:37:51.298825


In [26]:
len(df_data_files['product'].unique()) #13 valor correto

13

Parte II

In [46]:

#Configuracao do Spark no Google Colab


# Run below commands in google colab
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
# unzip it
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
# install findspark 
!pip install -q findspark

#Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

import findspark
findspark.init()


# Check the pyspark version
import pyspark
print(pyspark.__version__)


3.0.1


In [51]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ETL Pipeline').master("local[*]").getOrCreate()

In [52]:
#definicao do schema e tipos de dados a serem lidos
schema = StructType([StructField('year_month',DateType(),True),StructField('uf',StringType(),True),StructField('product',StringType(),True),StructField('unit',StringType(),True),
    StructField('volume',DoubleType(),True),StructField('created_at',TimestampType(),True)])

In [53]:
df_spark = spark.createDataFrame(df_data_files, schema= schema)

In [54]:
display(df_spark)

DataFrame[year_month: date, uf: string, product: string, unit: string, volume: double, created_at: timestamp]

In [57]:
df_spark.printSchema()

root
 |-- year_month: date (nullable = true)
 |-- uf: string (nullable = true)
 |-- product: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [56]:
df_spark.show(n=10)

+----------+----+--------------------+----+------+--------------------+
|year_month|  uf|             product|unit|volume|          created_at|
+----------+----+--------------------+----+------+--------------------+
|2016-09-30|ACRE|ETANOL HIDRATADO ...|  m3| 710.0|2020-09-08 06:37:...|
|2016-10-31|ACRE|ETANOL HIDRATADO ...|  m3| 917.0|2020-09-08 06:37:...|
|2016-11-30|ACRE|ETANOL HIDRATADO ...|  m3| 639.8|2020-09-08 06:37:...|
|2016-05-31|ACRE|ETANOL HIDRATADO ...|  m3| 473.0|2020-09-08 06:37:...|
|2016-03-31|ACRE|ETANOL HIDRATADO ...|  m3| 620.5|2020-09-08 06:37:...|
|2016-06-30|ACRE|ETANOL HIDRATADO ...|  m3| 470.0|2020-09-08 06:37:...|
|2016-07-31|ACRE|ETANOL HIDRATADO ...|  m3| 462.5|2020-09-08 06:37:...|
|2016-01-31|ACRE|ETANOL HIDRATADO ...|  m3| 799.0|2020-09-08 06:37:...|
|2016-02-29|ACRE|ETANOL HIDRATADO ...|  m3| 673.5|2020-09-08 06:37:...|
|2016-12-31|ACRE|ETANOL HIDRATADO ...|  m3| 676.8|2020-09-08 06:37:...|
+----------+----+--------------------+----+------+--------------

In [65]:
print('Número de linhas no Dataframe Pandas: {0}'.format(len(df_data_files.index)))
print('Número de linhas no Dataframe Spark: {0}'.format(df_spark.count()))

Número de linhas no Dataframe Pandas: 4212
Número de linhas no Dataframe Spark: 4212


In [72]:
df = df_spark

df.write.partitionBy('product')\
  .mode('append')\
  .csv('content/dados', header=True)