In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
import findspark
import os
import pandas as pd
import sqlite3

findspark.init()
findspark.find()

'/home/saulo/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark'

In [2]:
dir_input = os.path.join(os.getcwd(),'input')
dir_output = os.path.join(os.getcwd(),'output')
dir_config = os.path.join(os.getcwd(),'config')

In [3]:
# Create SparkSession
spark = SparkSession.builder\
        .appName("LAB")\
        .config('spark.sparkContext.setLogLevel','WARN')\
        .config('spark.sql.warehouse.dir', dir_config)\
        .getOrCreate()

2022-06-18 20:03:04 WARN  Utils:66 - Your hostname, PC resolves to a loopback address: 127.0.1.1; using 192.168.191.134 instead (on interface eth0)
2022-06-18 20:03:04 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2022-06-18 20:03:05 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Consumer


In [4]:
dict_mensagem = {
 'cod_cli': [13, 234]
,'ag_cli': [1234, 3]
,'valor_op': [323.23, 213.22]
,'tipo_op': ['Deposito', 'Saque']
,'data_op': ['2020-06-12 21:00:00', '2020-05-23 21:00:00']
,'saldo_cli': ['1033.06','10.33']
}

df_raw_mensagem = pd.DataFrame.from_dict(dict_mensagem, orient='columns')
df_raw_mensagem


Unnamed: 0,cod_cli,ag_cli,valor_op,tipo_op,data_op,saldo_cli
0,13,1234,323.23,Deposito,2020-06-12 21:00:00,1033.06
1,234,3,213.22,Saque,2020-05-23 21:00:00,10.33


In [5]:
dict_mens_datatype = {
 'cod_cli': 'int64'
,'valor_op': 'float64'
,'data_op': 'datetime64'
,'saldo_cli': 'float64'
}
df_mensagem = df_raw_mensagem.astype(dict_mens_datatype)
df_mensagem

Unnamed: 0,cod_cli,ag_cli,valor_op,tipo_op,data_op,saldo_cli
0,13,1234,323.23,Deposito,2020-06-12 21:00:00,1033.06
1,234,3,213.22,Saque,2020-05-23 21:00:00,10.33


In [6]:
df_raw_cliente = pd.read_csv(
	os.path.join(dir_input,'dataset_cliente.csv'),
	sep=';',
	encoding='utf-8',
	header=0)

dict_cliente_datatype = {
 'cod_cli': 'int64'
,'idade': 'int64'
,'score_credito': 'int64' #  De 0 a 1000
}

df_cliente = df_raw_cliente.astype(dict_cliente_datatype)
df_cliente


Unnamed: 0,cod_cli,nome,idade,gerente_conta,conta_corrente,tipo_conta_corrente,score_credito
0,337,Robert Pennington,51,Amos Dunn,66779-1,Povão,9
1,341,Mufutau Brady,37,William Hutchinson,32382-3,Ricão,9
2,54,Cora Blackwell,62,Timon Buckner,54031-3,Ricão,11
3,225,Ava Davenport,54,Carl Clark,69826-7,Chefão,12
4,171,Elvis Benjamin,43,Cameron Diaz,28856-6,Chefão,16
...,...,...,...,...,...,...,...
495,149,Clarke Nelson,49,Mariko Bass,89389-1,Ricão,995
496,392,Darrel Rice,69,Lance Bates,68821-6,Povão,996
497,462,Judah Hatfield,64,Maisie Chang,17626-5,Ricão,997
498,150,Aileen Dyer,18,Ryan Watkins,70521-5,Ricão,999


## Enriquecimento

In [7]:
conn = sqlite3.connect(os.path.join(dir_output,'lab_database.db'))
cur = conn.cursor()

df_cliente.to_sql('tb_cliente', conn, index=False, if_exists='replace')

In [8]:
df_select_cliente = pd.read_sql(f"""
SELECT *
FROM tb_cliente
where cod_cli in {tuple(df_mensagem['cod_cli'])};
""", conn)

df_select_cliente

Unnamed: 0,cod_cli,nome,idade,gerente_conta,conta_corrente,tipo_conta_corrente,score_credito
0,13,Lane Larson,60,Reed Wilder,41431-7,Povão,39
1,234,Ashton Delaney,80,Jasmine Wagner,23617-3,Ricão,267


In [9]:
df_enriq = df_mensagem.merge(
	df_select_cliente,
	how='inner',
	left_on='cod_cli',
	right_on='cod_cli')

df_enriq

Unnamed: 0,cod_cli,ag_cli,valor_op,tipo_op,data_op,saldo_cli,nome,idade,gerente_conta,conta_corrente,tipo_conta_corrente,score_credito
0,13,1234,323.23,Deposito,2020-06-12 21:00:00,1033.06,Lane Larson,60,Reed Wilder,41431-7,Povão,39
1,234,3,213.22,Saque,2020-05-23 21:00:00,10.33,Ashton Delaney,80,Jasmine Wagner,23617-3,Ricão,267


##### Tomada de decisão

In [10]:
# Nova coluna
df_enriq['decision'] = ""

# Valor do saque maior que saldo em conta
df_decision_valor_op = df_enriq.query('tipo_op == "Saque" and valor_op > saldo_cli')[['cod_cli','decision']]
df_decision_valor_op['decision'] = 'Operação indisponivel, valor do saque maior que saldo em conta'

# Empréstimo pessoal
df_decision_emprestimo = df_enriq.query('score_credito >= 700 and tipo_conta_corrente=="Ricão"')[['cod_cli','decision']]
df_decision_emprestimo['decision'] = 'Campanha de empréstimo pessoal'

# Renegociação de dívidas
df_decision_divida = df_enriq.query('idade >= 30 or score_credito <= 400')[['cod_cli','decision']]
df_decision_divida['decision'] = 'Renegociação de dívidas disponivel'

# Agrupa decisões
df_decisions = pd.concat([df_decision_valor_op, df_decision_emprestimo, df_decision_divida])
df_agg_decisions = df_decisions.groupby(by='cod_cli').agg(lambda decis: '/'.join(decis))
df_agg_decisions.reset_index(inplace=True)

# Insere decisões
for cod in df_agg_decisions['cod_cli']:
	df_enriq.loc[df_enriq.cod_cli==cod,'decision'] = df_agg_decisions.loc[df_agg_decisions.cod_cli==cod,'decision']

df_enriq


Unnamed: 0,cod_cli,ag_cli,valor_op,tipo_op,data_op,saldo_cli,nome,idade,gerente_conta,conta_corrente,tipo_conta_corrente,score_credito,decision
0,13,1234,323.23,Deposito,2020-06-12 21:00:00,1033.06,Lane Larson,60,Reed Wilder,41431-7,Povão,39,Renegociação de dívidas disponivel
1,234,3,213.22,Saque,2020-05-23 21:00:00,10.33,Ashton Delaney,80,Jasmine Wagner,23617-3,Ricão,267,"Operação indisponivel, valor do saque maior qu..."
