## 1 - Preparação do Ambiente de Desenvolvimento

<details>
<summary>Preparação do ambiente</summary>

### Preparação do ambiente

#### IDEs utilizada

 - VSCode

#### Criar ambiente virtual

- Command Pallet (ctrl + shift+ p)
- Python: Create Environment > Venv > Python Version (3.12)'

#### Ativar .venv

In VsCode terminal, alterar a política de execução de scripts para ativar o ambiente virtual.

```bash
Set-ExecutionPolicy Unrestricted -Scope Process

# ativar ambiente virtual
.\.venv\Scripts\activate

```

</details>

## 2 - Data Undesrtanting

Primeiramente, devemos entender tudo sobre a fonte dos dados
- Como o dado chega até nós?
- Qual formato virá? 
- Aonde o processamento será executado (AWS EMR, Cluster On-Premise)? 
- De quanto em quanto tempo eu preciso gerar esse relatório (mensal, diário, near-real time)?


Os dados foram compartilhados via `*.json`. Saber como os dados serão ingeridos são de vital importância para delimitar a forma como lidaremos com nosso projeto. Análises em tempo real (streaming) são diferentes de análises em lotes (bacthes). Análises pontuais como esta também adotam uma estratégia diferentes das que requerem análises periódicas.

### Data Schema

```json
{
  "id_transacao": inteiro,
  "valor": texto,
  "remetente": {
      "nome": texto,
      "banco": texto,
      "tipo": texto
  }, 
  "destinatario": {
      "nome": texto, 
      "banco":texto,
      "tipo": texto
  },        
  "categoria": texto,
  "transaction_date":texto,
  "chave_pix":texto,
  "fraude":inteiro,
}
```

## 3 - Preparação dos Dados

Agora é hora de começar a preparar os dados de acordo com as necessidades do escopo de trabalho.

In [3]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5bc88419a137357dea2effa4ec5826623d64b048159718d468a21ce1bda560ca

In [1]:
# funções para inicializar o spark

def spark_initialize_session(app_name = 'My Analysis'):
    from pyspark.sql import SparkSession
    spark = (
        SparkSession.builder
        .config('spark.ui.port', '4050')
        .appName(app_name)
        .getOrCreate()
    )

    return spark


In [6]:
import os

In [13]:
df_path = f'data{os.sep}case_final.json'
spark = spark_initialize_session()

**não usar o json formatado, isso causa lentidão e erros no algoritmo**

<details>
<summary>json schema anotations</summary>

### Data Schema

```json
{
  "id_transacao": inteiro,
  "valor": texto,
  "remetente": {
      "nome": texto,
      "banco": texto,
      "tipo": texto
  }, 
  "destinatario": {
      "nome": texto, 
      "banco":texto,
      "tipo": texto
  },        
  "categoria": texto,
  "transaction_date":texto,
  "chave_pix":texto,
  "fraude":inteiro,
}
```

</details>

In [15]:
# definição do data schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


# amostra dos dados
# { "id_transacao": 100999, "valor": 7058.09, "remetente": { "nome": "Jonathan Gonsalves", "banco": "BTG", "tipo": "PF" }, "destinatario": { "nome": "Lais Nascimento", "banco": "Nubank", "tipo": "PF" }, "chave_pix": "aleatoria", "categoria": "vestuario", "transaction_date": "2022-02-25 09:31:47", "fraude": 0 }

data_schema_pix_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType())
    ])

data_schema_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente', data_schema_pix_remetente_destinatario),   
    StructField('destinatario', data_schema_pix_remetente_destinatario),
    StructField('categoria', StringType()),
    StructField('transaction_date', StringType()),
    StructField('chave_pix', StringType(), True),
    StructField('fraude', IntegerType(), True),
    ])

In [18]:
df_path = f'data{os.sep}case_final.json'
spark = spark_initialize_session()

In [19]:
# "transaction_date": "2022-02-25 09:31:47"
df = spark.read.json(df_path, 
                     schema=data_schema_pix, 
                     timestampFormat='yyyy-MM-dd HH:mm:ss')

