<a href="https://colab.research.google.com/github/mlteixei/notebooks/blob/main/projeto_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introdução
---

### Apache Spark
Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser mais rápida .

O Spark permite processamento distribuído de tarefas em clusters com vários nós. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó processa apenas uma parte parte do volume total de dados.

### PySpark
PySpark é uma interface para Apache Spark em Python. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

### Utilizando o Spark no Google Colab

Para utilizar o Google Colab e para configurar o PySpark utilizamos os comandos abaixo.

In [None]:
# instalando as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# criando variáveis de ambiente para o JAVA e SPARK
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# biblioteca para localizar o caminho do SPARK
import findspark
findspark.init()

# Carregamento dos dados
---

### Baixando o dataset do Kaggle
Vamos baixar o dataset diretamente do Kaggle e fazer a leitura dos dados

In [None]:
# baixando o dataset do Kaggle
!pip install opendatasets
!pip install pandas
import opendatasets as od
import pandas
# {"username":"matheusltr","key":"e4c5cddd081452edd92831397f77e91e"}  
od.download("https://www.kaggle.com/datasets/ealaxi/paysim1")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: matheusltr
Your Kaggle Key: ··········
Downloading paysim1.zip to ./paysim1


100%|██████████| 178M/178M [00:01<00:00, 122MB/s]





### SparkSession
Para criar DataFrames, executar consultas SQL, armazenar em cache e ler arquivos utilizamos uma SparkSession, por meio dos seguintes comandos:

In [None]:
# criando sessão SPARK
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [None]:
spark

### Leitura dos dados

In [None]:
# leitura do dataset baixado 
base = spark.read.csv('/content/paysim1/PS_20174392719_1491204439457_log.csv', header=True)

In [None]:
# verificando a quantidade de registros
base.count()

6362620

In [None]:
# visualizando a base de dados
base.limit(5).toPandas()

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0


# Pré-processamento dos dados
---


### Analisando os tipos dos dados

In [None]:
# verificando os tipos de dados dos atributos
base.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



### Modificando os tipos de dados

In [None]:
# importando bibliotecas para conversão de tipos de dados
from pyspark.sql.types import IntegerType, DoubleType

In [None]:
# convertendo os tipos de dados para numéricos
base = base\
  .withColumn('step', base['step'].cast(IntegerType()))\
  .withColumn('amount', base['amount'].cast(DoubleType()))\
  .withColumn('oldbalanceOrg', base['oldbalanceOrg'].cast(DoubleType()))\
  .withColumn('newbalanceOrig', base['newbalanceOrig'].cast(DoubleType()))\
  .withColumn('oldbalanceDest', base['oldbalanceDest'].cast(DoubleType()))\
  .withColumn('newbalanceDest', base['newbalanceDest'].cast(DoubleType()))\
  .withColumn('isFraud', base['isFraud'].cast(IntegerType()))\
  .withColumn('isFlaggedFraud', base['isFlaggedFraud'].cast(IntegerType()))

In [None]:
# visualizando os tipos de dados após conversão
base.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
# visualizando base de dados
base.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M123070170

### Verificando transações marcadas como fraudes

In [None]:
# quantidade de transações como fraude
base.where('isFraud==1').count()

8213

In [None]:
# percentual de transações que são fraudes
(base.where('isFraud==1').count() / base.count())*100

0.12908204481801522

## Transformação dos dados


In [None]:
# importando biblioteca para consultas na base
from pyspark.sql import functions as f
base.groupBy('nameOrig').pivot('type').agg(f.lit(1)).na.fill(0).show()

+-----------+-------+--------+-----+-------+--------+
|   nameOrig|CASH_IN|CASH_OUT|DEBIT|PAYMENT|TRANSFER|
+-----------+-------+--------+-----+-------+--------+
|C1298557761|      0|       0|    0|      1|       0|
| C842031308|      0|       1|    0|      0|       0|
|C1398324588|      1|       0|    0|      0|       0|
| C971397568|      0|       0|    0|      1|       0|
| C539524487|      0|       1|    0|      0|       0|
| C314630213|      1|       0|    0|      0|       0|
|C1418400884|      0|       0|    0|      1|       0|
| C500869003|      0|       1|    0|      0|       0|
|C1427037170|      0|       1|    0|      0|       0|
|C1085818726|      0|       0|    0|      0|       1|
|C1658939450|      1|       0|    0|      0|       0|
| C187908769|      0|       0|    0|      1|       0|
| C481022227|      0|       0|    0|      1|       0|
|C1927868514|      0|       1|    0|      0|       0|
| C625817784|      0|       0|    0|      1|       0|
|C1476003646|      0|       

