<a href="https://colab.research.google.com/github/matheusabreuleme/Transacoes_PIX/blob/main/Case_PIX.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Preparação dos dados

In [1]:
#Instalando a última versão do PySpark
!pip install pyspark


#Iniciando a sessão spark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
      .appName("SparkSQL")
      .getOrCreate()
)


from google.colab import drive
drive.mount('/content/drive')

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=e03dbecef0d84c19deca7b3e611130ced36f99d52757fd32cb4a2f1bc19b5a88
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
Mounted at /content/drive


In [4]:
from pyspark.sql.types import *


#Criação do Schema da base de dados

schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
])

schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('chave_pix', StringType()),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('fraude', IntegerType())
])

#Caminho do arquivo json a ser analsiado
caminho_json = 'drive/MyDrive/case_final.json'

#Lendo o arquivo json

df = spark.read.json(
    caminho_json,
    schema=schema_base_pix,
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

In [5]:
#Entendendo os tipos de dados
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- fraude: integer (nullable = true)



In [6]:
#Visualizando a base
df.show()

+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|id_transacao|             valor|           remetente|        destinatario|chave_pix|    categoria|   transaction_date|fraude|
+------------+------------------+--------------------+--------------------+---------+-------------+-------------------+------+
|        1000|            588.08|{Jonathan Gonsalv...|{Calebe Melo, Cai...|aleatoria|       outros|2021-07-16 05:00:55|     0|
|        1001|           80682.5|{Jonathan Gonsalv...|{Davi Lucas Perei...|  celular|transferencia|2022-04-20 12:34:01|     1|
|        1002|             549.9|{Jonathan Gonsalv...|{Sabrina Castro, ...|      cpf|        lazer|2022-07-10 16:51:34|     0|
|        1003|             90.83|{Jonathan Gonsalv...|{Francisco da Con...|aleatoria|   transporte|2022-10-20 10:57:36|     0|
|        1004|13272.619999999999|{Jonathan Gonsalv...|{Isabelly Ferreir...|    email|transferencia|2021-04-06 2

In [12]:
from pyspark.sql.functions import col

#Adequando a base para a modelagem (desmembrando as colunas remente e destinatário devido terem chaves e valor dentro)

df_flatten = df.withColumns({
    'remetente_nome': col('remetente').getField('nome'),
    'remetente_banco': col('remetente').getField('banco'),
    'remetente_tipo': col('remetente').getField('tipo'),
    'destinatario_nome': col('destinatario').getField('nome'),
    'destinatario_banco': col('destinatario').getField('banco'),
    'destinatario_tipo': col('destinatario').getField('tipo')
}).drop('remetente', 'destinatario')


In [14]:
#Visualizando a base após o flatten
df_flatten.show()

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|Jonathan Gonsalves|            BTG|            PF|         Calebe Melo|             Caixa|               PF|
|        1001|           80682.5|  celular|transferencia|2022-04-20 12:34:01|     1|Jonathan Gonsalves|            BTG|            PF|  Davi Lucas Pereira|             Caixa|               PJ|
|        1002|             549.9|  

In [15]:
#Verificando estatísticas básicas da base atravésa do describe.
df_flatten.describe().show()

+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|summary|     id_transacao|             valor|chave_pix|  categoria|   transaction_date|            fraude|    remetente_nome|remetente_banco|remetente_tipo|destinatario_nome|destinatario_banco|destinatario_tipo|
+-------+-----------------+------------------+---------+-----------+-------------------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|  count|           100000|            100000|   100000|     100000|             100000|            100000|            100000|         100000|        100000|           100000|            100000|           100000|
|   mean|          50999.5|10303.358732200059|     NULL|       NULL|               NULL|           0.15367|              NULL|           NULL|      

#Análise dos dados

In [16]:
#Criando um tempView

df_flatten.createOrReplaceTempView('pix')

In [18]:
#Elencando quais os bancos que mais receberam transações
spark.sql(
    '''
    SELECT count(*) as total_transacoes, destinatario_banco
    FROM pix
    GROUP BY destinatario_banco
    ORDER BY total_transacoes DESC
    '''
).show()

+----------------+------------------+
|total_transacoes|destinatario_banco|
+----------------+------------------+
|           14401|                XP|
|           14390|               BTG|
|           14297|            Nubank|
|           14281|              Itau|
|           14240|             Caixa|
|           14204|                C6|
|           14187|          Bradesco|
+----------------+------------------+



In [31]:
#Top 10 dias em que o cliente mais realiza transações
spark.sql(
    '''
    select count(*) as qtd_transacoes, day(transaction_date) as dia
    from pix
    group by dia
    order by qtd_transacoes desc
    '''
).show()

+--------------+---+
|qtd_transacoes|dia|
+--------------+---+
|          3415| 20|
|          3409| 12|
|          3360| 10|
|          3350| 15|
|          3346| 16|
|          3328|  9|
|          3327|  3|
|          3324|  2|
|          3312|  6|
|          3311| 22|
|          3309| 28|
|          3305| 21|
|          3299| 24|
|          3294|  8|
|          3294| 23|
|          3285| 17|
|          3279|  5|
|          3270| 11|
|          3268| 14|
|          3252| 19|
+--------------+---+
only showing top 20 rows



In [34]:
#Média dos valores de transações por mês por banco
spark.sql(
    '''
    select round(count(*)/month(transaction_date),2) as media_mensal, month(transaction_date) as mes
    from pix
    group by mes
    order by media_mensal desc
    '''
).show()

+------------+---+
|media_mensal|mes|
+------------+---+
|      8613.0|  1|
|      3824.5|  2|
|     2820.67|  3|
|     2042.25|  4|
|      1708.8|  5|
|     1388.17|  6|
|     1213.57|  7|
|     1059.75|  8|
|       914.0|  9|
|       843.5| 10|
|      740.73| 11|
|      704.33| 12|
+------------+---+



In [37]:
#Média dos valores de transações por mês por banco
spark.sql(
    '''
    select round(sum(valor),2) as total_transacao, destinatario_banco as banco
    from pix
    group by banco
    order by total_transacao desc
    '''
).show()

+---------------+--------+
|total_transacao|   banco|
+---------------+--------+
| 1.4987422863E8|Bradesco|
| 1.4873455871E8|      XP|
| 1.4749464881E8|  Nubank|
|  1.464361348E8|      C6|
| 1.4610714452E8|    Itau|
| 1.4602926358E8|   Caixa|
| 1.4565989417E8|     BTG|
+---------------+--------+



In [41]:
#Quantidade de transações por mês, por banco e por categoria
spark.sql(
    '''
    select count(*) as qtd_transacoes, month(transaction_date) as mes, destinatario_banco as banco, categoria
    from pix
    group by mes, banco, categoria
    order by mes, banco
    '''
).show()

+--------------+---+--------+-------------+
|qtd_transacoes|mes|   banco|    categoria|
+--------------+---+--------+-------------+
|           125|  1|     BTG|     educacao|
|           115|  1|     BTG|        lazer|
|           113|  1|     BTG|       outros|
|           120|  1|     BTG|        saude|
|           109|  1|     BTG|    presentes|
|           115|  1|     BTG|    vestuario|
|           118|  1|     BTG|   transporte|
|           123|  1|     BTG|  alimentacao|
|           307|  1|     BTG|transferencia|
|           117|  1|Bradesco|   transporte|
|           100|  1|Bradesco|     educacao|
|           131|  1|Bradesco|  alimentacao|
|           329|  1|Bradesco|transferencia|
|            95|  1|Bradesco|    presentes|
|           126|  1|Bradesco|        saude|
|           101|  1|Bradesco|       outros|
|           123|  1|Bradesco|    vestuario|
|           112|  1|Bradesco|        lazer|
|           100|  1|      C6|   transporte|
|           103|  1|      C6|   

In [50]:
#Total de gastos por categoria a cada ano
spark.sql(
    '''
    select round(sum(valor),2) as valor, categoria, year(transaction_date) as ano
    from pix
    group by categoria, ano
    order by ano desc
    '''
).show()

+--------------+-------------+----+
|         valor|    categoria| ano|
+--------------+-------------+----+
|     400683.64|        saude|2023|
|     501308.34|       outros|2023|
|      459528.7|    vestuario|2023|
|     392078.31|  alimentacao|2023|
|     469671.41|        lazer|2023|
|     427790.54|   transporte|2023|
|     362584.45|    presentes|2023|
| 1.614868262E7|transferencia|2023|
|     432305.66|     educacao|2023|
| 1.031158574E7|    presentes|2022|
| 1.004824507E7|        saude|2022|
| 1.069600648E7|    vestuario|2022|
| 1.036712444E7|     educacao|2022|
| 1.029574763E7|        lazer|2022|
| 1.055340829E7|   transporte|2022|
| 1.054578323E7|  alimentacao|2022|
|4.3019109879E8|transferencia|2022|
|  1.05666451E7|       outros|2022|
| 1.038466282E7|        saude|2021|
| 1.023792825E7|  alimentacao|2021|
+--------------+-------------+----+
only showing top 20 rows



In [54]:
#Quantidade de transações com fraude
spark.sql(
    '''
    select count(*) as qtd, fraude
    from pix
    group by fraude
    '''
).show()

+-----+------+
|  qtd|fraude|
+-----+------+
|15367|     1|
|84633|     0|
+-----+------+



In [64]:
#Qual categoria as fraudes mais acontecem?
spark.sql(
    '''
    select count(*) as qtd, fraude, categoria
    from pix
    group by categoria, fraude
    '''
).show()

+-----+------+-------------+
|  qtd|fraude|    categoria|
+-----+------+-------------+
| 9377|     0|       outros|
| 9460|     0|     educacao|
| 9377|     0|transferencia|
|15367|     1|transferencia|
| 9254|     0|    presentes|
| 9476|     0|        saude|
| 9464|     0|        lazer|
| 9174|     0|   transporte|
| 9503|     0|    vestuario|
| 9548|     0|  alimentacao|
+-----+------+-------------+



In [65]:
#Ranges de valores das transações fraudulentas

from pyspark.sql.functions import floor

df_flatten.filter(col('fraude') == 1).withColumn(
    'range',
    floor(col('valor')/1000)*1000
).groupBy('range').count().orderBy(col('range').desc()).show()

+-----+-----+
|range|count|
+-----+-----+
|89000|  222|
|88000|  208|
|87000|  230|
|86000|  203|
|85000|  205|
|84000|  245|
|83000|  206|
|82000|  206|
|81000|  214|
|80000|  213|
|79000|  205|
|78000|  230|
|77000|  237|
|76000|  232|
|75000|  190|
|74000|  207|
|73000|  237|
|72000|  234|
|71000|  234|
|70000|  222|
+-----+-----+
only showing top 20 rows



In [100]:
#Obtendo os valores máximo e mínimo das transações fraudulentas
spark.sql(
    '''
    select max(valor) as maximo, min(valor) as minimo
    from pix
    where fraude = 1
    '''
).show()

+--------+--------+
|  maximo|  minimo|
+--------+--------+
|89996.33|19999.98|
+--------+--------+



#Modelagem

In [90]:
from pyspark.sql.functions import udf

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [71]:
#Removendo colunas não necessárias para o modelo
df = df_flatten.drop('remetente','id')

In [72]:
#Criando index para as categorias
indexer = StringIndexer(
    inputCols=['destinatario_nome', 'destinatario_banco', 'destinatario_tipo', 'categoria', 'chave_pix'],
    outputCols=['destinatario_nome_index', 'destinatario_banco_index', 'destinatario_tipo_index', 'categoria_index', 'chave_pix_index']
)

In [73]:
df_index = indexer.fit(df).transform(df)
df_index.show()

+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|chave_pix|    categoria|   transaction_date|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+---------+-------------+-------------------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|aleatoria|       outros|2021-07-16 05:00:55|     0|Jonathan Gonsalves|   

In [75]:
#Criando uma lista com as colunas que serão utilizadas no treino do modelo
cols_para_filtrar = [
    'valor',
    'transaction_date',
    'destinatario_nome_index',
    'destinatario_banco_index',
    'destinatario_tipo_index',
    'chave_pix_index',
    'categoria_index',
    'fraude'
]

In [79]:
#Separando o df em 2 (os com fraude e sem fraude)

eh_fraude = df_index.select(cols_para_filtrar).filter(col('fraude') == 1)
sem_fraude = df_index.select(cols_para_filtrar).filter(col('fraude') == 0)

#Tirando uma amostra de 1% de dados sem fraude
sem_fraude = sem_fraude.sample(False, 0.01, seed=42)

In [80]:
#Unindo os dados para o teste
df_concat = sem_fraude.union(eh_fraude)
df = df_concat.sort('transaction_date')
df.count()

16269

In [82]:
#Criando os dfs de treino e teste

train, test = df.randomSplit([0.75, 0.25], seed=42)
print('train = ',train.count(), 'teste =', test.count())

train =  12264 teste = 4005


In [83]:
#Criando uma coluna de fraude no df
eh_fraude = udf(lambda fraude: 1.0 if fraude > 0 else 0.0, DoubleType())
train = train.withColumn('eh_fraude', eh_fraude(train.fraude))

In [91]:
from pyspark.ml.feature import HashingTF

#Criando a feature de Vetores

assembler = VectorAssembler(
    inputCols=[x for x in train.columns if x not in ['transaction_date','fraude','eh_fraude']],
    outputCol='features'
)


#Usando o modelo de Regressão Logistica

lr = LogisticRegression().setParams(
    maxIter=10000,
    labelCol='eh_fraude',
    predictionCol='prediction'
)

#Criando a Pipeline do Modelo
model_lr = Pipeline(stages=[assembler, lr]).fit(train)

In [92]:
#Verificando a eficiência do modelo nos dados de teste
predicted_lr = model_lr.transform(test)
predicted_lr = predicted_lr.withColumn('eh_fraude', eh_fraude(predicted_lr.fraude))
predicted_lr.crosstab('eh_fraude','prediction').show()

+--------------------+---+----+
|eh_fraude_prediction|0.0| 1.0|
+--------------------+---+----+
|                 1.0|  0|3817|
|                 0.0|187|   1|
+--------------------+---+----+



In [96]:
from pyspark.ml.evaluation import RegressionEvaluator


#Avaliação do modelo

# Erro Quadrático Médio (MSE)
evaluator_mse = RegressionEvaluator(labelCol="fraude", predictionCol="prediction", metricName="mse")
mse = evaluator_mse.evaluate(predicted_lr)
print(f"Erro Quadrático Médio (MSE): {mse:.4f}")

# Erro Absoluto Médio (MAE)
evaluator_mae = RegressionEvaluator(labelCol="fraude", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predicted_lr)
print(f"Erro Absoluto Médio (MAE): {mae:.4f}")

# R2
evaluator_r2 = RegressionEvaluator(labelCol="fraude", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predicted_lr)
print(f"R²: {r2:.4f}")

Erro Quadrático Médio (MSE): 0.0002
Erro Absoluto Médio (MAE): 0.0002
R²: 0.9944


#Conclusão

### O Cliente possui um grande volume de transações, nas quais, cerca de 15% são fraudulentas.

### Todas as transações fraudulentar se deram na categorias transferência, sendo que tratam-se de valores alto, no qual o menor valor foi de 19999,98 e o maior de 89996,33

### Notou-se que o banco que recebeu mais transações foi o XP e o que recebeu menos transações foi o Bradesco, entretanto, o banco Bradesco recebeu a maior quantidade (em valor R$) de transação

### Além disso, o modelo de Regressão logística utilizado obteve uma boa resposta com os dados de teste, no qual apresentou um Erro Quadrático Médio de 0,0002 e R2 de 0,9944