In [None]:
# Necessário possuir a biblioteca DuckDB, holidays e babel instaladas.
!pip install duckdb
!pip install babel
!pip install holidays
!pip install gdown

In [1]:
import sys

# Getting Github files if this notebook is executed in a Google Colab environment.
# The check relies on the 'google.colab' module being already loaded in sys.modules.
if 'google.colab' in sys.modules:
  # Temporary folder that receives the clone before its content is copied over.
  temp_folder = "etl_bank"

  # Clone the 'dev' branch, copy everything into the current working directory,
  # then remove the temporary clone.
  !git clone -b 'dev' 'https://github.com/jpclarindo/etl_bank.git' $temp_folder
  !rsync -a $temp_folder/ .
  !rm -rf $temp_folder


Important! A Google Drive link to the data source is required to run this notebook. Please put the link in `gdrive_link.txt` at the root of the repository.

In [2]:
import pandas as pd
import duckdb
import src.utils as utils

# Libraries to create date dimension
from datetime import datetime, timedelta
from babel.dates import format_date, format_datetime, format_time
import holidays

# Auxiliar libraries
import time
import json
import os, glob

In [25]:
# Init DuckDB connection.
con = duckdb.connect(database='database.duckdb', read_only=False)


def _load_json_config(path):
    """Parse a UTF-8 encoded JSON file and return its content.

    The file is opened in a ``with`` block so the handle is always closed,
    even if parsing fails.
    """
    with open(path, 'r', encoding='utf-8') as config_file:
        return json.load(config_file)


# Cleaning function mapping: column name -> name of the function in `utils` to apply.
cleaning_mapping = _load_json_config('var/function_mapping.json')

# Date mapping: table name -> columns passed to `parse_dates` when reading the CSV.
date_mapping = _load_json_config('var/date_mapping.json')

# Relationship mapping: rules with source/target table and column names.
relationships = _load_json_config('var/relationship_mapping.json')

Creating time and date dimension tables.

In [4]:
# @title
def create_time_dimension_table():
  """Build the time dimension (one row per second of a day) and store it as `d_tempo`.

  Each of the 86400 rows holds a 1-based id, hour, minute, second, the period
  of the day in English and Portuguese (four 6-hour slots) and an
  ``HH:MM:SS`` string. The table is created through the global DuckDB
  connection `con` with ``CREATE TABLE IF NOT EXISTS``.

  Returns:
    True once the statement has been executed.
  """
  labels_en = ['Early Morning', 'Morning', 'Afternoon', 'Night']
  labels_pt = ['Madrugada', 'Manhã', 'Tarde', 'Noite']

  def build_row(second_of_day):
    # Split the second-of-day counter into hour / minute / second.
    hour, remainder = divmod(second_of_day, 3600)
    minute, second = divmod(remainder, 60)
    slot = hour // 6  # 0-5, 6-11, 12-17, 18-23

    return {
        'tempo_id': second_of_day + 1,
        'hora': hour,
        'minuto': minute,
        'segundo': second,
        'periodo_en': labels_en[slot],
        'periodo_pt': labels_pt[slot],
        'formatado': f"{hour:02d}:{minute:02d}:{second:02d}",
    }

  # The SQL text below refers to this DataFrame by its variable name,
  # so the local must keep the name `time_df`.
  time_df = pd.DataFrame([build_row(tick) for tick in range(86400)])
  con.execute("""CREATE TABLE IF NOT EXISTS d_tempo
                 AS SELECT * FROM time_df""")

  print('Time dimension created and inserted in the database')
  return True

In [5]:
# @title
def create_date_dimension_table():
  """Build the date dimension (1950-01-01 .. 2050-12-31) and store it as `d_data`.

  One row is produced per calendar day with: a 1-based id, the ISO formatted
  date, year, quarter (number and English/Portuguese name), month (number and
  names), day of month, weekday names, a weekend flag (Saturday/Sunday) and a
  Brazilian holiday flag. The table is created through the global DuckDB
  connection `con` with ``CREATE TABLE IF NOT EXISTS``.

  Returns:
    True once the statement has been executed (same contract as
    `create_time_dimension_table`).
  """
  first_day = datetime(1950, 1, 1)
  last_day = datetime(2050, 12, 31)
  total_days = (last_day - first_day).days + 1

  # Build the holiday calendar once instead of once per day; membership
  # lookups are done against this single instance.
  br_holidays = holidays.Brazil()

  date_list = []
  for offset in range(total_days):
    current = first_day + timedelta(days=offset)
    formatted = current.strftime('%Y-%m-%d')

    date_list.append({
        'data_id': offset + 1,
        'formatado': formatted,
        'ano': current.year,
        'trimestre': format_date(current, 'Q'),
        'nome_trimestre_en': format_date(current, format='QQQQ', locale='en_US'),
        'nome_trimestre_pt': format_date(current, format='QQQQ', locale='pt_BR'),
        'mes': current.month,
        'nome_mes_en': format_date(current, format='MMMM', locale='en_US'),
        'nome_mes_pt': format_date(current, format='MMMM', locale='pt_BR'),
        'dia': current.day,
        'nome_dia_en': format_date(current, format='EEEE', locale='en_US'),
        'nome_dia_pt': format_date(current, format='EEEE', locale='pt_BR'),
        # weekday(): Monday is 0, so 5 and 6 are Saturday and Sunday.
        'fim_de_semana': 1 if current.weekday() in [5, 6] else 0,
        'feriado': 1 if formatted in br_holidays else 0,
    })

  # The SQL text below refers to this DataFrame by its variable name,
  # so the local must keep the name `date_df`.
  date_df = pd.DataFrame(date_list)
  con.execute("""CREATE TABLE IF NOT EXISTS d_data
                 AS SELECT * FROM date_df""")

  print('Date dimension created and inserted in the database')
  return True