In [None]:
# transformação de dados multicategóricos em atributos
base_processada = base.groupBy('nameOrig').pivot('type').agg(f.lit(1)).na.fill(0)

In [None]:
# junção de base transformada com a base original
base_processada = base.join(base_processada, 'nameOrig', how='inner')

In [None]:
# visualizando base de dados após transformação de dados multicategóricos em atributos
base_processada.show()

+-----------+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-----+--------------+-------+--------+-----+-------+--------+
|   nameOrig|step|    type|   amount|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|label|isFlaggedFraud|CASH_IN|CASH_OUT|DEBIT|PAYMENT|TRANSFER|
+-----------+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-----+--------------+-------+--------+-----+-------+--------+
|C1000031397| 259|CASH_OUT|163976.41|          0.0|           0.0|C1030970900|    2281442.13|    2445418.54|    0|             0|      0|       1|    0|      0|       0|
|C1000050727| 354|CASH_OUT| 33110.41|          0.0|           0.0|C1183531360|     370166.97|     403277.38|    0|             0|      0|       1|    0|      0|       0|
|C1000138136| 166|TRANSFER|361405.04|          0.0|           0.0|C1794058350|     846606.36|     1208011.4|    0|             0|      0|       0|    

In [None]:
# visualizando tipos de dados após junção
base_processada.printSchema()

root
 |-- nameOrig: string (nullable = true)
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)
 |-- CASH_IN: integer (nullable = true)
 |-- CASH_OUT: integer (nullable = true)
 |-- DEBIT: integer (nullable = true)
 |-- PAYMENT: integer (nullable = true)
 |-- TRANSFER: integer (nullable = true)



## Seleção dos atributos
Vamos escolher quais atributos serão utilizados pelo modelo preditivo criando um vetor com os dados de interesse


In [None]:
# importanto biblioteca para criar vetor com os atributos de intesse
from pyspark.ml.feature import VectorAssembler

In [None]:
# especificando atributo de interesse 
base_processada = base_processada.withColumnRenamed('isFraud', 'label')
base_processada.show()

+-----------+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-----+--------------+-------+--------+-----+-------+--------+
|   nameOrig|step|    type|   amount|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|label|isFlaggedFraud|CASH_IN|CASH_OUT|DEBIT|PAYMENT|TRANSFER|
+-----------+----+--------+---------+-------------+--------------+-----------+--------------+--------------+-----+--------------+-------+--------+-----+-------+--------+
|C1000031397| 259|CASH_OUT|163976.41|          0.0|           0.0|C1030970900|    2281442.13|    2445418.54|    0|             0|      0|       1|    0|      0|       0|
|C1000050727| 354|CASH_OUT| 33110.41|          0.0|           0.0|C1183531360|     370166.97|     403277.38|    0|             0|      0|       1|    0|      0|       0|
|C1000138136| 166|TRANSFER|361405.04|          0.0|           0.0|C1794058350|     846606.36|     1208011.4|    0|             0|      0|       0|    

In [None]:
# criando vetor com atributos de interesse
X = [
    'step',
    'amount',
     'oldbalanceOrg',
     'newbalanceOrig',
     'oldbalanceDest',
     'newbalanceDest',
     'isFlaggedFraud',
     'CASH_IN',
     'CASH_OUT',
     'DEBIT',
     'PAYMENT',
     'TRANSFER'
]

In [None]:
assembler = VectorAssembler(inputCols = X, outputCol= 'features')

In [None]:
base_prep = assembler.transform(base_processada).select('features', 'label')

# Modelagem
---


## Regressão linear
Vamos utilizar a biblioteca de regressão linear para criação do modelo preditivo

In [None]:
# importando biblioteca para regressão linear
from pyspark.ml.regression import LinearRegression

### Divisão em treino e teste

In [None]:
# divindo os dados em treino e teste
treino, teste = base_prep.randomSplit([0.7, 0.3])

In [None]:
# quantidade de dados de treino
treino.count()

4453616

In [None]:
# quantidade de dados de teste
teste.count()

1909004

In [None]:
lr = LinearRegression()

### Ajuste do modelo

In [None]:
# ajustando o modelo com os dados de treino
modelo_lr = lr.fit(treino)

In [None]:
previsoes_lr_treino = modelo_lr.transform(treino)

In [None]:
# visualizando as previsões do modelo
previsoes_lr_treino.show()

