In [87]:
# Importing libraries

import pandas as pd
from datetime import datetime
import sys
import numpy as np

## Ingesting data and Extracting DataFrame

As requested, the heads of operations and sales record all operations and sales in an XLS workbook.

This workbook is maintained by hand and may contain errors, so we will iterate.

Before building this pipeline, we ran earlier tests at a smaller scale in the company's sandbox.

From those tests we gathered that the steps to clean and organize the data are:

* Add a header and convert it to snake_case

* First column --> Pedido --> make that row the header

* Delete columns with more than 10% null values

* Delete rows with all null values
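
The steps above can be sketched on a toy sheet. This is a minimal illustration, not the pipeline's final code: the function name `clean_sheet`, the `header_marker` and `max_null_fraction` parameters, and the sample data are all assumptions made for the example.

```python
import numpy as np
import pandas as pd


def clean_sheet(raw, header_marker="pedido", max_null_fraction=0.10):
    """Promote the header row, snake_case it, drop sparse columns and empty rows."""
    # Locate the first row whose first cell contains the marker (case-insensitive)
    mask = raw.iloc[:, 0].astype(str).str.contains(header_marker, case=False)
    header_pos = int(np.flatnonzero(mask.to_numpy())[0])

    # Keep only the rows below the header and use the header row as column labels
    body = raw.iloc[header_pos + 1:].reset_index(drop=True)
    body.columns = [str(c).strip().lower().replace(" ", "_") for c in raw.iloc[header_pos]]

    # Drop columns whose share of nulls exceeds the limit, then rows that are entirely null
    body = body.loc[:, body.isnull().mean() <= max_null_fraction]
    return body.dropna(how="all").reset_index(drop=True)


# Hypothetical hand-made sheet: a title row, the header row, ten data rows, one blank row
rows = [["Sales 2017", None, None], ["Pedido", "Data Compra", "Obs"]]
rows += [[i, f"2017-01-{i:02d}", None] for i in range(1, 11)]
rows += [[None, None, None]]
raw = pd.DataFrame(rows)

cleaned = clean_sheet(raw)
print(cleaned)
```

In this toy sheet the `Obs` column is entirely empty, so it is dropped, and the trailing blank row is removed afterwards.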


In this script we will build an ETL process where:

- Ingestion --> From local Computer

- Extraction --> From the several Sheets on the XLS file

- Transformation --> We will clean and organize the data for the Data Analysis team

- Load --> The result is loaded as a single CSV file to blob storage.


As we are creating an automated pipeline, we will need to overwrite our blob monthly.

In [88]:
# Specify the path of the Excel file
file_path = '/Users/gabrielreus/Desktop/TAZ/ML_DA/jun-2023/databases/database_sales.xls'

# Get the current year
current_year = datetime.now().year

# Create an empty list to store the DataFrames
dfs = []

# Iterate over the years
for year in range(2017, current_year + 1):
    # Convert the year to string
    year_str = str(year)
    
    # Load the sheet corresponding to the year as a DataFrame
    df = pd.read_excel(file_path, sheet_name=year_str)
    
    # Add the DataFrame to the list
    dfs.append(df)

# Analyzing the DataFrames

We created a list of DataFrames from the XLS workbook.

We will now pre-analyze the data to confirm that it has the correct format.

First, we verify that the number of DataFrames in the list matches the number of years from 2017 to the current year.

In [89]:
# Verify that everything worked and stop if needed

try:
    if (current_year - 2017) == len(dfs) - 1:
        print('Success')
    else:
        print("An error occurred")
        sys.exit("Stopping the script.")

except Exception as e:
    print(f"An error occurred: {e}")
    sys.exit("Stopping the script.")

Success
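
The count check only confirms how many sheets were read. Because `pd.read_excel` raises an error as soon as a requested sheet is absent, it can help to compare the expected sheet names with those actually in the workbook before loading. A minimal sketch, assuming the sheet names can be listed with `pd.ExcelFile(file_path).sheet_names`; the helper `missing_year_sheets` and the sample sheet names are hypothetical:

```python
def missing_year_sheets(sheet_names, first_year, last_year):
    """Return the years in [first_year, last_year] that have no sheet of that name."""
    available = {str(name) for name in sheet_names}
    return [year for year in range(first_year, last_year + 1) if str(year) not in available]


# Hypothetical workbook where the 2019 sheet was never created
sheet_names = ["2017", "2018", "2020", "Notes"]
missing = missing_year_sheets(sheet_names, 2017, 2020)
print(missing)  # [2019]
```

Reporting the missing years by name gives the person maintaining the workbook something concrete to fix.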


# Transformation

We will now transform the data into a clean, organized, and readable structured table.

In [90]:
for i, x in enumerate(dfs):
    # Find the row that contains 'pedido' in the first column
    linha_pedido = x[x.iloc[:, 0].astype(str).str.contains('pedido', case=False, na=False)].index[0]

    # Use that row as the DataFrame header
    x.columns = x.iloc[linha_pedido]

    # Convert the header to snake_case
    x.columns = x.columns.str.lower().str.replace(' ', '_')

    # Drop the row that was used as the header
    x = x.drop(x.index[linha_pedido])

    # Reset the DataFrame index
    x = x.reset_index(drop=True)

    # Replace empty or NaN column labels with an empty string
    x.columns = x.columns.fillna('')

    # Compute the percentage of null values in each column
    porcentagem_nulos = x.isnull().sum() / len(x) * 100

    # Get the columns with more than 50% null values
    colunas_a_remover = porcentagem_nulos[porcentagem_nulos > 50].index

    # Check whether there are columns to drop
    if len(colunas_a_remover) > 0:
        # Drop the columns with more than 50% null values
        x = x.drop(columns=colunas_a_remover)

    # Minimum percentage of non-null values required to keep a row
    porcentagem_minima = 80

    # Minimum number of non-null values required (rounded up to an integer)
    quantidade_minima = int(np.ceil(len(x.columns) * (porcentagem_minima / 100)))

    # Drop rows with fewer than 80% non-null values
    x = x.dropna(thresh=quantidade_minima)

    # Update the corresponding DataFrame in the dfs list
    dfs[i] = x
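
To make the row filter concrete: `dropna(thresh=n)` keeps a row only if it has at least `n` non-null values. With 21 columns and an 80% minimum, for example, 21 * 0.8 = 16.8, so a row needs at least 17 non-null cells. A small sketch on made-up data with 5 columns (the values are purely illustrative):

```python
import math

import numpy as np
import pandas as pd

# Toy frame with 5 columns: 80% of 5 is 4, so rows need at least 4 non-null cells
toy = pd.DataFrame(
    [
        [1, 2, 3, 4, 5],             # 5 non-null -> kept
        [1, 2, 3, 4, np.nan],        # 4 non-null -> kept
        [1, 2, 3, np.nan, np.nan],   # 3 non-null -> dropped
        [np.nan] * 5,                # 0 non-null -> dropped
    ],
    columns=list("abcde"),
)

min_non_null = math.ceil(len(toy.columns) * 80 / 100)
kept = toy.dropna(thresh=min_non_null)
print(min_non_null, len(kept))
```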



In [1]:
dfs[1].head()


## Check on the list of Dataframes generated

In [92]:
# Check if all DataFrames have the same number of columns as the first one
num_columns = len(dfs[0].columns)
same_num_columns = all(len(df.columns) == num_columns for df in dfs)

# Report the result of the check
if same_num_columns:
    print("All DataFrames have the same number of columns.")
else:
    print("DataFrames do not have the same number of columns.")

All DataFrames have the same number of columns.


## Rename the columns of every DataFrame to match the first one

In [93]:
# Get the column names from the first DataFrame in the list
column_names = dfs[0].columns

# Rename the columns of all DataFrames in the list
for i, df in enumerate(dfs):
    dfs[i] = df.set_axis(column_names, axis=1)
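
`set_axis` relabels columns purely by position, so a sheet whose columns appear in a different order would be silently mislabeled. A hedged sketch of a check that could run before the rename; the helper name `column_mismatches` and the toy frames are assumptions:

```python
import pandas as pd


def column_mismatches(frames):
    """Map each frame's position to the labels that differ from those of frame 0."""
    reference = list(frames[0].columns)
    report = {}
    for i, frame in enumerate(frames[1:], start=1):
        found = list(frame.columns)
        if len(found) != len(reference):
            report[i] = [("length", len(reference), len(found))]
            continue
        # Collect (position, expected label, actual label) for each differing column
        diffs = [(pos, ref, got) for pos, (ref, got) in enumerate(zip(reference, found)) if ref != got]
        if diffs:
            report[i] = diffs
    return report


frames = [
    pd.DataFrame(columns=["pedido", "valor", "icms"]),
    pd.DataFrame(columns=["pedido", "valor", "icms"]),
    pd.DataFrame(columns=["pedido", "icms", "valor"]),  # swapped order
]
mismatches = column_mismatches(frames)
print(mismatches)
```

An empty report means the positional rename is safe. Anything else points to a sheet that should be inspected first.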

## Concatenate all of the DataFrames

In [94]:
# Stack the DataFrames row-wise (each keeps its original index labels)
merged_df = pd.concat(dfs, axis=0)
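
Stacking with `pd.concat(..., axis=0)` keeps each sheet's original row labels, which is why `merged_df.info()` reports an index running from 2 to 3157 over 20010 rows. If a unique index or a record of the source year is wanted, one option (an assumption, not part of the original pipeline) is to tag each frame with its year and pass `ignore_index=True`. The `ano` column name is hypothetical:

```python
import pandas as pd

# Toy per-year frames standing in for the cleaned sheets
frames_by_year = {
    2017: pd.DataFrame({"pedido": [1, 2]}, index=[2, 3]),
    2018: pd.DataFrame({"pedido": [3]}, index=[2]),
}

# Add a hypothetical "ano" column, then stack with a fresh 0..n-1 index
tagged = [frame.assign(ano=year) for year, frame in frames_by_year.items()]
stacked = pd.concat(tagged, axis=0, ignore_index=True)
print(stacked)
```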

In [95]:
merged_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 20010 entries, 2 to 3157
Data columns (total 21 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   pedido       20008 non-null  object
 1   data_compra  20004 non-null  object
 2   empresa      20007 non-null  object
 3   produtos     20010 non-null  object
 4   quantidade   20008 non-null  object
 5   valor        20010 non-null  object
 6   icms         20007 non-null  object
 7   pis/cofins   20003 non-null  object
 8   ipi          20005 non-null  object
 9   custo_net    20010 non-null  object
 10  form.pgto    19953 non-null  object
 11  data_saída   20005 non-null  object
 12  empresa      20010 non-null  object
 13  valor        20010 non-null  object
 14  venda_net    20010 non-null  object
 15  rep.1_(%)    20009 non-null  object
 16  comissão     20010 non-null  object
 17  líquida      20010 non-null  object
 18  líquido_(%)  20010 non-null  object
 19  saldo_net    20010 non-nul

## Final cleaning of merged_df

In [98]:
# Drop rows where "data_compra" is NaN
merged_df.dropna(subset=['data_compra'], inplace=True)

# Show
merged_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 20004 entries, 2 to 3157
Data columns (total 21 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   pedido       20004 non-null  object
 1   data_compra  20004 non-null  object
 2   empresa      20003 non-null  object
 3   produtos     20004 non-null  object
 4   quantidade   20004 non-null  object
 5   valor        20004 non-null  object
 6   icms         20001 non-null  object
 7   pis/cofins   19997 non-null  object
 8   ipi          19999 non-null  object
 9   custo_net    20004 non-null  object
 10  form.pgto    19947 non-null  object
 11  data_saída   19999 non-null  object
 12  empresa      20004 non-null  object
 13  valor        20004 non-null  object
 14  venda_net    20004 non-null  object
 15  rep.1_(%)    20003 non-null  object
 16  comissão     20004 non-null  object
 17  líquida      20004 non-null  object
 18  líquido_(%)  20004 non-null  object
 19  saldo_net    20004 non-nul
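
The listing shows `empresa` and `valor` twice each. With duplicate labels, `merged_df['valor']` returns two columns, and they are likely to cause trouble when writing to a SQL table. One possible fix is to suffix the repeats; the helper `dedupe_columns` and the `_2` suffix scheme are assumptions for illustration:

```python
import pandas as pd


def dedupe_columns(columns):
    """Append _2, _3, ... to repeated labels, leaving first occurrences unchanged."""
    seen = {}
    result = []
    for name in columns:
        seen[name] = seen.get(name, 0) + 1
        result.append(name if seen[name] == 1 else f"{name}_{seen[name]}")
    return result


toy = pd.DataFrame([[1, 2, 3, 4]], columns=["empresa", "valor", "empresa", "valor"])
toy.columns = dedupe_columns(toy.columns)
print(list(toy.columns))  # ['empresa', 'valor', 'empresa_2', 'valor_2']
```

More descriptive names would be preferable if the business meaning of each repeated column is known.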

In this pipeline we will not convert the dtypes. We leave that to the DA / DS teams, since their approaches differ.

# Load

We will load the data in two different ways:

1) To the root directory

2) Directly to our Azure SQL Server.

## 1) Directly to the root - Databricks directory

In [100]:
# Generate a CSV file from the DataFrame (to_csv returns None when given a path)
merged_df.to_csv('dados.csv', index=False)

## 2) Directly to the DB - Azure SQL Server

In [101]:
# Import libraries
import pyodbc
from sqlalchemy import create_engine

In [102]:
# Azure SQL server details

server = 'totalingredientes.database.windows.net'
database = 'totaling_brazil'
username = 'your_username'
password = 'your_password'
table_name = 'operations-by-year'
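
Hard-coding credentials in a notebook is risky once it is shared or scheduled. A minimal sketch that reads them from environment variables instead; the variable names `AZURE_SQL_USER` and `AZURE_SQL_PASSWORD` are assumptions, not an existing convention of this project:

```python
import os


def get_sql_credentials(env=os.environ):
    """Return (username, password) from the environment, failing loudly if either is missing."""
    try:
        return env["AZURE_SQL_USER"], env["AZURE_SQL_PASSWORD"]
    except KeyError as missing:
        raise RuntimeError(f"Missing environment variable: {missing}") from None


# Demonstration with a stand-in mapping instead of the real environment
demo_env = {"AZURE_SQL_USER": "etl_user", "AZURE_SQL_PASSWORD": "not-a-real-password"}
username, password = get_sql_credentials(demo_env)
print(username)
```

In the real pipeline the function would be called without arguments, so that it reads `os.environ`.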

In [None]:
# Create the connection string
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'

# Establish a connection to the Azure SQL server
conn = pyodbc.connect(conn_str)

# Create a cursor object to execute SQL queries
cursor = conn.cursor()

In [None]:
# Truncate the existing table to remove all data
# (the hyphenated table name must be delimited with square brackets)
truncate_query = f'TRUNCATE TABLE [{table_name}]'
cursor.execute(truncate_query)
conn.commit()
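
Because the table name `operations-by-year` contains hyphens, T-SQL needs it delimited with square brackets. A small sketch of a quoting helper (the name `quote_identifier` is hypothetical); it doubles any closing bracket inside the name, which is how T-SQL escapes `]` within a bracketed identifier:

```python
def quote_identifier(name):
    """Wrap a SQL Server identifier in brackets, escaping ']' as ']]'."""
    return "[" + name.replace("]", "]]") + "]"


truncate_query = f"TRUNCATE TABLE {quote_identifier('operations-by-year')}"
print(truncate_query)  # TRUNCATE TABLE [operations-by-year]
```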

In [None]:
# Create an SQLAlchemy engine from the ODBC connection string (URL-encoded)
from urllib.parse import quote_plus

engine = create_engine(f'mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}')

# Upload the DataFrame to the Azure SQL server table
merged_df.to_sql(table_name, con=engine, if_exists='replace', index=False)

# Close the cursor and connection
cursor.close()
conn.close()

print("DF data uploaded to Azure SQL server successfully!")