# Data Pipeline Total Ingredientes - Brazilian Import Trading Data

We will save the downloaded file to our root directory for future analysis and generate an automatic report. The steps are as follows:

INGESTION --> EXTRACTION --> TRANSFORMATION --> LOAD

1) Install libraries;

2) Download file from: https://balanca.economia.gov.br/balanca/bd/comexstat-bd/mun/IMP_COMPLETA_MUN.zip

3) Pre-analyze data;

4) Transform data;

5) Load data;

We will schedule the script to run and load the data into Azure SQL every month.


Data comes from: https://www.gov.br/produtividade-e-comercio-exterior/pt-br/assuntos/comercio-exterior/estatisticas/base-de-dados-bruta


IMPORTANT NOTE: THIS IS AN OPEN SOURCE FILE PROVIDED BY THE BRAZILIAN GOVERNMENT.

In [1]:
# The import cell below uses the unidecode package (not "unicode")
!pip install unidecode

# Install libraries 

We import the libraries needed to connect to the data sources and work with the data.

In [2]:
#  Importing libraries
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import pandas as pd
import os
import zipfile
import requests
import shutil
import numpy as np
import unidecode
import gzip

# Data Ingestion

We will use HTTP GET requests to download two files:

1) From the Brazilian Government, the most recent import trade data.

2) From an internal download URL, an XLSX support file with the lookup names used later in the pipeline.

In [4]:
# Download via HTTP GET

# Collecting government trade data
URL = "https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/IMP_COMPLETA.zip"

# Make a GET request to the endpoint.
# Note: verify=False disables TLS certificate verification; keep it only if
# the server's certificate cannot be validated from your environment.
response = requests.get(URL, verify=False)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save the response content to a file (later cells expect this file name)
    with open("IMP_COMPLETA_MUN.zip", "wb") as file:
        file.write(response.content)
    print("File downloaded successfully!")
else:
    print(f"Error downloading the file. Status code: {response.status_code}")



File downloaded successfully!
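
A 200 status code does not by itself guarantee that the saved payload is a usable ZIP archive. The following is a minimal sketch of a sanity check that could run right after the download, using only the standard library. The helper name `list_zip_members` and the in-memory demo archive are assumptions made for illustration; in the notebook the function would be called with the path `"IMP_COMPLETA_MUN.zip"`.

```python
import io
import zipfile


def list_zip_members(path_or_file):
    """Return the member names of a ZIP archive, or raise ValueError if it is unusable."""
    if not zipfile.is_zipfile(path_or_file):
        raise ValueError("Not a valid ZIP archive")
    with zipfile.ZipFile(path_or_file) as archive:
        # testzip() returns the name of the first corrupt member, or None
        bad_member = archive.testzip()
        if bad_member is not None:
            raise ValueError(f"Corrupt member in archive: {bad_member}")
        return archive.namelist()


# Self-contained demo: build a tiny archive in memory instead of downloading one
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("IMP_COMPLETA.csv", "CO_ANO;CO_MES\n2019;6\n")

members = list_zip_members(buffer)
print(members)
```

Failing early here keeps a truncated or non-ZIP response from surfacing later as a confusing parsing error in the extraction step.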


In [None]:
# Download via HTTP GET

# Collecting auxiliary data from the internal URL
URL2 = "https://72697629-90ee-488f-b0b3-494b8ba56809.usrfiles.com/ugd/726976_88877e33311a4a54a518c05cd0705b67.xlsx"

# Make a GET request to the endpoint (see the note on verify=False in the previous cell)
response = requests.get(URL2, verify=False)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Write the response into a new file with the same name, closing it afterwards
    with open("726976_88877e33311a4a54a518c05cd0705b67.xlsx", "wb") as file:
        file.write(response.content)
    print("File downloaded successfully!")
else:
    print(f"Error downloading the file. Status code: {response.status_code}")

# Extraction

We will open our data using Python and pandas.

In [5]:
# Open the import database (pandas reads the zipped CSV directly)
df = pd.read_csv('/kaggle/working/IMP_COMPLETA_MUN.zip', sep=';')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 38917114 entries, 0 to 38917113
Data columns (total 13 columns):
 #   Column      Dtype 
---  ------      ----- 
 0   CO_ANO      int64 
 1   CO_MES      int64 
 2   CO_NCM      int64 
 3   CO_UNID     int64 
 4   CO_PAIS     int64 
 5   SG_UF_NCM   object
 6   CO_VIA      int64 
 7   CO_URF      int64 
 8   QT_ESTAT    int64 
 9   KG_LIQUIDO  int64 
 10  VL_FOB      int64 
 11  VL_FRETE    int64 
 12  VL_SEGURO   int64 
dtypes: int64(12), object(1)
memory usage: 3.8+ GB


# Pre-Analyze Data

We analyze the data and filter it according to the project's needs.

Stakeholders consider only data from 2019 onward to be relevant, so we filter on that and then take a look at our DataFrame.

In [6]:
# Keep only rows where CO_ANO is 2019 or later
df = df[df['CO_ANO'] >= 2019]
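
The filter above runs only after the full file (about 3.8 GB according to `df.info()`) is already in memory. As a hedged alternative, the year filter could be applied while reading, chunk by chunk. This is a sketch, not what the notebook does: the helper name `read_filtered`, the chunk size, and the tiny sample data are assumptions, and unlike the original filter it resets the row index.

```python
import io

import pandas as pd


def read_filtered(source, min_year, chunksize=100_000):
    """Read a ';'-separated CSV in chunks, keeping only rows with CO_ANO >= min_year."""
    kept = []
    for chunk in pd.read_csv(source, sep=";", chunksize=chunksize):
        # Only the filtered part of each chunk is retained
        kept.append(chunk[chunk["CO_ANO"] >= min_year])
    return pd.concat(kept, ignore_index=True)


# Tiny stand-in for the real file
sample = io.StringIO(
    "CO_ANO;CO_MES;VL_FOB\n"
    "2018;12;100\n"
    "2019;1;200\n"
    "2023;3;300\n"
)
recent = read_filtered(sample, min_year=2019, chunksize=2)
print(recent)
```

With this approach peak memory is driven by the chunk size plus the rows that survive the filter, rather than by the whole file.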

In [7]:
# Open the auxiliary lookup file (NCM, country and municipality names)
df2 = pd.read_excel('/kaggle/working/726976_88877e33311a4a54a518c05cd0705b67.xlsx')
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13632 entries, 0 to 13631
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   CO_NCM      13632 non-null  int64  
 1   NO_NCM_POR  13632 non-null  object 
 2   CO_PAIS     281 non-null    float64
 3   NO_PAIS     281 non-null    object 
 4   CO_MUN_GEO  5570 non-null   float64
 5   NO_MUN_MIN  5570 non-null   object 
dtypes: float64(2), int64(1), object(3)
memory usage: 639.1+ KB


# Transforming

We will transform the data by adding derived columns and lookup information, then dropping, joining and reindexing as needed.

In [8]:
# VL_CIF = (product value VL_FOB + freight VL_FRETE) per kg of net weight.
# Note: insurance (VL_SEGURO) is not part of this calculation.
df_filtrado = df
df_filtrado['VL_CIF'] = ((df_filtrado['VL_FOB'] + df_filtrado['VL_FRETE']) / df_filtrado['KG_LIQUIDO']).round(2)
# Rows with KG_LIQUIDO == 0 produce inf or NaN; turn inf into NaN and drop them
df_filtrado['VL_CIF'] = df_filtrado['VL_CIF'].replace([np.inf, -np.inf], np.nan)
df_filtrado.dropna(subset=['VL_CIF'], inplace=True)

# VL_FOB_KG = product value VL_FOB per kg of net weight (freight excluded)
df_filtrado['VL_FOB_KG'] = (df_filtrado['VL_FOB'] / df_filtrado['KG_LIQUIDO']).round(2)
df_filtrado['VL_FOB_KG'] = df_filtrado['VL_FOB_KG'].replace([np.inf, -np.inf], np.nan)
df_filtrado.dropna(subset=['VL_FOB_KG'], inplace=True)
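
To make the zero-weight handling concrete, here is a small worked example of the same per-kg calculation. The first and third rows reuse values that appear in the notebook output; the middle row, with `KG_LIQUIDO` equal to zero, is a hypothetical case added to show why the `inf` replacement and `dropna` steps are needed.

```python
import numpy as np
import pandas as pd

# Illustrative rows; the second one has zero net weight (hypothetical)
demo = pd.DataFrame(
    {
        "VL_FOB": [15, 100, 1889],
        "VL_FRETE": [7, 10, 81],
        "KG_LIQUIDO": [1, 0, 453],
    }
)

# Dividing by zero in pandas yields inf instead of raising an exception
demo["VL_CIF"] = ((demo["VL_FOB"] + demo["VL_FRETE"]) / demo["KG_LIQUIDO"]).round(2)
demo["VL_CIF"] = demo["VL_CIF"].replace([np.inf, -np.inf], np.nan)
demo = demo.dropna(subset=["VL_CIF"])

print(demo["VL_CIF"].tolist())  # [22.0, 4.35]
```

The zero-weight row is removed, and the remaining values match the `VL_CIF` figures shown for those records in the output.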

In [9]:
# Create CO_ANOMES: the year and month combined and formatted as 'YYYY-MM'
df_filtrado['CO_ANO'] = df_filtrado['CO_ANO'].astype(str)
df_filtrado['CO_MES'] = df_filtrado['CO_MES'].astype(str)
df_filtrado['CO_ANOMES'] = df_filtrado['CO_ANO'] + '-' + df_filtrado['CO_MES']
# Parse as a date, then write it back as a zero-padded 'YYYY-MM' string
df_filtrado['CO_ANOMES'] = pd.to_datetime(df_filtrado['CO_ANOMES'])
df_filtrado['CO_ANOMES'] = df_filtrado['CO_ANOMES'].dt.strftime('%Y-%m')
# Display the rows without missing values (the result is not assigned back)
df_filtrado.dropna()

Unnamed: 0,CO_ANO,CO_MES,CO_NCM,CO_UNID,CO_PAIS,SG_UF_NCM,CO_VIA,CO_URF,QT_ESTAT,KG_LIQUIDO,VL_FOB,VL_FRETE,VL_SEGURO,VL_CIF,VL_FOB_KG,CO_ANOMES
30516895,2019,6,85439090,10,399,SC,4,817600,1,1,15,7,0,22.00,15.00,2019-06
30516896,2019,12,56041000,10,160,SC,1,927800,453,453,1889,81,4,4.35,4.17,2019-12
30516898,2019,7,39269090,10,247,PR,1,917800,167,167,1477,499,0,11.83,8.84,2019-07
30516899,2019,12,62021300,11,160,ES,4,817600,16,5,1603,54,2,331.40,320.60,2019-12
30516900,2019,8,85369090,11,23,RS,4,1017801,91,3,1524,159,20,561.00,508.00,2019-08
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
38917108,2023,3,85334092,11,245,AM,1,227600,28500,10,2174,7,0,218.10,217.40,2023-03
38917109,2023,2,84253990,11,23,SP,1,817800,6,232,4212,143,0,18.77,18.16,2023-02
38917110,2023,1,84821010,11,111,SP,4,817800,7,1,83,12,0,95.00,83.00,2023-01
38917112,2023,2,51052910,10,845,RS,7,1010252,9678,9678,152623,700,45,15.84,15.77,2023-02


In [10]:
# Create lookup DataFrames with unique codes
df_pais = df2[['CO_PAIS', 'NO_PAIS']].drop_duplicates(subset='CO_PAIS').set_index('CO_PAIS')
df_ncm = df2[['CO_NCM', 'NO_NCM_POR']].drop_duplicates(subset='CO_NCM').set_index('CO_NCM')

# Add the country name and strip accents
df_filtrado['NO_PAIS'] = df_filtrado['CO_PAIS'].map(df_pais['NO_PAIS']).fillna('')
df_filtrado['NO_PAIS'] = df_filtrado['NO_PAIS'].apply(lambda x: unidecode.unidecode(x))

# Add the NCM description and strip accents
df_filtrado['NO_NCM'] = df_filtrado['CO_NCM'].map(df_ncm['NO_NCM_POR']).fillna('')
df_filtrado['NO_NCM'] = df_filtrado['NO_NCM'].apply(lambda x: unidecode.unidecode(x))
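
The lookup-then-normalize pattern used in this cell can be illustrated without any third-party package. The sketch below swaps `unidecode` for the standard library's `unicodedata`, which only strips diacritics from Latin text and does not transliterate other scripts the way `unidecode` does. The function name `strip_accents` and the small lookup dictionary are assumptions for illustration.

```python
import unicodedata


def strip_accents(text):
    """Remove combining diacritical marks, e.g. 'Japão' -> 'Japao'."""
    # NFKD splits an accented character into its base letter plus combining marks
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


# Hypothetical code -> name lookup, mirroring the CO_PAIS -> NO_PAIS mapping
country_names = {399: "Japão", 247: "Eslováquia"}
codes = [399, 247, 999]  # 999 has no entry and falls back to ''

cleaned = [strip_accents(country_names.get(code, "")) for code in codes]
print(cleaned)  # ['Japao', 'Eslovaquia', '']
```

As in the notebook, codes without a match end up as empty strings rather than missing values, so the normalization step never receives a non-string input.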

In [11]:
# Info
df_filtrado.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7462055 entries, 30516895 to 38917113
Data columns (total 18 columns):
 #   Column      Dtype  
---  ------      -----  
 0   CO_ANO      object 
 1   CO_MES      object 
 2   CO_NCM      int64  
 3   CO_UNID     int64  
 4   CO_PAIS     int64  
 5   SG_UF_NCM   object 
 6   CO_VIA      int64  
 7   CO_URF      int64  
 8   QT_ESTAT    int64  
 9   KG_LIQUIDO  int64  
 10  VL_FOB      int64  
 11  VL_FRETE    int64  
 12  VL_SEGURO   int64  
 13  VL_CIF      float64
 14  VL_FOB_KG   float64
 15  CO_ANOMES   object 
 16  NO_PAIS     object 
 17  NO_NCM      object 
dtypes: float64(2), int64(10), object(6)
memory usage: 1.1+ GB


In [12]:
# Show the first 10 rows
df_filtrado.head(10)

Unnamed: 0,CO_ANO,CO_MES,CO_NCM,CO_UNID,CO_PAIS,SG_UF_NCM,CO_VIA,CO_URF,QT_ESTAT,KG_LIQUIDO,VL_FOB,VL_FRETE,VL_SEGURO,VL_CIF,VL_FOB_KG,CO_ANOMES,NO_PAIS,NO_NCM
30516895,2019,6,85439090,10,399,SC,4,817600,1,1,15,7,0,22.0,15.0,2019-06,Japao,Partes de outras maquinas e aparelhos eletrico...
30516896,2019,12,56041000,10,160,SC,1,927800,453,453,1889,81,4,4.35,4.17,2019-12,China,"Fios e cordas, de borracha, recobertos de texteis"
30516898,2019,7,39269090,10,247,PR,1,917800,167,167,1477,499,0,11.83,8.84,2019-07,Eslovaquia,Outras obras de plasticos
30516899,2019,12,62021300,11,160,ES,4,817600,16,5,1603,54,2,331.4,320.6,2019-12,China,"Mantos, impermeaveis, capas e semelhantes, de ..."
30516900,2019,8,85369090,11,23,RS,4,1017801,91,3,1524,159,20,561.0,508.0,2019-08,Alemanha,"Outros aparelhos para interrupcao, etc, para c..."
30516901,2019,9,94054090,10,351,AL,1,717800,16675,16675,35257,2773,10,2.28,2.11,2019-09,Hong Kong,"Outros aparelhos eletricos de iluminacao, de o..."
30516902,2019,2,85176255,11,249,SP,4,817600,419,585,92512,3847,259,164.72,158.14,2019-02,Estados Unidos,Moduladores/demoduladores (modems)
30516903,2019,4,82055900,10,23,SP,1,817800,1123,1123,37235,265,13,33.39,33.16,2019-04,Alemanha,"Outras ferramentas manuais, de metais comuns, ..."
30516904,2019,10,29181500,10,361,SC,1,927800,3600,3600,9814,324,7,2.82,2.73,2019-10,India,Sais e esteres do acido citrico
30516905,2019,9,90328982,11,160,SP,1,817800,7445,488,31490,678,17,65.92,64.53,2019-09,China,Instrumentos e aparelhos automaticos para cont...


In [13]:
# Selecting columns we need
df_filtrado = df_filtrado[['CO_NCM', 'KG_LIQUIDO', 'VL_CIF', 'VL_FOB_KG', 'CO_ANOMES','NO_PAIS','NO_NCM', 'SG_UF_NCM']]

In [14]:
# Generate a CSV file from the DataFrame
# (to_csv returns None when given a path, so there is nothing to assign)
df_filtrado.to_csv('dados.csv', index=False)

In [15]:
""" this is for test instance only 

# Create a gzip file and write the CSV into it
with open('dados.csv', 'rb') as f_in:
    with gzip.open('dados.csv.gz', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)   """

# Loading it into Azure SQL Storage

We will load the generated data into our Azure SQL database.

For security reasons, the real credentials are not included in the public version of this script.

In [18]:
# Import Library
import pyodbc
from sqlalchemy import create_engine

In [None]:
# Azure SQL server details

server = 'totalingredientes.database.windows.net'
database = 'totaling_brazil'
username = 'your_username'
password = 'your_password'
table_name = 'finalDBncmBrazil'
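
Since the credentials must stay out of the public script, one common option is to read them from environment variables at run time. This is a minimal sketch under that assumption: the variable names (`AZURE_SQL_SERVER` and so on) and the helper `load_db_settings` are hypothetical and not part of the original pipeline.

```python
import os


def load_db_settings(env=None):
    """Collect Azure SQL settings from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    names = {
        "server": "AZURE_SQL_SERVER",
        "database": "AZURE_SQL_DATABASE",
        "username": "AZURE_SQL_USERNAME",
        "password": "AZURE_SQL_PASSWORD",
    }
    missing = [var for var in names.values() if var not in env]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {key: env[var] for key, var in names.items()}


# Demo with a plain dict standing in for os.environ
fake_env = {
    "AZURE_SQL_SERVER": "totalingredientes.database.windows.net",
    "AZURE_SQL_DATABASE": "totaling_brazil",
    "AZURE_SQL_USERNAME": "your_username",
    "AZURE_SQL_PASSWORD": "your_password",
}
settings = load_db_settings(fake_env)
print(settings["server"])
```

In a scheduled run, calling `load_db_settings()` with no argument would read from the real environment and fail fast if a variable is missing.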

In [None]:
# Path to the CSV file
csv_file_path = 'dados.csv' # Our data is on the root

In [None]:
# Create the connection string
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'

# Establish a connection to the Azure SQL server
conn = pyodbc.connect(conn_str)

# Create a cursor object to execute SQL queries
cursor = conn.cursor()

## Regenerating our data Monthly

Because the script is scheduled to run monthly, we also regenerate the Azure SQL table on each run so that it holds the full, refreshed dataset.

In [None]:
# Truncate the existing table to remove all data
truncate_query = f'TRUNCATE TABLE {table_name}'
cursor.execute(truncate_query)
conn.commit()

In [None]:
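# Create an SQLAlchemy engine; the ODBC connection string must be URL-encoded
from urllib.parse import quote_plus
engine = create_engine(f'mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}')

# Upload the DataFrame to the Azure SQL server table.
# Note: if_exists='replace' drops and recreates the table on every run.
df_filtrado.to_sql(table_name, con=engine, if_exists='replace', index=False)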

# Close the cursor and connection
cursor.close()
conn.close()

print("CSV data uploaded to Azure SQL server successfully!")
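
To illustrate what the monthly full refresh does to the table, the following sketch runs two consecutive loads with `if_exists='replace'`. An in-memory SQLite database is used purely as a stand-in for Azure SQL so the example is self-contained; the two small DataFrames are hypothetical monthly snapshots.

```python
import sqlite3

import pandas as pd

# In-memory SQLite is only a stand-in for Azure SQL in this sketch
conn = sqlite3.connect(":memory:")

first_run = pd.DataFrame({"CO_NCM": [85439090, 56041000], "VL_CIF": [22.0, 4.35]})
second_run = pd.DataFrame({"CO_NCM": [39269090], "VL_CIF": [11.83]})

# First monthly run creates the table
first_run.to_sql("finalDBncmBrazil", con=conn, if_exists="replace", index=False)
# Second run: 'replace' drops the old table and recreates it from the new frame
second_run.to_sql("finalDBncmBrazil", con=conn, if_exists="replace", index=False)

row_count = conn.execute("SELECT COUNT(*) FROM finalDBncmBrazil").fetchone()[0]
print(row_count)  # 1
conn.close()
```

Only the rows from the latest run remain, which is the behavior a full monthly regeneration relies on. If the table's existing schema must be preserved, truncating first and loading with `if_exists='append'` is the usual alternative.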