+--------------------+-----+--------------------+
|            features|label|          prediction|
+--------------------+-----+--------------------+
|(12,[0,1,2,3,4,7]...|    0| 0.01054989926520365|
|(12,[0,1,2,3,4,7]...|    0|0.016155999954652715|
|(12,[0,1,2,3,4,7]...|    0|0.015396075372486925|
|(12,[0,1,2,3,4,7]...|    0|8.732547056276956E-4|
|(12,[0,1,2,3,4,7]...|    0|1.045830946912948...|
|(12,[0,1,2,3,4,7]...|    0|6.036457589161069E-4|
|(12,[0,1,2,3,4,7]...|    0|-0.01387746686487...|
|(12,[0,1,2,3,4,7]...|    0|0.010239443055382207|
|(12,[0,1,2,3,4,7]...|    0|-0.00499557676461...|
|(12,[0,1,2,3,4,7]...|    0|-0.02060984695768...|
|(12,[0,1,2,3,4,7]...|    0|-0.03511721580321611|
|(12,[0,1,2,3,4,7]...|    0|0.013970108089142712|
|(12,[0,1,2,3,4,7]...|    0|-0.04890715961478442|
|(12,[0,1,2,3,4,7]...|    0|0.012866943648059764|
|(12,[0,1,2,3,4,7]...|    0|0.011274234117390338|
|(12,[0,1,2,3,4,7]...|    0| 0.01004587468401701|
|(12,[0,1,2,3,4,7]...|    0|-0.01287301225958...|


## Árvore de Decisão

In [None]:
# importando biblioteca para árvore de decisão
from pyspark.ml.regression import DecisionTreeRegressor

In [None]:
dtr = DecisionTreeRegressor(seed=191, maxDepth=7)

### Ajuste do modelo

In [None]:
# ajustando o modelo com os dados de treino
odelo_dtr = dtr.fit(treino)

In [None]:
previsoes_dtr_treino = modelo_dtr.transform(treino)

In [None]:
# visualizando as previsões do modelo
previsoes_dtr_treino.show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
|(12,[0,1,2,3,4,7]...|    0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows



# Avaliação dos modelos
---
Para avaliar o desempenho do modelo preditivo usamos as métricas R2 e RMSE


### Regressão Linear

In [None]:
resumo_treino = modelo_lr.summary

In [None]:
# avaliando métrica R2 dos dados de treino
resumo_treino.r2

0.18695576782043977

In [None]:
# avaliando métrica RMSE dos dados de treino
resumo_treino.rootMeanSquaredError

0.032490582273677814

In [None]:
resumo_teste = modelo_lr.evaluate(teste)

In [None]:
# avaliando métrica R2 dos dados de teste
resumo_teste.r2

0.17787193769689302

In [None]:
# avaliando métrica RMSE dos dados de teste
resumo_teste.rootMeanSquaredError

0.032282505622328254

##Arvore de decisão

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator()

In [None]:
# avaliando métrica R2 dos dados de treino
print(evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: "r2"}))

0.7027698595588623


In [None]:
# avaliando métrica RMSE dos dados de treino
print(evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: "rmse"}))

0.019644764698096014


In [None]:
previsoes_dtr_teste = modelo_dtr.transform(teste)

In [None]:
# avaliando métrica R2 dos dados de teste
print("R²: %f" % evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: "r2"}))

R²: 0.702770


In [None]:
# avaliando métrica RMSE dos dados de teste
print("RMSE: %f" % evaluator.evaluate(previsoes_dtr_teste, {evaluator.metricName: "rmse"}))

RMSE: 0.019784


### Comparativo de modelos

In [None]:
print('Regressão Linear')
print("Dados de Treino")
print("R²: %f" % resumo_treino.r2)
print("RMSE: %f" % resumo_treino.rootMeanSquaredError)
print("Dados de Teste")
print("R²: %f" % resumo_teste.r2)
print("RMSE: %f" % resumo_teste.rootMeanSquaredError)
print("")
print('Árvore de Decisão')
print("Dados de Treino")
print("R²: %f" % evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: "r2"}))
print("RMSE: %f" % evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: "rmse"}))
print("Dados de Teste")
print("R²: %f" % evaluator.evaluate(previsoes_dtr_teste, {evaluator.metricName: "r2"}))
print("RMSE: %f" % evaluator.evaluate(previsoes_dtr_teste, {evaluator.metricName: "rmse"}))



Regressão Linear
Dados de Treino
R²: 0.186956
RMSE: 0.032491
Dados de Teste
R²: 0.177872
RMSE: 0.032283

Árvore de Decisão
Dados de Treino
R²: 0.702770
RMSE: 0.019645
Dados de Teste
R²: 0.691222
RMSE: 0.019784