In [20]:
# verificar tipo dos dados
df.printSchema()

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- remetente: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- destinatario: struct (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- tipo: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- fraude: integer (nullable = true)



In [None]:
# visualiar os dados no dataframe
df.show(5)

+------------+------------------+--------------------+--------------------+-------------+-------------------+---------+------+
|id_transacao|             valor|           remetente|        destinatario|    categoria|   transaction_date|chave_pix|fraude|
+------------+------------------+--------------------+--------------------+-------------+-------------------+---------+------+
|        1000|            588.08|{Jonathan Gonsalv...|{Calebe Melo, Cai...|       outros|2021-07-16 05:00:55|aleatoria|     0|
|        1001|           80682.5|{Jonathan Gonsalv...|{Davi Lucas Perei...|transferencia|2022-04-20 12:34:01|  celular|     1|
|        1002|             549.9|{Jonathan Gonsalv...|{Sabrina Castro, ...|        lazer|2022-07-10 16:51:34|      cpf|     0|
|        1003|             90.83|{Jonathan Gonsalv...|{Francisco da Con...|   transporte|2022-10-20 10:57:36|aleatoria|     0|
|        1004|13272.619999999999|{Jonathan Gonsalv...|{Isabelly Ferreir...|transferencia|2021-04-06 20:26:51|  

*precisamos remover as estruturas aninhadas que estão nas colunas remetente e destinatário*

In [21]:
from pyspark.sql.functions import col

df_flatten = df.withColumns({
    'remetente_nome': col('remetente').getField('nome'),
    'remetente_banco': col('remetente').getField('banco'),
    'remetente_tipo': col('remetente').getField('tipo'),

    'destinatario_nome': col('destinatario').getField('nome'),
    'destinatario_banco': col('destinatario').getField('banco'),
    'destinatario_tipo': col('destinatario').getField('tipo'),
    }).drop('remetente', 'destinatario')

In [None]:
print(df_flatten.printSchema())
print(df_flatten.show(5))

root
 |-- id_transacao: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- categoria: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- chave_pix: string (nullable = true)
 |-- fraude: integer (nullable = true)
 |-- remetente_nome: string (nullable = true)
 |-- remetente_banco: string (nullable = true)
 |-- remetente_tipo: string (nullable = true)
 |-- destinatario_nome: string (nullable = true)
 |-- destinatario_banco: string (nullable = true)
 |-- destinatario_tipo: string (nullable = true)

None
+------------+------------------+-------------+-------------------+---------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+
|id_transacao|             valor|    categoria|   transaction_date|chave_pix|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|
+------------+------------------+-------------+------------------

### Sumarização dos dados

In [22]:
df_flatten.describe().show()

24/05/27 13:59:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+-----------------+------------------+-----------+-------------------+---------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|summary|     id_transacao|             valor|  categoria|   transaction_date|chave_pix|            fraude|    remetente_nome|remetente_banco|remetente_tipo|destinatario_nome|destinatario_banco|destinatario_tipo|
+-------+-----------------+------------------+-----------+-------------------+---------+------------------+------------------+---------------+--------------+-----------------+------------------+-----------------+
|  count|           100000|            100000|     100000|             100000|   100000|            100000|            100000|         100000|        100000|           100000|            100000|           100000|
|   mean|          50999.5|10303.358732200059|       NULL|               NULL|     NULL|           0.15367|              NULL|           NULL|      

## 4 - Modelagem

- Para qual banco esse cliente mais transfere?
- Qual é a média de transferências por período que esse cliente faz?
- Baseando-se no valor das transferências, poderia dar um aumento de crédito?
- Para o que esse cliente mais usa as transferências?
- Executar um algoritmo de machine learning que identifique possíveis transações com fraude.


In [23]:
# Para qual banco foram feitas mais transações?
df_flatten.groupBy('destinatario_banco').count().orderBy(col('count').desc()).show()




+------------------+-----+
|destinatario_banco|count|
+------------------+-----+
|                XP|14401|
|               BTG|14390|
|            Nubank|14297|
|              Itau|14281|
|             Caixa|14240|
|                C6|14204|
|          Bradesco|14187|
+------------------+-----+



                                                                                

In [24]:
# total de transações por mês e por banco

from pyspark.sql.functions import date_format

df_flatten.groupBy(
    date_format('transaction_date', 'yyyy-MM').alias('ano_mes'), 'destinatario_banco'
    ).count().orderBy(col('ano_mes').desc()).show()



+-------+------------------+-----+
|ano_mes|destinatario_banco|count|
+-------+------------------+-----+
|2023-01|              Itau|  267|
|2023-01|             Caixa|  277|
|2023-01|                XP|  277|
|2023-01|          Bradesco|  280|
|2023-01|            Nubank|  290|
|2023-01|                C6|  290|
|2023-01|               BTG|  278|
|2022-12|                XP|  615|
|2022-12|               BTG|  603|
|2022-12|                C6|  576|
|2022-12|          Bradesco|  575|
|2022-12|            Nubank|  602|
|2022-12|              Itau|  633|
|2022-12|             Caixa|  616|
|2022-11|          Bradesco|  579|
|2022-11|               BTG|  580|
|2022-11|              Itau|  614|
|2022-11|            Nubank|  620|
|2022-11|             Caixa|  543|
|2022-11|                C6|  561|
+-------+------------------+-----+
only showing top 20 rows



                                                                                

In [25]:
from pyspark.sql.functions import col, avg, format_number, count

# Valor de transação médio para cada banco
#df_flatten.groupBy('destinatario_banco').avg('valor').orderBy(col('avg(valor)').asc()).show()
average_df = df_flatten.groupBy('destinatario_banco').avg('valor')
formatted_average_df = average_df.withColumn('avg(valor)', format_number(col('avg(valor)'), 2))
formatted_average_df.show()




+------------------+----------+
|destinatario_banco|avg(valor)|
+------------------+----------+
|            Nubank| 10,316.48|
|                C6| 10,309.50|
|               BTG| 10,122.30|
|                XP| 10,328.07|
|             Caixa| 10,254.86|
|          Bradesco| 10,564.19|
|              Itau| 10,230.88|
+------------------+----------+



                                                                                

In [26]:
# Valor de transação total para cada banco
sum_df = df_flatten.groupBy('destinatario_banco').sum('valor')
formatted_sum_df = sum_df.withColumn('sum(valor)', format_number(col('sum(valor)'), 2))
formatted_sum_df.show()




+------------------+--------------+
|destinatario_banco|    sum(valor)|
+------------------+--------------+
|            Nubank|147,494,648.81|
|                C6|146,436,134.80|
|               BTG|145,659,894.17|
|                XP|148,734,558.71|
|             Caixa|146,029,263.58|
|          Bradesco|149,874,228.63|
|              Itau|146,107,144.52|
+------------------+--------------+



                                                                                

In [14]:
# total de transações por mês/banco por categoria

df_flatten.groupBy(
    date_format('transaction_date', 'yyyy-MM').alias('ano_mes'), 'destinatario_banco', 'categoria'
    ).count().orderBy(col('ano_mes').desc()).show()

+-------+------------------+-------------+-----+
|ano_mes|destinatario_banco|    categoria|count|
+-------+------------------+-------------+-----+
|2023-01|            Nubank|     educacao|   21|
|2023-01|               BTG|    presentes|   22|
|2023-01|              Itau|       outros|   27|
|2023-01|                XP|   transporte|   32|
|2023-01|               BTG|     educacao|   37|
|2023-01|            Nubank|        lazer|   29|
|2023-01|             Caixa|     educacao|   26|
|2023-01|             Caixa|        lazer|   21|
|2023-01|               BTG|        saude|   28|
|2023-01|                XP|        lazer|   26|
|2023-01|              Itau|     educacao|   30|
|2023-01|             Caixa|    presentes|   31|
|2023-01|            Nubank|    presentes|   25|
|2023-01|             Caixa|  alimentacao|   31|
|2023-01|                C6|  alimentacao|   30|
|2023-01|            Nubank|        saude|   34|
|2023-01|            Nubank|       outros|   35|
|2023-01|           

In [27]:
# total de transações por categoria/ano

df_flatten.groupBy(
    date_format('transaction_date', 'yyyy').alias('ano'), 'categoria'
    ).count().orderBy(col('ano').desc()).show()

+----+-------------+-----+
| ano|    categoria|count|
+----+-------------+-----+
|2023|        saude|  193|
|2023|    vestuario|  174|
|2023|     educacao|  202|
|2023|    presentes|  163|
|2023|   transporte|  178|
|2023|  alimentacao|  189|
|2023|        lazer|  193|
|2023|transferencia|  475|
|2023|       outros|  192|
|2022|       outros| 4702|
|2022|        saude| 4784|
|2022|        lazer| 4784|
|2022|  alimentacao| 4799|
|2022|   transporte| 4593|
|2022|transferencia|12269|
|2022|    vestuario| 4731|
|2022|     educacao| 4681|
|2022|    presentes| 4687|
|2021|  alimentacao| 4560|
|2021|    presentes| 4404|
+----+-------------+-----+
only showing top 20 rows



In [28]:
# total de transações por categoria

df_flatten.groupBy('categoria'
    ).count().orderBy(col('count').desc()).show()

+-------------+-----+
|    categoria|count|
+-------------+-----+
|transferencia|24744|
|  alimentacao| 9548|
|    vestuario| 9503|
|        saude| 9476|
|        lazer| 9464|
|     educacao| 9460|
|       outros| 9377|
|    presentes| 9254|
|   transporte| 9174|
+-------------+-----+



In [17]:
# Conta o número de transações por ano
df_flatten.groupBy(date_format(col("transaction_date"), "yyyy").alias("ano")).agg(
    count("id_transacao").alias("count")
).orderBy("ano", ascending=False).show()

+----+-----+
| ano|count|
+----+-----+
|2023| 1959|
|2022|50030|
|2021|48011|
+----+-----+



In [29]:
# valor total de transações por ano
df_flatten.groupBy(date_format(col("transaction_date"), "yyyy").alias("ano")).sum(
    "valor"
).select("ano", format_number(col("sum(valor)"), 2).alias("total")).orderBy(
    "ano", ascending=False
).show()

+----+--------------+
| ano|         total|
+----+--------------+
|2023| 19,594,633.67|
|2022|513,575,644.77|
|2021|497,165,594.78|
+----+--------------+



In [19]:
# valor médio de transações por ano
df_flatten.groupBy(date_format(col("transaction_date"), "yyyy").alias("ano")).avg(
    "valor"
).select("ano", format_number(col("avg(valor)"), 2).alias("avg")).orderBy(
    "ano", ascending=False
).show()

+----+---------+
| ano|      avg|
+----+---------+
|2023|10,002.37|
|2022|10,265.35|
|2021|10,355.24|
+----+---------+



In [30]:
# Quantidade de fraudes
df_flatten.groupBy('fraude').count().show()

+------+-----+
|fraude|count|
+------+-----+
|     1|15367|
|     0|84633|
+------+-----+



                                                                                

In [21]:
# Quantidade de fraudes por ano
df_flatten.filter(col("fraude") == 1).groupBy(
    date_format(col("transaction_date"), "yyyy").alias("ano")
).count().select("ano", format_number(col("count"), 2).alias("total")).orderBy(
    "ano", ascending=False
).show()

+----+--------+
| ano|   total|
+----+--------+
|2023|  284.00|
|2022|7,642.00|
|2021|7,441.00|
+----+--------+



In [31]:
#verificar o total de fraudes por ano e tirar a prova dos valores comparados ao método anteriror

from pyspark.sql.functions import lit

# Conta o número de fraudes por ano
df_yearly = df_flatten.filter(col("fraude") == 1).groupBy(
    date_format(col("transaction_date"), "yyyy").alias("ano")
).count().select(
    "ano", format_number(col("count"), 2).alias("total")
).orderBy(
    "ano", ascending=False
)

# Conta o número total de fraudes
df_total = df_flatten.filter(col("fraude") == 1).select(
    lit("Total").alias("ano"), format_number(count("*"), 2).alias("total")
)

# Adiciona a linha total ao DataFrame
df_result = df_yearly.union(df_total)

# Mostra o resultado
df_result.show()

[Stage 28:>                                                         (0 + 2) / 2]

+-----+---------+
|  ano|    total|
+-----+---------+
| 2023|   284.00|
| 2022| 7,642.00|
| 2021| 7,441.00|
|Total|15,367.00|
+-----+---------+



                                                                                

In [23]:
# Agrupamento por cateria e fraude
df_flatten.groupBy('fraude', 'categoria').count().orderBy('categoria', ascending =False).show()

+------+-------------+-----+
|fraude|    categoria|count|
+------+-------------+-----+
|     0|    vestuario| 9503|
|     0|   transporte| 9174|
|     1|transferencia|15367|
|     0|transferencia| 9377|
|     0|        saude| 9476|
|     0|    presentes| 9254|
|     0|       outros| 9377|
|     0|        lazer| 9464|
|     0|     educacao| 9460|
|     0|  alimentacao| 9548|
+------+-------------+-----+



In [32]:
# faixa de valores em que ocorreram fraudes

from pyspark.sql.functions import floor

df_flatten.filter(col("fraude") == 1).withColumn(
    "range", floor(col("valor") / 1000) * 1000
).groupBy("range").count().orderBy("range").show()



+-----+-----+
|range|count|
+-----+-----+
|19000|    1|
|20000|  242|
|21000|  231|
|22000|  227|
|23000|  230|
|24000|  195|
|25000|  233|
|26000|  227|
|27000|  242|
|28000|  222|
|29000|  233|
|30000|  207|
|31000|  242|
|32000|  192|
|33000|  207|
|34000|  203|
|35000|  254|
|36000|  253|
|37000|  252|
|38000|  221|
+-----+-----+
only showing top 20 rows



                                                                                

In [25]:
# Faixa máxima e mínima de valores que ocorreram fraudes

from pyspark.sql.functions import floor, max, min

df_flatten.filter(col("fraude") == 1).withColumn(
    "range", floor(col("valor") / 1000) * 1000
).select(max("range").alias('faixa_max_fraude'), min('range').alias('faixa_min_fraude')).show()

+----------------+----------------+
|faixa_max_fraude|faixa_min_fraude|
+----------------+----------------+
|           89000|           19000|
+----------------+----------------+



## 5 - Modelo de Predição de Fraudes

In [26]:
#%pip install distutils

In [51]:
from pyspark.sql.functions import col, udf, round, lit



from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [34]:
df_flatten.columns

['id_transacao',
 'valor',
 'categoria',
 'transaction_date',
 'chave_pix',
 'fraude',
 'remetente_nome',
 'remetente_banco',
 'remetente_tipo',
 'destinatario_nome',
 'destinatario_banco',
 'destinatario_tipo']

In [29]:
df.columns

['id_transacao',
 'valor',
 'remetente',
 'destinatario',
 'categoria',
 'transaction_date',
 'chave_pix',
 'fraude']

In [35]:
indexer = StringIndexer(
    inputCols=[
        "destinatario_nome", 
        "destinatario_banco",
        "destinatario_tipo",
        "categoria",
        "chave_pix"
    ], 
    outputCols=[
        "destinatario_nome_index", 
        "destinatario_banco_index",
        "destinatario_tipo_index",
        "categoria_index",
        "chave_pix_index"
    ])

In [36]:
df_index = indexer.fit(df_flatten).transform(df_flatten)
df_index.show()

24/05/27 14:01:13 WARN DAGScheduler: Broadcasting large task binary with size 1278.8 KiB
[Stage 47:>                                                         (0 + 1) / 1]

+------------+------------------+-------------+-------------------+---------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|id_transacao|             valor|    categoria|   transaction_date|chave_pix|fraude|    remetente_nome|remetente_banco|remetente_tipo|   destinatario_nome|destinatario_banco|destinatario_tipo|destinatario_nome_index|destinatario_banco_index|destinatario_tipo_index|categoria_index|chave_pix_index|
+------------+------------------+-------------+-------------------+---------+------+------------------+---------------+--------------+--------------------+------------------+-----------------+-----------------------+------------------------+-----------------------+---------------+---------------+
|        1000|            588.08|       outros|2021-07-16 05:00:55|aleatoria|     0|Jonathan Gonsalves|   

                                                                                

In [37]:
# para filtros, podemos usar somente colunas numéricas e de data
cols_para_filtrar = [
  "valor",
  "transaction_date",
  "destinatario_nome_index", 
  "destinatario_banco_index",
  "destinatario_tipo_index",
  "chave_pix_index",
  "categoria_index",
  "fraude"
]

In [38]:
is_fraud = df_index.select(cols_para_filtrar).filter(col("fraude") == 1)
not_fraud = df_index.select(cols_para_filtrar).filter(col("fraude") == 0)


In [39]:
# separar amostra dos dados de fraude
not_fraud = not_fraud.sample(False, 0.1, 42)

In [40]:
df_concat = not_fraud.union(is_fraud)
df = df_concat.sort("transaction_date")
df.count()

23712

In [41]:
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

24/05/27 14:01:40 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:01:42 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:01:44 WARN DAGScheduler: Broadcasting large task binary with size 1296.6 KiB
24/05/27 14:01:44 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:01:46 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:01:48 WARN DAGScheduler: Broadcasting large task binary with size 1296.6 KiB


train = 16504  test = 7208


                                                                                

In [42]:
is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.fraude))

In [38]:
# train = train.repartition(2)

In [39]:
# spark.conf.set("spark.executor.heartbeatInterval", "60s")


In [43]:
train.rdd.getNumPartitions()

24/05/27 14:02:05 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:02:07 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB

1

In [44]:
train.write.mode("overwrite").parquet(f"data{os.sep}train.parquet")

24/05/27 14:02:25 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:02:26 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:02:28 WARN DAGScheduler: Broadcasting large task binary with size 2041.4 KiB
                                                                                

In [45]:
# Create the feature vectors.
assembler = VectorAssembler(
  inputCols = [x for x in train.columns if x not in ["transaction_date", "fraude", "is_fraud"]],
  outputCol = "features")

# Use Logistic Regression.
lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")

spark = spark.builder.config("spark.network.timeout", "600s").getOrCreate()

# Repartition the train DataFrame into 4 partitions
# train = train.repartition()

# This will train a logistic regression model on the input data and return a 
# LogisticRegressionModel object which can be used to make predictions on new data.
model = Pipeline(stages = [assembler, lr]).fit(train)

24/05/27 14:02:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/27 14:02:58 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:03:00 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:03:02 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:03:03 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:03:05 WARN DAGScheduler: Broadcasting large task binary with size 1340.8 KiB
24/05/27 14:03:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/27 14:03:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/05/27 14:03:07 WARN DAGScheduler: Broadcasting large task binary with size 1341.5 KiB
24/05/27 14:03:07 WARN DAGScheduler: Broadcasting large task binary with size 1341.5 KiB
24/05/27 14:03:08 WARN DAGScheduler: Bro

In [53]:
# Transforma os dados de teste usando o modelo
predicted = model.transform(test)

# Adiciona uma nova coluna 'is_fraud' ao DataFrame
predicted = predicted.withColumn("is_fraud", is_fraud(predicted.fraude))

# Cria uma tabela de contingência (crosstab) entre 'is_fraud' e 'prediction'
crosstab_df = predicted.crosstab("is_fraud", "prediction")

# Converte os nomes das colunas para strings
crosstab_df = crosstab_df.toDF(*(c.replace('.', '_') for c in crosstab_df.columns))

# Calcula a soma total de todas as linhas
total = crosstab_df.select(*(col(c).cast("int") for c in crosstab_df.columns)).rdd.flatMap(lambda x: x).reduce(lambda x, y: x + y)

# Adiciona novas colunas ao DataFrame para mostrar as porcentagens
for column in crosstab_df.columns[1:]:
    crosstab_df = crosstab_df.withColumn(column + '_percent', round((col(column) / lit(total)) * 100, 2))

crosstab_df.show()

24/05/27 14:25:16 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:25:17 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB


24/05/27 14:25:18 WARN DAGScheduler: Broadcasting large task binary with size 1339.8 KiB
24/05/27 14:25:19 WARN DAGScheduler: Broadcasting large task binary with size 1330.9 KiB
24/05/27 14:25:19 WARN DAGScheduler: Broadcasting large task binary with size 1330.6 KiB
24/05/27 14:25:19 WARN DAGScheduler: Broadcasting large task binary with size 1328.2 KiB
24/05/27 14:25:19 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:25:21 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:25:22 WARN DAGScheduler: Broadcasting large task binary with size 1346.0 KiB
24/05/27 14:25:22 WARN DAGScheduler: Broadcasting large task binary with size 1334.1 KiB
24/05/27 14:25:22 WARN DAGScheduler: Broadcasting large task binary with size 1344.4 KiB
24/05/27 14:25:23 WARN DAGScheduler: Broadcasting large task binary with size 1290.0 KiB
24/05/27 14:25:24 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/05/27 14:25:25 WAR

+-------------------+----+----+-----------+-----------+
|is_fraud_prediction| 0_0| 1_0|0_0_percent|1_0_percent|
+-------------------+----+----+-----------+-----------+
|                1.0|   0|4636|        0.0|      64.31|
|                0.0|2571|   1|      35.66|       0.01|
+-------------------+----+----+-----------+-----------+



Este modelo acertou quase que 100% os casos de fraude, exceto em 1 caso, marcando como falso negativo (onde não era fraude, mas ele determinou que era).

Para este caso em específico, deveremos analisar o algoritmo para verificar o motivo desta predição errada. Mas para este projeto, foi considerado aceitável esta pequena margem de erro, em comparação aos erros que ocorriam com os dados reais.

# Avaliação do Modelo
Será que seu modelo atinge todas as necessidades que foram definidas inicialmente? (e.g. pessoa em cima da bicicleta muda o resultado final)



# Deployment
Apresente o relatório com os resultados obtidos.




  