# Bix Tech Challenge

João Paulo Macedo

## Lib imports

In [1]:
import os
from configparser import ConfigParser

import polars as pl
import psycopg2
import requests
import sqlalchemy
from psycopg2 import OperationalError

## Data extraction

### Parquet

In [2]:
# Read the category file, then rename its `id` column to `id_categoria`.
categoria_raw = pl.read_parquet('categoria.parquet')
parquet_data = categoria_raw.rename({'id': 'id_categoria'})

### API

In [3]:
url = r'https://us-central1-bix-tecnologia-prd.cloudfunctions.net/api_challenge_junior/'

# Ids 1-9 are requested one by one; the same range feeds the id column below
# so the ids and the fetched names always line up.
employee_ids = range(1, 10)

names_list = []
for i in employee_ids:
    # A timeout keeps a stalled request from hanging the notebook, and
    # raise_for_status() stops the run on an HTTP error instead of storing
    # the error body as if it were an employee name.
    http_response = requests.get(f'{url}?id={i}', timeout=10)
    http_response.raise_for_status()
    response = http_response.content.decode()
    names_list.append(response)

api_data = {'id_funcionario': employee_ids, 'nome_funcionario': names_list}

### Postgres

In [4]:
## psycopg2 configuration


def get_db_info(name, section):
    """Load one section of an INI file as a plain dictionary.

    Args:
        name: Path of the INI file to read.
        section: Name of the section whose options are wanted.

    Returns:
        A dict mapping each option name in ``section`` to its string value.
        The dict is empty when the section is not present in the parsed file
        (which includes the case where the file could not be read).
    """
    config = ConfigParser()
    config.read(name)

    # Guard clause: nothing to collect when the section is absent.
    if not config.has_section(section):
        return {}

    return {option: value for option, value in config.items(section)}



In [5]:
## Postgres connection and query

name = 'db_config_files/db_info.ini'
section = 'postgres-db'
db_info = get_db_info(name, section)

# Bind both handles before the try block so the cleanup in `finally` never
# touches an unbound name when the connection attempt itself fails.
conn = None
cursor = None
try:
    conn = psycopg2.connect(**db_info)
    cursor = conn.cursor()
    print('Successfully connected')
    conn.autocommit = True

    query = 'SELECT * FROM venda'
    cursor.execute(query)
    # Rows are kept in `db_data` for the transformation cells further down.
    db_data = cursor.fetchall()

except OperationalError:
    print('Error connecting to database')
    # Re-raise: without a result set `db_data` would be undefined (or stale
    # from an earlier run), so later cells must not continue silently.
    raise

finally:
    # Single cleanup point for both the cursor and the connection.
    if cursor:
        cursor.close()
    if conn:
        conn.close()
        print("Closed connection.")


Successfully connected
Closed connection.


## Data transformation

### Schemas and Data Frames creation

In [6]:

# Column name -> polars dtype for the employee records gathered from the API.
api_schema = dict(id_funcionario=pl.Int64, nome_funcionario=pl.String)

# Column name -> polars dtype applied to the rows fetched from the database.
# NOTE(review): assumes `SELECT *` returns the columns in this order - confirm.
db_schema = dict(
    id_venda=pl.Int64,
    id_funcionario=pl.Int64,
    id_categoria=pl.Int64,
    data_venda=pl.Date,
    venda=pl.Int64,
)

funcionario_df = pl.DataFrame(api_data, schema=api_schema)

# The sale date is cast to Date and then to text, so `data_venda` ends up as a
# string column in `venda_df`.
data_venda_as_text = pl.col('data_venda').cast(pl.Date).cast(pl.Utf8)
venda_df = pl.DataFrame(db_data, schema=db_schema).with_columns(data_venda_as_text)

categoria_df = parquet_data


### Joins

In [7]:
# Enrich the sales rows with category data and, separately, with employee data.
# `how='inner'` spells out the join type that `join` applies by default.
venda_categoria_join = venda_df.join(categoria_df, on='id_categoria', how='inner')
venda_funcionario_join = venda_df.join(funcionario_df, on='id_funcionario', how='inner')

# Bare last expression so the notebook renders a preview of the joined frame.
venda_categoria_join

id_venda,id_funcionario,id_categoria,data_venda,venda,nome_categoria
i64,i64,i64,str,i64,str
1,1,1,"""2017-10-01""",21636,"""Babywear"""
2,1,4,"""2018-05-12""",3312,"""Sportwear"""
3,1,3,"""2019-02-01""",11778,"""Womens Footwea…"
4,1,4,"""2019-03-11""",2554,"""Sportwear"""
5,1,3,"""2018-09-07""",4425,"""Womens Footwea…"
…,…,…,…,…,…
1013,9,1,"""2017-02-08""",10689,"""Babywear"""
1014,9,3,"""2018-03-07""",15056,"""Womens Footwea…"
1015,9,6,"""2019-06-03""",3101,"""Bath Clothes"""
1016,9,4,"""2020-02-03""",204,"""Sportwear"""


## Data loading

### Connecting and writing data to local Postgres database

In [9]:
# The target database URI can be supplied through the TECH_CHALLENGE_DB_URI
# environment variable so real credentials never need to live in the notebook.
# The fallback is the local development database only.
connection_uri = os.environ.get(
    'TECH_CHALLENGE_DB_URI',
    'postgresql://dbadmin:postgres@localhost:5432/tech_challenge_db',
)

# Dimension tables are written before the fact table.
# NOTE: `if_table_exists='append'` adds rows to existing tables, so re-running
# this cell against an already loaded database inserts the same data again.
categoria_df.write_database(table_name='dim_categoria', connection=connection_uri, if_table_exists='append', engine='sqlalchemy')
funcionario_df.write_database(table_name='dim_funcionario', connection=connection_uri, if_table_exists='append', engine='sqlalchemy')
venda_df.write_database(table_name='fact_venda', connection=connection_uri, if_table_exists='append', engine='sqlalchemy')


17