# ANP Data Pipeline

This notebook builds an ETL pipeline for the ANP Fuel Sales report.

## Goals

Extract and structure the data from the tables below:

- Sales of oil derivative fuels by UF and product
- Sales of diesel by UF and type

## Author
### Guilherme Norberto
Data Engineering professional, polyglot and multidisciplinary | [Linkedin](www.linkedin.com/in/guilhermedna)

##### O futuro é agora! | [www.ofuturoeagora.com.br](www.ofuturoeagora.com.br) 

###### São Paulo, 20/07/2020

# "C'est parti !" (Let's go!)

### Importing Pandas Library

In [2]:
import pandas as pd

### Importing Excel file

#### File name

In [3]:
file_name = 'vendas-combustiveis-m3.xlsx'

#### File sheets' names

In [4]:
#file_sheets = pd.ExcelFile(file_name).sheet_names
file_sheets = ['DPCache_m3', 'DPCache_m3_2']
file_sheets

['DPCache_m3', 'DPCache_m3_2']

#### Reading Excel data

In [5]:
df_derivative_fuels_sales = pd.read_excel(file_name,sheet_name=file_sheets[0])
df_derivative_fuels_sales[:5]

Unnamed: 0,COMBUSTÍVEL,ANO,REGIÃO,ESTADO,Jan,Fev,Mar,Abr,Mai,Jun,Jul,Ago,Set,Out,Nov,Dez,TOTAL
0,GASOLINA C (m3),2000,REGIÃO NORTE,RONDÔNIA,136073.253,9563.263,11341.229,9369.746,10719.983,11165.968,12312.451,11220.97,12482.281,13591.122,11940.57,11547.576,10818.094
1,GASOLINA C (m3),2000,REGIÃO NORTE,ACRE,3358.346,40001.853,3065.758,3495.29,2946.93,3023.92,3206.93,3612.58,3264.46,3835.74,3676.571,3225.61,3289.718
2,GASOLINA C (m3),2000,REGIÃO NORTE,AMAZONAS,20766.918,21180.919,242742.352,17615.604,20258.2,18741.344,19604.023,20221.674,20792.616,19912.898,21869.338,21145.643,20633.175
3,GASOLINA C (m3),2000,REGIÃO NORTE,RORAIMA,3716.032,3200.4,3339.332,43338.929,3259.3,3636.216,3631.569,3348.416,3394.016,4078.616,3346.616,4029.9,4358.516
4,GASOLINA C (m3),2000,REGIÃO NORTE,PARÁ,29755.907,28661.951,28145.784,29294.796,359575.398,28830.479,32297.047,27310.979,29396.384,26511.009,36553.25,31807.84,31009.972


### Checking DataFrame info

In [6]:
df_derivative_fuels_sales.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4536 entries, 0 to 4535
Data columns (total 17 columns):
COMBUSTÍVEL    4536 non-null object
ANO            4536 non-null int64
REGIÃO         4536 non-null object
ESTADO         4536 non-null object
Jan            4420 non-null float64
Fev            4419 non-null float64
Mar            4420 non-null float64
Abr            4420 non-null float64
Mai            4419 non-null float64
Jun            4420 non-null float64
Jul            4419 non-null float64
Ago            4420 non-null float64
Set            4420 non-null float64
Out            4419 non-null float64
Nov            4420 non-null float64
Dez            4420 non-null float64
TOTAL          4420 non-null float64
dtypes: float64(13), int64(1), object(3)
memory usage: 602.6+ KB


### Defining column name lists

#### Default columns

In [7]:
default_cols = ['year_month','uf','product','unit','volume','created_at']

#### Months' names dictionary [PT - EN]

In [8]:
months_dictionary = {
    'Jan':'Jan',
    'Fev':'Feb',
    'Mar':'Mar',
    'Abr':'Apr',
    'Mai':'May',
    'Jun':'Jun',
    'Jul':'Jul',
    'Ago':'Aug',
    'Set':'Sep',
    'Out':'Oct',
    'Nov':'Nov',
    'Dez':'Dec'}

#### Months' names list

In [9]:
months = list(months_dictionary.values())

### Translating months' names [PT-EN]

In [10]:
df_derivative_fuels_sales = df_derivative_fuels_sales.rename(columns=months_dictionary)
df_derivative_fuels_sales[:5]

Unnamed: 0,COMBUSTÍVEL,ANO,REGIÃO,ESTADO,Jan,Feb,Mar,Apr,May,Jun,Jul,Aug,Sep,Oct,Nov,Dec,TOTAL
0,GASOLINA C (m3),2000,REGIÃO NORTE,RONDÔNIA,136073.253,9563.263,11341.229,9369.746,10719.983,11165.968,12312.451,11220.97,12482.281,13591.122,11940.57,11547.576,10818.094
1,GASOLINA C (m3),2000,REGIÃO NORTE,ACRE,3358.346,40001.853,3065.758,3495.29,2946.93,3023.92,3206.93,3612.58,3264.46,3835.74,3676.571,3225.61,3289.718
2,GASOLINA C (m3),2000,REGIÃO NORTE,AMAZONAS,20766.918,21180.919,242742.352,17615.604,20258.2,18741.344,19604.023,20221.674,20792.616,19912.898,21869.338,21145.643,20633.175
3,GASOLINA C (m3),2000,REGIÃO NORTE,RORAIMA,3716.032,3200.4,3339.332,43338.929,3259.3,3636.216,3631.569,3348.416,3394.016,4078.616,3346.616,4029.9,4358.516
4,GASOLINA C (m3),2000,REGIÃO NORTE,PARÁ,29755.907,28661.951,28145.784,29294.796,359575.398,28830.479,32297.047,27310.979,29396.384,26511.009,36553.25,31807.84,31009.972


### Transforming each row of the DataFrame

#### Counting DataFrame rows

In [11]:
lines = len(df_derivative_fuels_sales)
lines

4536

#### Transposing DataFrame

In [12]:
tb_anp_data = None
for line in range(lines):
    # Getting the monthly volumes of this row as a (months, volume) frame
    tb2 = df_derivative_fuels_sales[line:line+1][months] \
        .transpose()\
        .reset_index()\
        .rename(columns={'index':'months',line:'volume'})

    # Getting uf name
    uf_name = df_derivative_fuels_sales[df_derivative_fuels_sales.columns[3:4]][line:line+1]

    # Joining fuel name, year and uf name
    tb1 = df_derivative_fuels_sales[df_derivative_fuels_sales.columns[0:2]][line:line+1].join(uf_name,how='left')

    # Replicating fuel type, year and uf name for each month
    n_columns = len(tb1.columns.to_list())
    for x in range(0,n_columns):
        tb2 = tb2.assign(**{tb1.columns[x]:tb1.iloc[0][tb1.columns[x]]})

    # Appending data for each iteration
    # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
    if tb_anp_data is None:
        tb_anp_data = tb2
    else:
        tb_anp_data = pd.concat([tb_anp_data, tb2], ignore_index=False)
tb_anp_data

Unnamed: 0,months,volume,COMBUSTÍVEL,ANO,ESTADO
0,Jan,136073.253000,GASOLINA C (m3),2000,RONDÔNIA
1,Feb,9563.263000,GASOLINA C (m3),2000,RONDÔNIA
2,Mar,11341.229000,GASOLINA C (m3),2000,RONDÔNIA
3,Apr,9369.746000,GASOLINA C (m3),2000,RONDÔNIA
4,May,10719.983000,GASOLINA C (m3),2000,RONDÔNIA
...,...,...,...,...,...
7,Aug,,GLP (m3),2020,DISTRITO FEDERAL
8,Sep,,GLP (m3),2020,DISTRITO FEDERAL
9,Oct,,GLP (m3),2020,DISTRITO FEDERAL
10,Nov,,GLP (m3),2020,DISTRITO FEDERAL
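
The row-by-row loop above can probably be expressed as a single reshape. The following is a minimal sketch, not the notebook's method: it assumes a small hand-made frame (`sample`, with made-up volumes and only three month columns) that mimics the layout of `df_derivative_fuels_sales`, and uses `DataFrame.melt` to turn the month columns into rows. Note that `melt` orders the result by month column rather than by source row, so the row order differs from the loop's output.

```python
import pandas as pd

# Hypothetical two-row sample with the same layout as df_derivative_fuels_sales
# (only three month columns, made-up volumes).
sample = pd.DataFrame({
    'COMBUSTÍVEL': ['GASOLINA C (m3)', 'GLP (m3)'],
    'ANO': [2000, 2020],
    'REGIÃO': ['REGIÃO NORTE', 'REGIÃO CENTRO-OESTE'],
    'ESTADO': ['RONDÔNIA', 'DISTRITO FEDERAL'],
    'Jan': [1.0, 4.0],
    'Feb': [2.0, 5.0],
    'Mar': [3.0, None],
    'TOTAL': [6.0, 9.0],
})
sample_months = ['Jan', 'Feb', 'Mar']

# One output row per (fuel, year, state, month). REGIÃO and TOTAL are dropped
# because they appear in neither id_vars nor value_vars.
long_sample = sample.melt(
    id_vars=['COMBUSTÍVEL', 'ANO', 'ESTADO'],
    value_vars=sample_months,
    var_name='months',
    value_name='volume',
)
print(long_sample)
```

On the full sheet, passing the notebook's `months` list as `value_vars` should give the same 12 rows per source row, without the Python-level loop.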


#### Renaming 'ESTADO' column to uf

In [13]:
tb_anp_data = tb_anp_data.rename(columns={'ESTADO':default_cols[1]})
tb_anp_data

Unnamed: 0,months,volume,COMBUSTÍVEL,ANO,uf
0,Jan,136073.253000,GASOLINA C (m3),2000,RONDÔNIA
1,Feb,9563.263000,GASOLINA C (m3),2000,RONDÔNIA
2,Mar,11341.229000,GASOLINA C (m3),2000,RONDÔNIA
3,Apr,9369.746000,GASOLINA C (m3),2000,RONDÔNIA
4,May,10719.983000,GASOLINA C (m3),2000,RONDÔNIA
...,...,...,...,...,...
7,Aug,,GLP (m3),2020,DISTRITO FEDERAL
8,Sep,,GLP (m3),2020,DISTRITO FEDERAL
9,Oct,,GLP (m3),2020,DISTRITO FEDERAL
10,Nov,,GLP (m3),2020,DISTRITO FEDERAL


### Splitting 'COMBUSTÍVEL' column into product and unit columns

In [14]:
tb_anp_data[default_cols[2:4]] = tb_anp_data['COMBUSTÍVEL'].str.split(r" \(|\)",expand=True)[[0,1]]\
.rename(columns={0:default_cols[2:3][0],1:default_cols[3:4][0]})

tb_anp_data

Unnamed: 0,months,volume,COMBUSTÍVEL,ANO,uf,product,unit
0,Jan,136073.253000,GASOLINA C (m3),2000,RONDÔNIA,GASOLINA C,m3
1,Feb,9563.263000,GASOLINA C (m3),2000,RONDÔNIA,GASOLINA C,m3
2,Mar,11341.229000,GASOLINA C (m3),2000,RONDÔNIA,GASOLINA C,m3
3,Apr,9369.746000,GASOLINA C (m3),2000,RONDÔNIA,GASOLINA C,m3
4,May,10719.983000,GASOLINA C (m3),2000,RONDÔNIA,GASOLINA C,m3
...,...,...,...,...,...,...,...
7,Aug,,GLP (m3),2020,DISTRITO FEDERAL,GLP,m3
8,Sep,,GLP (m3),2020,DISTRITO FEDERAL,GLP,m3
9,Oct,,GLP (m3),2020,DISTRITO FEDERAL,GLP,m3
10,Nov,,GLP (m3),2020,DISTRITO FEDERAL,GLP,m3
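
The split above depends on picking positional columns 0 and 1 from the result. As an alternative sketch, `Series.str.extract` with named groups yields the `product` and `unit` columns directly. The sample labels below are illustrative and assume every label follows the `NAME (unit)` pattern seen in the output.

```python
import pandas as pd

# Illustrative labels following the "NAME (unit)" pattern.
fuel_labels = pd.Series(['GASOLINA C (m3)', 'GLP (m3)'])

# Named groups become column names: the text before " (" is the product,
# the text inside the parentheses is the unit.
parts = fuel_labels.str.extract(r'^(?P<product>.+?) \((?P<unit>[^)]+)\)$')
print(parts)
```

Labels that do not match the pattern would come back as NaN in both columns, which makes malformed values easy to spot.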


### Deleting original fuel column

In [15]:
tb_anp_data = tb_anp_data.drop(columns=['COMBUSTÍVEL'])
tb_anp_data

Unnamed: 0,months,volume,ANO,uf,product,unit
0,Jan,136073.253000,2000,RONDÔNIA,GASOLINA C,m3
1,Feb,9563.263000,2000,RONDÔNIA,GASOLINA C,m3
2,Mar,11341.229000,2000,RONDÔNIA,GASOLINA C,m3
3,Apr,9369.746000,2000,RONDÔNIA,GASOLINA C,m3
4,May,10719.983000,2000,RONDÔNIA,GASOLINA C,m3
...,...,...,...,...,...,...
7,Aug,,2020,DISTRITO FEDERAL,GLP,m3
8,Sep,,2020,DISTRITO FEDERAL,GLP,m3
9,Oct,,2020,DISTRITO FEDERAL,GLP,m3
10,Nov,,2020,DISTRITO FEDERAL,GLP,m3


### Creating year_month column

#### Converting 'ANO' column to string

In [16]:
tb_anp_data['ANO'] = tb_anp_data['ANO'].astype(str)

#### Concatenating 'ANO' and 'months' columns' values

In [17]:
tb_anp_data['months'] = tb_anp_data['ANO'] + '-' + tb_anp_data['months']
tb_anp_data

Unnamed: 0,months,volume,ANO,uf,product,unit
0,2000-Jan,136073.253000,2000,RONDÔNIA,GASOLINA C,m3
1,2000-Feb,9563.263000,2000,RONDÔNIA,GASOLINA C,m3
2,2000-Mar,11341.229000,2000,RONDÔNIA,GASOLINA C,m3
3,2000-Apr,9369.746000,2000,RONDÔNIA,GASOLINA C,m3
4,2000-May,10719.983000,2000,RONDÔNIA,GASOLINA C,m3
...,...,...,...,...,...,...
7,2020-Aug,,2020,DISTRITO FEDERAL,GLP,m3
8,2020-Sep,,2020,DISTRITO FEDERAL,GLP,m3
9,2020-Oct,,2020,DISTRITO FEDERAL,GLP,m3
10,2020-Nov,,2020,DISTRITO FEDERAL,GLP,m3


### Deleting 'ANO' column and renaming 'months' column to 'year_month'

In [18]:
tb_anp_data = tb_anp_data.rename(columns={'months':default_cols[0]})\
            .drop(columns=['ANO'])
tb_anp_data

Unnamed: 0,year_month,volume,uf,product,unit
0,2000-Jan,136073.253000,RONDÔNIA,GASOLINA C,m3
1,2000-Feb,9563.263000,RONDÔNIA,GASOLINA C,m3
2,2000-Mar,11341.229000,RONDÔNIA,GASOLINA C,m3
3,2000-Apr,9369.746000,RONDÔNIA,GASOLINA C,m3
4,2000-May,10719.983000,RONDÔNIA,GASOLINA C,m3
...,...,...,...,...,...
7,2020-Aug,,DISTRITO FEDERAL,GLP,m3
8,2020-Sep,,DISTRITO FEDERAL,GLP,m3
9,2020-Oct,,DISTRITO FEDERAL,GLP,m3
10,2020-Nov,,DISTRITO FEDERAL,GLP,m3


### Ordering columns to default order

In [19]:
tb_anp_data = tb_anp_data[default_cols[:-1]]
tb_anp_data

Unnamed: 0,year_month,uf,product,unit,volume
0,2000-Jan,RONDÔNIA,GASOLINA C,m3,136073.253000
1,2000-Feb,RONDÔNIA,GASOLINA C,m3,9563.263000
2,2000-Mar,RONDÔNIA,GASOLINA C,m3,11341.229000
3,2000-Apr,RONDÔNIA,GASOLINA C,m3,9369.746000
4,2000-May,RONDÔNIA,GASOLINA C,m3,10719.983000
...,...,...,...,...,...
7,2020-Aug,DISTRITO FEDERAL,GLP,m3,
8,2020-Sep,DISTRITO FEDERAL,GLP,m3,
9,2020-Oct,DISTRITO FEDERAL,GLP,m3,
10,2020-Nov,DISTRITO FEDERAL,GLP,m3,


### Converting year_month column to YYYY-MM-DD date format (%Y-%m-%d)

In [20]:
import datetime
from pandas.tseries.offsets import MonthEnd

In [21]:
tb_anp_data['year_month'] = pd.to_datetime(tb_anp_data['year_month'], format="%Y-%b") + MonthEnd(1)
tb_anp_data

Unnamed: 0,year_month,uf,product,unit,volume
0,2000-01-31,RONDÔNIA,GASOLINA C,m3,136073.253000
1,2000-02-29,RONDÔNIA,GASOLINA C,m3,9563.263000
2,2000-03-31,RONDÔNIA,GASOLINA C,m3,11341.229000
3,2000-04-30,RONDÔNIA,GASOLINA C,m3,9369.746000
4,2000-05-31,RONDÔNIA,GASOLINA C,m3,10719.983000
...,...,...,...,...,...
7,2020-08-31,DISTRITO FEDERAL,GLP,m3,
8,2020-09-30,DISTRITO FEDERAL,GLP,m3,
9,2020-10-31,DISTRITO FEDERAL,GLP,m3,
10,2020-11-30,DISTRITO FEDERAL,GLP,m3,
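
To make the month-end step easier to follow, here is a small sketch on three hand-picked labels. It assumes the process locale uses English month abbreviations (needed for `%b`). Because the format carries no day, parsing yields the first day of each month, and adding `MonthEnd(1)` moves it to the last day of that same month.

```python
import pandas as pd
from pandas.tseries.offsets import MonthEnd

labels = pd.Series(['2000-Jan', '2000-Feb', '2020-Nov'])

# '%b' parses abbreviated month names; with no day in the format,
# each parsed date falls on the first day of its month.
first_days = pd.to_datetime(labels, format='%Y-%b')

# From the first day of a month, MonthEnd(1) lands on the end of that month.
month_ends = first_days + MonthEnd(1)
print(month_ends.dt.strftime('%Y-%m-%d').tolist())

# Caveat: from a date that is already a month end, MonthEnd(1) jumps to the
# NEXT month end, while MonthEnd(0) leaves it unchanged.
already_end = pd.Timestamp('2000-01-31')
print(already_end + MonthEnd(1), already_end + MonthEnd(0))
```

The caveat does not affect this notebook, since every parsed value starts on day 1, but it matters if the cell is re-run on a column that has already been converted.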


### Creating 'created_at' column

In [22]:
tb_anp_data = tb_anp_data.assign(**{default_cols[-1]:datetime.datetime.now()})
tb_anp_data

Unnamed: 0,year_month,uf,product,unit,volume,created_at
0,2000-01-31,RONDÔNIA,GASOLINA C,m3,136073.253000,2020-07-20 16:22:48.742830
1,2000-02-29,RONDÔNIA,GASOLINA C,m3,9563.263000,2020-07-20 16:22:48.742830
2,2000-03-31,RONDÔNIA,GASOLINA C,m3,11341.229000,2020-07-20 16:22:48.742830
3,2000-04-30,RONDÔNIA,GASOLINA C,m3,9369.746000,2020-07-20 16:22:48.742830
4,2000-05-31,RONDÔNIA,GASOLINA C,m3,10719.983000,2020-07-20 16:22:48.742830
...,...,...,...,...,...,...
7,2020-08-31,DISTRITO FEDERAL,GLP,m3,,2020-07-20 16:22:48.742830
8,2020-09-30,DISTRITO FEDERAL,GLP,m3,,2020-07-20 16:22:48.742830
9,2020-10-31,DISTRITO FEDERAL,GLP,m3,,2020-07-20 16:22:48.742830
10,2020-11-30,DISTRITO FEDERAL,GLP,m3,,2020-07-20 16:22:48.742830


### Checking DataFrame info

In [23]:
tb_anp_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 54432 entries, 0 to 11
Data columns (total 6 columns):
year_month    54432 non-null datetime64[ns]
uf            54432 non-null object
product       54432 non-null object
unit          54432 non-null object
volume        53036 non-null float64
created_at    54432 non-null datetime64[ns]
dtypes: datetime64[ns](2), float64(1), object(3)
memory usage: 2.9+ MB
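
The `0 to 11` index range in the output above indicates that index labels repeat: each source row contributed its own 0..11 labels. A minimal sketch of how such a stacked frame could be renumbered, using two hypothetical three-row chunks in place of the real per-row frames:

```python
import pandas as pd

# Two hypothetical 3-row chunks, each indexed 0..2, mimicking the per-row
# frames produced by the transposition loop.
chunk_a = pd.DataFrame({'volume': [1.0, 2.0, 3.0]})
chunk_b = pd.DataFrame({'volume': [4.0, 5.0, 6.0]})

# Keeping the original labels produces duplicates (0..2 appears twice).
stacked = pd.concat([chunk_a, chunk_b], ignore_index=False)
print(stacked.index.is_unique)

# reset_index(drop=True) assigns a fresh 0..n-1 index and discards the old labels.
renumbered = stacked.reset_index(drop=True)
print(renumbered.index.tolist())
```

A unique index is not required for the Spark conversion that follows, but it avoids surprises with label-based selection and column assignment in pandas.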


# Pandas to Spark

### Importing PySpark Library and initializing a Spark Session

In [24]:
from pyspark.sql import SparkSession

appName = "ANP Fuel Sales Data"
master = "local[8]"

# Creating Spark session
spark = SparkSession.builder\
                    .appName(appName)\
                    .master(master)\
                    .getOrCreate()

### Importing Spark Data Types

In [25]:
from pyspark.sql.types import\
     StructType, StructField,\
     DateType, StringType, DoubleType, TimestampType

### Defining Schema

In [26]:
data_schema = StructType([
    StructField('year_month',DateType(),True),
    StructField('uf',StringType(),True),
    StructField('product',StringType(),True),
    StructField('unit',StringType(),True),
    StructField('volume',DoubleType(),True),
    StructField('created_at',TimestampType(),True)
])

### Creating Spark DataFrame with Pandas DataFrame

In [27]:
df_anp_data = spark.createDataFrame(tb_anp_data,schema=data_schema)

In [28]:
df_anp_data.show(5,False)

+----------+--------+----------+----+----------+-------------------------+
|year_month|uf      |product   |unit|volume    |created_at               |
+----------+--------+----------+----+----------+-------------------------+
|2000-01-31|RONDÔNIA|GASOLINA C|m3  |136073.253|2020-07-20 16:22:48.74283|
|2000-02-29|RONDÔNIA|GASOLINA C|m3  |9563.263  |2020-07-20 16:22:48.74283|
|2000-03-31|RONDÔNIA|GASOLINA C|m3  |11341.229 |2020-07-20 16:22:48.74283|
|2000-04-30|RONDÔNIA|GASOLINA C|m3  |9369.746  |2020-07-20 16:22:48.74283|
|2000-05-31|RONDÔNIA|GASOLINA C|m3  |10719.983 |2020-07-20 16:22:48.74283|
+----------+--------+----------+----+----------+-------------------------+
only showing top 5 rows



### Checking schema

In [29]:
df_anp_data.printSchema()

root
 |-- year_month: date (nullable = true)
 |-- uf: string (nullable = true)
 |-- product: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- volume: double (nullable = true)
 |-- created_at: timestamp (nullable = true)



### Counting rows

In [30]:
df_anp_data.count()

54432

### Importing Spark Date Functions

In [31]:
from pyspark.sql.functions import year, month, dayofmonth

### Defining product partition by sheet name

In [32]:
product_partition = {
                    file_sheets[0]:'oil-derivative-fuels',   
                    file_sheets[1]:'diesel'}

## Partition Strategy 1: small data

### Creating a new DataFrame

In [33]:
df = df_anp_data

### Defining repartition data strategy

In [34]:
df = df.repartition('year_month', 'uf', 'product','unit')

### Writing data to files in partitions

In [37]:
df.write.partitionBy('year_month', 'uf', 'product','unit')\
        .mode('append')\
        .csv('data/'+product_partition[file_sheets[0]], header=True) #by file_name?
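
`partitionBy` writes one directory per distinct combination of the partition columns, using `column=value` directory names. The sketch below estimates that layout in pandas on a hypothetical miniature of `tb_anp_data` (`mini`, with made-up volumes). The path string is only an approximation, since Spark also escapes some special characters in values.

```python
import pandas as pd

# Hypothetical miniature of tb_anp_data (made-up volumes).
mini = pd.DataFrame({
    'year_month': pd.to_datetime(['2000-01-31', '2000-02-29', '2000-01-31']),
    'uf': ['ACRE', 'ACRE', 'PARÁ'],
    'product': ['GLP', 'GLP', 'GLP'],
    'unit': ['m3', 'm3', 'm3'],
    'volume': [1.0, 2.0, 3.0],
})
partition_cols = ['year_month', 'uf', 'product', 'unit']

# Each distinct key combination maps to one output directory.
n_dirs = len(mini[partition_cols].drop_duplicates())
print(n_dirs, 'directories for', len(mini), 'rows')

def partition_path(row, cols):
    """Approximate column=value directory path for one row (no escaping)."""
    parts = []
    for col in cols:
        value = row[col]
        # Render timestamps as plain dates, matching the DateType column.
        if isinstance(value, pd.Timestamp):
            value = value.date()
        parts.append(f'{col}={value}')
    return '/'.join(parts)

path = partition_path(mini.iloc[0], partition_cols)
print(path)
```

With these four keys, each (month, state, product) combination appears to hold a single row in this dataset, so the write produces many very small files. That seems acceptable for small data and is the trade-off the second strategy revisits.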

## Partition Strategy 2: big data

### Creating a new DataFrame with Year, Month and Day columns and then dropping year_month column

In [38]:
df = df_anp_data.withColumn('Year', year('year_month'))\
                .withColumn('Month', month('year_month'))\
                .withColumn('Day', dayofmonth('year_month'))\
                .drop('year_month')

### Defining repartition data strategy

In [39]:
df = df.repartition('Year', 'Month', 'Day', 'uf', 'product','unit')

### Writing data to files in partitions

In [40]:
df.write.partitionBy('Year', 'Month', 'Day', 'uf', 'product','unit')\
        .mode('append')\
        .csv('data/'+product_partition[file_sheets[0]], header=True)

# "C'est fini !" (The end)