ETL Processing - Extraction

In [6]:
def get_raw_data():
  """Load every CSV under `data/` into a dict of raw DataFrames.

  `utils.download_data()` is called first, then each `data/*.csv` file is
  read with the date columns configured for its table in `date_mapping`.
  Two metadata columns are added to every frame: `_load_time` (timestamp of
  the read, as text) and `_file_name` (base name of the CSV).

  Returns:
    dict mapping table name (file name without `.csv`) to its DataFrame.
  """
  utils.download_data()

  frames_by_table = {}

  for path in glob.glob('data/*.csv'):
    file_name = os.path.basename(path)
    name = file_name.replace('.csv','')

    frame = pd.read_csv(
        path,
        sep=',',
        parse_dates=date_mapping.get(name,None),
        encoding='utf-8',
    )

    # Extraction metadata
    frame['_load_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    frame['_file_name'] = file_name

    frames_by_table[name] = frame

  return frames_by_table

# Run the extraction; dfs_raw maps each table name to its raw DataFrame.
dfs_raw = get_raw_data()

Downloading...
From: https://drive.google.com/uc?id=16jF2hJHVOwikmgoEKs9wARnrE_j6Y7e2
To: C:\Users\jpaul\OneDrive\Documentos\GitHub\etl_bank\data.zip
100%|█████████████████████████████████████████████████████████████████████████████| 1.24M/1.24M [00:00<00:00, 3.79MB/s]


ETL - Cleaning

In [65]:
def cleaning_attributes(df):
    """Return a cleaned copy of `df` using the functions listed in `cleaning_mapping`.

    For every (column, function name) pair of the global `cleaning_mapping`
    whose column exists in `df`, the function of that name is looked up in
    `utils` and applied to the column:

    * if the function returns dicts, the dict keys become columns; a key
      that collides with an existing column replaces it;
    * otherwise the column is overwritten with the function results.

    Args:
        df: DataFrame to clean. It is not modified.

    Returns:
        A new DataFrame with the cleaned/expanded columns.
    """
    df_clean = df.copy()
    # Snapshot of the input columns: columns created by an expansion are not
    # considered by later mapping entries.
    current_columns = set(df_clean.columns)

    for column, function_name in cleaning_mapping.items():
        if column not in current_columns:
            continue

        if not hasattr(utils, function_name):
            # Best effort: report the misconfiguration and keep going.
            print(f"Error: cleaning function '{function_name}' for column '{column}' was not found in utils")
            continue

        func = getattr(utils, function_name)
        print(f"   -> Expanding column '{column}' using '{function_name}'")

        function_results = df_clean[column].apply(func)

        # Inspect the first non-null result to decide how to merge it back.
        non_null_results = function_results.dropna()
        sample = non_null_results.iloc[0] if not non_null_results.empty else None

        if isinstance(sample, dict):
            # Rows without a dict result (e.g. None/NaN) become empty records,
            # so they end up as missing values in the expanded columns.
            records = [result if isinstance(result, dict) else {} for result in function_results]
            df_expanded = pd.json_normalize(records)
            # One record per row: reuse the frame's index so the concat below
            # pairs rows by position even when the index is not 0..n-1.
            df_expanded.index = df_clean.index

            df_clean = pd.concat([df_clean, df_expanded], axis=1)
            # On a name collision keep the freshly expanded column.
            df_clean = df_clean.loc[:, ~df_clean.columns.duplicated(keep='last')]
        else:
            df_clean[column] = function_results

    return df_clean

In [87]:
# Cleaned version of every raw table, keyed by table name.
dfs_clean = {}

for table_name, df_raw in dfs_raw.items():
    print(f'Cleaning {table_name} attributes')
    # cleaning_attributes works on a copy, so dfs_raw is left untouched.
    dfs_clean[table_name] = cleaning_attributes(df_raw)

Cleaning agencias attributes
   -> Expanding column 'tipo_agencia' using 'clean_text'
   -> Expanding column 'cidade' using 'clean_text'
   -> Expanding column 'uf' using 'clean_text'
   -> Expanding column 'endereco' using 'get_address_dict'
Cleaning clientes attributes
   -> Expanding column 'primeiro_nome' using 'clean_text'
   -> Expanding column 'ultimo_nome' using 'clean_text'
   -> Expanding column 'tipo_cliente' using 'clean_text'
   -> Expanding column 'email' using 'normalize_email'
   -> Expanding column 'cep' using 'normalize_cep'
   -> Expanding column 'endereco' using 'get_address_dict'
   -> Expanding column 'data_nascimento' using 'get_age_dict'
Cleaning colaboradores attributes
   -> Expanding column 'primeiro_nome' using 'clean_text'
   -> Expanding column 'ultimo_nome' using 'clean_text'
   -> Expanding column 'email' using 'normalize_email'
   -> Expanding column 'cep' using 'normalize_cep'
   -> Expanding column 'endereco' using 'get_address_dict'
   -> Expanding c

ETL - Quality check

In [123]:
def repair_referential_integrity(dfs_dict_original, relationships_config):
    """Add placeholder rows to target tables for keys that only exist in source tables.

    For every rule, the distinct non-null values of `source_table.source_column`
    are compared with those of `target_table.target_column`. Keys missing from
    the target ("orphans") are appended to the target table as new rows whose
    other columns are filled with "not informed" values: -1 for numeric
    columns, 1900-01-01 for datetime columns and 'NI' for everything else.

    Args:
        dfs_dict_original: dict mapping table name to DataFrame. Neither the
            dict nor its DataFrames are modified.
        relationships_config: sized sequence of dicts with the keys
            'source_table', 'source_column', 'target_table', 'target_column'.

    Returns:
        True when `relationships_config` is non-empty and every relationship
        is already consistent; otherwise a new dict of tables in which the
        affected target tables contain the placeholder rows.

    Raises:
        KeyError: if a rule references a table that is not in
            `dfs_dict_original`.
    """
    # Shallow copy: repaired tables are replaced in the copy, never mutated.
    dfs_dict = dfs_dict_original.copy()
    consistent_rules = 0

    for rule in relationships_config:
        source_table = rule['source_table']
        target_table = rule['target_table']
        source_col = rule['source_column']
        target_col = rule['target_column']

        missing_tables = [name for name in (source_table, target_table) if dfs_dict.get(name) is None]
        if missing_tables:
            raise KeyError(f"Table(s) referenced by a relationship rule were not found: {missing_tables}")

        df_source = dfs_dict[source_table]
        df_target = dfs_dict[target_table]

        # Identifying orphan keys
        source_keys = set(df_source[source_col].dropna().unique())
        target_keys = set(df_target[target_col].dropna().unique())
        orphan_keys = list(source_keys - target_keys)

        if not orphan_keys:
            # Table names are bound to locals above so the f-string contains no
            # nested quotes (same-quote nesting only parses on Python 3.12+).
            print(f'The relationship between {target_table} and {source_table} complains integrity')
            consistent_rules += 1
            continue

        print(f"Found inconsistencies ({len(orphan_keys)} rows) in '{target_col}' (Source table: {source_table})")

        # Creating a new dataframe with orphan keys
        df_orphans = pd.DataFrame({target_col: orphan_keys})

        for col in df_target.columns:
            if col == target_col:
                continue

            current_dtype = df_target[col].dtype

            # Fill the row with NI ("not informed") values matching the column type
            if pd.api.types.is_numeric_dtype(current_dtype):
                df_orphans[col] = -1
            elif pd.api.types.is_datetime64_any_dtype(current_dtype):
                df_orphans[col] = pd.Timestamp('1900-01-01')
            else:
                df_orphans[col] = 'NI'

        dfs_dict[target_table] = pd.concat([df_target, df_orphans], ignore_index=True)

    # If all relationships are consistent, return a boolean (for checking)
    if consistent_rules and consistent_rules == len(relationships_config):
        return True

    return dfs_dict

In [120]:
# First pass: returns the dict of tables with placeholder rows added for orphan keys
# (note: the function returns True instead when every relationship is already consistent).
dfs_dict_checked = repair_referential_integrity(dfs_clean, relationships)

The relationship between contas and transacoes complains integrity
Found inconsistencies (1 rows) in 'cod_cliente' (Source table: contas)
The relationship between agencias and contas complains integrity
The relationship between clientes and propostas_credito complains integrity
The relationship between colaboradores and propostas_credito complains integrity
The relationship between agencias and colaborador_agencia complains integrity


In [122]:
# Second pass over the repaired tables: a result of True means every relationship is consistent.
repair_referential_integrity(dfs_dict_checked, relationships)

The relationship between contas and transacoes complains integrity
The relationship between clientes and contas complains integrity
The relationship between agencias and contas complains integrity
The relationship between clientes and propostas_credito complains integrity
The relationship between colaboradores and propostas_credito complains integrity
The relationship between agencias and colaborador_agencia complains integrity


True

ETL - Loading