<a href="https://colab.research.google.com/github/yfaleiro/dadosPix_Spark/blob/main/PipelinePIX_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Setup

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=dd47867ed17287afc3b866001b16989f7818e471c271b99ece8868cdaed4d193
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Iniciando sessão spark
from pyspark.sql import SparkSession

spark = (SparkSession.builder.config('spark.ui.port', '4050').appName('PipelinePix').getOrCreate())


In [24]:
df = spark.read.json(path='./dados_transacoes.json')

##Limpeza dos dados

In [25]:
df.show()

+-------------+---------+--------------------+------+------------+--------------------+-------------------+------------------+
|    categoria|chave_pix|        destinatario|fraude|id_transacao|           remetente|   transaction_date|             valor|
+-------------+---------+--------------------+------+------------+--------------------+-------------------+------------------+
|       outros|aleatoria|{Caixa, Calebe Me...|     0|        1000|{BTG, Jonathan Go...|2021-07-16 05:00:55|            588.08|
|transferencia|  celular|{Caixa, Davi Luca...|     1|        1001|{BTG, Jonathan Go...|2022-04-20 12:34:01|           80682.5|
|        lazer|      cpf|{Nubank, Sabrina ...|     0|        1002|{BTG, Jonathan Go...|2022-07-10 16:51:34|             549.9|
|   transporte|aleatoria|{Nubank, Francisc...|     0|        1003|{BTG, Jonathan Go...|2022-10-20 10:57:36|             90.83|
|transferencia|    email|{BTG, Isabelly Fe...|     0|        1004|{BTG, Jonathan Go...|2021-04-06 20:26:51|1327

In [26]:
df.printSchema()

root
 |-- categoria: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- fraude: long (nullable = true)
 |-- id_transacao: long (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- valor: double (nullable = true)



In [27]:
# Como a base de dados se origina das transações feitas por uma unica conta de
# uma única pessoa, podemos checar se realmente a coluna 'remetente' tem todos
# os valores idênticos

df.select('remetente').distinct().show()

+--------------------+
|           remetente|
+--------------------+
|{BTG, Jonathan Go...|
+--------------------+



In [28]:
# Dropando coluna de remetente que não agrega na análise
df = df.drop('remetente')

Conforme schema acima, o dado de data de transação está em formato de string, o que deve ser corrigido. Além disso, dados do destinatário estão em formato de estrutura, o que também deve ser ajustado, para que cada coluna dentro dessa estrutura passe a integrar a tabela como qualquer outra coluna.

In [29]:
from pyspark.sql.functions import to_timestamp, col
df_aberta = df.withColumns({'destinatario_banco': col('destinatario').getField('banco'),
                            'destinatario_nome': col('destinatario').getField('nome'),
                            'destinatario_tipo': col('destinatario').getField('tipo')}).drop('destinatario')
df_aberta.show()

+-------------+---------+------+------------+-------------------+------------------+------------------+--------------------+-----------------+
|    categoria|chave_pix|fraude|id_transacao|   transaction_date|             valor|destinatario_banco|   destinatario_nome|destinatario_tipo|
+-------------+---------+------+------------+-------------------+------------------+------------------+--------------------+-----------------+
|       outros|aleatoria|     0|        1000|2021-07-16 05:00:55|            588.08|             Caixa|         Calebe Melo|               PF|
|transferencia|  celular|     1|        1001|2022-04-20 12:34:01|           80682.5|             Caixa|  Davi Lucas Pereira|               PJ|
|        lazer|      cpf|     0|        1002|2022-07-10 16:51:34|             549.9|            Nubank|      Sabrina Castro|               PF|
|   transporte|aleatoria|     0|        1003|2022-10-20 10:57:36|             90.83|            Nubank|Francisco da Conc...|               PJ|

In [30]:
df_aberta = df_aberta.withColumn('data_transacao', to_timestamp('transaction_date', "yyyy-MM-dd HH:mm:ss")).drop('transaction_date')

In [31]:
from pyspark.sql.functions import date_format

df_aberta = df_aberta.withColumn('ano_mes', date_format(col('data_transacao'), 'yyyy-MM'))

In [32]:
df_aberta.printSchema()
df_aberta.show()

root
 |-- categoria: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- fraude: long (nullable = true)
 |-- id_transacao: long (nullable = true)
 |-- valor: double (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)
 |-- data_transacao: timestamp (nullable = true)
 |-- ano_mes: string (nullable = true)

+-------------+---------+------+------------+------------------+------------------+--------------------+-----------------+-------------------+-------+
|    categoria|chave_pix|fraude|id_transacao|             valor|destinatario_banco|   destinatario_nome|destinatario_tipo|     data_transacao|ano_mes|
+-------------+---------+------+------------+------------------+------------------+--------------------+-----------------+-------------------+-------+
|       outros|aleatoria|     0|        1000|            588.08|             Caixa|         Calebe Melo|    

##Análise exploratória de dados

In [33]:
df_aberta.select('valor').describe().show()

+-------+------------------+
|summary|             valor|
+-------+------------------+
|  count|            100000|
|   mean|10303.358732200059|
| stddev| 20874.99768875586|
|    min|               0.0|
|    max|          89996.33|
+-------+------------------+



In [34]:
# Contagem das variáveis categóricas
df_aberta.groupby('chave_pix').count().orderBy('count', ascending=0).show()
df_aberta.groupby('destinatario_banco').count().orderBy('count', ascending=0).show()
df_aberta.groupby('destinatario_tipo').count().orderBy('count', ascending=0).show()
df_aberta.groupby('fraude').count().orderBy('count', ascending=0).show()
df_aberta.groupby('categoria').count().orderBy('count', ascending=0).show()

+---------+-----+
|chave_pix|count|
+---------+-----+
|    email|25213|
|      cpf|25114|
|  celular|24863|
|aleatoria|24810|
+---------+-----+

+------------------+-----+
|destinatario_banco|count|
+------------------+-----+
|                XP|14401|
|               BTG|14390|
|            Nubank|14297|
|              Itau|14281|
|             Caixa|14240|
|                C6|14204|
|          Bradesco|14187|
+------------------+-----+

+-----------------+-----+
|destinatario_tipo|count|
+-----------------+-----+
|               PJ|50244|
|               PF|49756|
+-----------------+-----+

+------+-----+
|fraude|count|
+------+-----+
|     0|84633|
|     1|15367|
+------+-----+

+-------------+-----+
|    categoria|count|
+-------------+-----+
|transferencia|24744|
|  alimentacao| 9548|
|    vestuario| 9503|
|        saude| 9476|
|        lazer| 9464|
|     educacao| 9460|
|       outros| 9377|
|    presentes| 9254|
|   transporte| 9174|
+-------------+-----+



In [35]:
# Média de valores das variáveis categóricas
from pyspark.sql.types import DecimalType

df_aberta.groupBy('chave_pix').avg('valor').select('chave_pix', col('avg(valor)').cast(DecimalType(38,2)).alias('media_valor')).show()
df_aberta.groupBy('destinatario_banco').avg('valor').select('destinatario_banco', col('avg(valor)').cast(DecimalType(38,2)).alias('media_valor')).show()
df_aberta.groupBy('destinatario_tipo').avg('valor').select('destinatario_tipo', col('avg(valor)').cast(DecimalType(38,2)).alias('media_valor')).show()
df_aberta.groupBy('fraude').avg('valor').select('fraude', col('avg(valor)').cast(DecimalType(38,2)).alias('media_valor')).show()
df_aberta.groupBy('categoria').avg('valor').select('categoria', col('avg(valor)').cast(DecimalType(38,2)).alias('media_valor')).show()

+---------+-----------+
|chave_pix|media_valor|
+---------+-----------+
|aleatoria|   10374.64|
|  celular|   10191.64|
|    email|   10235.63|
|      cpf|   10411.54|
+---------+-----------+

+------------------+-----------+
|destinatario_banco|media_valor|
+------------------+-----------+
|            Nubank|   10316.48|
|                C6|   10309.50|
|               BTG|   10122.30|
|                XP|   10328.07|
|             Caixa|   10254.86|
|          Bradesco|   10564.19|
|              Itau|   10230.88|
+------------------+-----------+

+-----------------+-----------+
|destinatario_tipo|media_valor|
+-----------------+-----------+
|               PF|   10314.21|
|               PJ|   10292.61|
+-----------------+-----------+

+------+-----------+
|fraude|media_valor|
+------+-----------+
|     0|    2230.07|
|     1|   54766.61|
+------+-----------+

+-------------+-----------+
|    categoria|media_valor|
+-------------+-----------+
|        saude|    2198.56|
|        la

In [36]:
from pyspark.sql.functions import count, avg, round

df_aberta.groupBy('ano_mes').agg(
  count('*').alias('contagem'),
  round(avg('valor'), 2).alias('media_valor')).orderBy('ano_mes', ascending=1).show(25)

+-------+--------+-----------+
|ano_mes|contagem|media_valor|
+-------+--------+-----------+
|2021-01|    2415|    9961.15|
|2021-02|    3794|   10254.57|
|2021-03|    4179|   10226.79|
|2021-04|    4061|   10174.01|
|2021-05|    4303|   10009.76|
|2021-06|    4238|   10185.07|
|2021-07|    4159|   10171.93|
|2021-08|    4250|   11072.61|
|2021-09|    4096|   10280.05|
|2021-10|    4234|    11022.0|
|2021-11|    4050|   10227.24|
|2021-12|    4232|   10480.79|
|2022-01|    4239|   10507.26|
|2022-02|    3855|    10123.1|
|2022-03|    4283|   10558.32|
|2022-04|    4108|   10355.51|
|2022-05|    4241|   10297.86|
|2022-06|    4091|   10172.86|
|2022-07|    4336|   10040.67|
|2022-08|    4228|   10797.37|
|2022-09|    4130|   10462.07|
|2022-10|    4201|   10280.49|
|2022-11|    4098|    9824.62|
|2022-12|    4220|    9742.44|
|2023-01|    1959|   10002.37|
+-------+--------+-----------+



In [71]:
df_aberta.groupBy('fraude').min('valor').show()
df_aberta.groupBy('fraude').max('valor').show()

+------+----------+
|fraude|min(valor)|
+------+----------+
|     0|       0.0|
|     1|  19999.98|
+------+----------+

+------+----------+
|fraude|max(valor)|
+------+----------+
|     0|  19994.39|
|     1|  89996.33|
+------+----------+



##Preparação dos dados

Para preparar os dados para a modelagem, vamos dropar algumas colunas e indexar as colunas categóricas

In [37]:
df_aberta.show()

+-------------+---------+------+------------+------------------+------------------+--------------------+-----------------+-------------------+-------+
|    categoria|chave_pix|fraude|id_transacao|             valor|destinatario_banco|   destinatario_nome|destinatario_tipo|     data_transacao|ano_mes|
+-------------+---------+------+------------+------------------+------------------+--------------------+-----------------+-------------------+-------+
|       outros|aleatoria|     0|        1000|            588.08|             Caixa|         Calebe Melo|               PF|2021-07-16 05:00:55|2021-07|
|transferencia|  celular|     1|        1001|           80682.5|             Caixa|  Davi Lucas Pereira|               PJ|2022-04-20 12:34:01|2022-04|
|        lazer|      cpf|     0|        1002|             549.9|            Nubank|      Sabrina Castro|               PF|2022-07-10 16:51:34|2022-07|
|   transporte|aleatoria|     0|        1003|             90.83|            Nubank|Francisco d

In [38]:
df_modelo = df_aberta.drop('id_transacao', 'ano_mes')
df_modelo.show()

+-------------+---------+------+------------------+------------------+--------------------+-----------------+-------------------+
|    categoria|chave_pix|fraude|             valor|destinatario_banco|   destinatario_nome|destinatario_tipo|     data_transacao|
+-------------+---------+------+------------------+------------------+--------------------+-----------------+-------------------+
|       outros|aleatoria|     0|            588.08|             Caixa|         Calebe Melo|               PF|2021-07-16 05:00:55|
|transferencia|  celular|     1|           80682.5|             Caixa|  Davi Lucas Pereira|               PJ|2022-04-20 12:34:01|
|        lazer|      cpf|     0|             549.9|            Nubank|      Sabrina Castro|               PF|2022-07-10 16:51:34|
|   transporte|aleatoria|     0|             90.83|            Nubank|Francisco da Conc...|               PJ|2022-10-20 10:57:36|
|transferencia|    email|     0|13272.619999999999|               BTG|   Isabelly Ferreira

In [39]:
from pyspark.ml.feature import StringIndexer

indexador = StringIndexer(
    inputCols=['destinatario_nome',
               'destinatario_banco',
               'destinatario_tipo',
               'categoria',
               'chave_pix'],
    outputCols=['idx_destinatario_nome',
                'idx_destinatario_banco',
                'idx_destinatario_tipo',
                'idx_categoria',
                'idx_chave_pix']
)

In [47]:
df_index = indexador.fit(df_modelo).transform(df_modelo)
df_index = df_index.drop('categoria', 'chave_pix', 'destinatario_banco', 'destinatario_nome', 'destinatario_tipo')
df_index.show()

+------+------------------+-------------------+---------------------+----------------------+---------------------+-------------+-------------+
|fraude|             valor|     data_transacao|idx_destinatario_nome|idx_destinatario_banco|idx_destinatario_tipo|idx_categoria|idx_chave_pix|
+------+------------------+-------------------+---------------------+----------------------+---------------------+-------------+-------------+
|     0|            588.08|2021-07-16 05:00:55|              12045.0|                   4.0|                  1.0|          6.0|          3.0|
|     1|           80682.5|2022-04-20 12:34:01|                259.0|                   4.0|                  0.0|          0.0|          2.0|
|     0|             549.9|2022-07-10 16:51:34|                132.0|                   2.0|                  1.0|          4.0|          1.0|
|     0|             90.83|2022-10-20 10:57:36|              10475.0|                   2.0|                  0.0|          8.0|          3.0|

##Modelling

Para criar um modelo de regressão logística que irá detectar transações pix fraudulentas na conta do cliente, é necessário que vetorizar todas as features categoricas da tabela em uma unica coluna

In [54]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [52]:
assembler = VectorAssembler(
    inputCols = [x for x in df_index.columns if x not in ['data_transacao', 'fraude']],
    outputCol = 'features'
)

lr = LogisticRegression(featuresCol='features', labelCol='fraude')

df_vector = assembler.transform(df_index)

In [55]:
treino, teste = df_vector.randomSplit([0.8, 0.2], seed=123)

In [56]:
modelo = lr.fit(treino)

In [63]:
#Montando métricas do modelo

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

predicao = modelo.transform(teste)

# AUC-ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="fraude")
auc = evaluator.evaluate(predicao)

# Accuracy, Precision, e Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="fraude", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predicao, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predicao, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predicao, {multi_evaluator.metricName: "weightedRecall"})

print('Métricas do modelo')
print(f"AUC-ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Métricas do modelo
AUC-ROC: 1.0000
Accuracy: 0.9999
Precision: 0.9999
Recall: 0.9999


In [62]:
#Checando o quanto o modelo errou
predicao.crosstab('fraude', 'prediction').show()

+-----------------+-----+----+
|fraude_prediction|  0.0| 1.0|
+-----------------+-----+----+
|                0|16893|   1|
|                1|    0|3070|
+-----------------+-----+----+



##Conclusões

Após a criação de um modelo de identificação de transações fraudulentas e análise exploratória dos dados, chegamos as seguintes conclusões:

1. A categoria com o maior número de transferências foi a de "Transferência Bancária"
2. O banco mais utilizado foi o banco XP, mas com uma margem pequena em relação aos outros bancos.
3. É possível supor que esse cliente usa essa conta pessoa física para fazer movimentações de natureza PJ, devido ao número e valor mensal das transações realizadas.
4. Apenas pela análise dos dados, foi possível identificar que todas as transações fraudulentas tiveram valores maiores que R$ 19.999,00; o que é maior que qualquer transação não fraudulenta realizada pelo cliente.
5. Uma sugestão de ação para reduzir o número de tentativas de transações fradulentas seria diminuir o valor máximo de transferência em PIX.

OBS: Devido a natureza fictícia da base, construída para esse tipo de exercício, o modelo desempenhou de forma quase perfeita, provavelmente pela distribuição dos dados, o que está bem evidenciado no ponto 4 acima, onde o valor é um indicador muito forte de fraude.