# Jupyter Notebook Example with Spark and Scala

This notebook contains an example Spark application written in Scala, run in a Jupyter notebook with the spylon kernel via the jupyter/all-spark-notebook Docker image. For more details, see the installation file in this repository.

This application builds a random forest model to predict the winning team in a League of Legends match.
The data come from the official Riot Games API and were compiled on Kaggle at this [link](https://www.kaggle.com/datasets/datasnaek/league-of-legends) by the user [Mitchel J](https://www.kaggle.com/datasnaek).

## Import the required libraries

In [1]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession

Intitializing Scala interpreter ...

Spark Web UI available at http://8c62f2285646:4040
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1655330486187)
SparkSession available as 'spark'


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession


## Define the SparkSession

In [2]:
// To use an external cluster master,
// replace "local[*]" with "spark://<host>:<port>",
// using the master's host and port.
val spark = SparkSession.builder.master("local[*]").getOrCreate

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@56d90cfa
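
The comment in the cell above describes switching between a local master and an external one. A minimal sketch of making that switch without editing the code is shown below. The environment variable name `SPARK_MASTER_URL`, the helper `resolveMaster`, and the application name are assumptions made for this illustration and are not part of the original notebook.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: picks the master URL from an environment map,
// falling back to local mode when the variable is not set.
// "SPARK_MASTER_URL" is an illustrative name, not a Spark convention.
def resolveMaster(env: Map[String, String]): String =
  env.getOrElse("SPARK_MASTER_URL", "local[*]")

val spark = SparkSession.builder
  .appName("lol-winner-prediction") // illustrative application name
  .master(resolveMaster(sys.env))
  .getOrCreate()
```

With the variable unset, this behaves like the cell above. Setting it to something like `spark://<host>:<port>` before starting the kernel would point the session at that master instead.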


## Load the game data

In [3]:
val gamesDF = spark.read
  .option("header", value = true)
  .option("inferSchema", value = true)
  .csv("games.csv")
  .persist()

gamesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gameId: bigint, creationTime: bigint ... 59 more fields]


## Inspect the schema and data in the DataFrame

In [4]:
gamesDF.printSchema

root
 |-- gameId: long (nullable = true)
 |-- creationTime: long (nullable = true)
 |-- gameDuration: integer (nullable = true)
 |-- seasonId: integer (nullable = true)
 |-- winner: integer (nullable = true)
 |-- firstBlood: integer (nullable = true)
 |-- firstTower: integer (nullable = true)
 |-- firstInhibitor: integer (nullable = true)
 |-- firstBaron: integer (nullable = true)
 |-- firstDragon: integer (nullable = true)
 |-- firstRiftHerald: integer (nullable = true)
 |-- t1_champ1id: integer (nullable = true)
 |-- t1_champ1_sum1: integer (nullable = true)
 |-- t1_champ1_sum2: integer (nullable = true)
 |-- t1_champ2id: integer (nullable = true)
 |-- t1_champ2_sum1: integer (nullable = true)
 |-- t1_champ2_sum2: integer (nullable = true)
 |-- t1_champ3id: integer (nullable = true)
 |-- t1_champ3_sum1: integer (nullable = true)
 |-- t1_champ3_sum2: integer (nullable = true)
 |-- t1_champ4id: integer (nullable = true)
 |-- t1_champ4_sum1: integer (nullable = true)
 |-- t1_champ4_sum2

In [5]:
gamesDF.show(5)

+----------+-------------+------------+--------+------+----------+----------+--------------+----------+-----------+---------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-------------+-----------------+-------------+--------------+------------------+-------+-------+-------+-------+-------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-------------+-----------------+-------------+--------------+------------------+-------+-------+-------+-------+-------+
|    gameId| creationTime|gameDuration|seasonId|winner|firstBlood|firstTower|firstInhibitor|firstBaron|firstDragon|firstRiftHerald|t1_champ1id|t1_champ1_sum1|t1_champ1_sum2|t1_champ2id|t1_champ2_sum1|t1_champ

## Split into training and test data

In [6]:
val seed = 11011990
val Array(dataTrain, dataTest) = gamesDF.randomSplit(Array(0.8, 0.2), seed)

seed: Int = 11011990
dataTrain: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gameId: bigint, creationTime: bigint ... 59 more fields]
dataTest: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gameId: bigint, creationTime: bigint ... 59 more fields]


## Select the columns relevant to the model

In [7]:
val columns = gamesDF.columns.filter(
  !Array("gameId", "creationTime", "seasonId", "winner").contains(_)
)

columns: Array[String] = Array(gameDuration, firstBlood, firstTower, firstInhibitor, firstBaron, firstDragon, firstRiftHerald, t1_champ1id, t1_champ1_sum1, t1_champ1_sum2, t1_champ2id, t1_champ2_sum1, t1_champ2_sum2, t1_champ3id, t1_champ3_sum1, t1_champ3_sum2, t1_champ4id, t1_champ4_sum1, t1_champ4_sum2, t1_champ5id, t1_champ5_sum1, t1_champ5_sum2, t1_towerKills, t1_inhibitorKills, t1_baronKills, t1_dragonKills, t1_riftHeraldKills, t1_ban1, t1_ban2, t1_ban3, t1_ban4, t1_ban5, t2_champ1id, t2_champ1_sum1, t2_champ1_sum2, t2_champ2id, t2_champ2_sum1, t2_champ2_sum2, t2_champ3id, t2_champ3_sum1, t2_champ3_sum2, t2_champ4id, t2_champ4_sum1, t2_champ4_sum2, t2_champ5id, t2_champ5_sum1, t2_champ5_sum2, t2_towerKills, t2_inhibitorKills, t2_baronKills, t2_dragonKills, t2_riftHeraldKills, t2_ba...
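
The filter above drops the identifier, metadata, and target columns, which leaves 57 of the 61 columns as model inputs. The same idea can be sketched in plain Scala with a `Set` and `filterNot`. The column list below is a small illustrative subset of the real header, and the names `excluded`, `sampleColumns`, and `featureColumns` are chosen for this example only.

```scala
// Columns that should not be used as model inputs:
// identifiers, metadata, and the prediction target.
val excluded = Set("gameId", "creationTime", "seasonId", "winner")

// Illustrative subset of the header, not the full 61-column list.
val sampleColumns = Array(
  "gameId", "creationTime", "gameDuration", "seasonId",
  "winner", "firstBlood", "t1_towerKills"
)

// Keep every column whose name is not in the excluded set.
val featureColumns = sampleColumns.filterNot(excluded.contains)

println(featureColumns.mkString(", "))
// prints: gameDuration, firstBlood, t1_towerKills
```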


## Create an assembler to build the feature vector from the selected columns

In [8]:
val assembler = new VectorAssembler()
  .setInputCols(columns)
  .setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_580d5cfb8808, handleInvalid=error, numInputCols=57
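
To see what the assembler produces, the sketch below runs a `VectorAssembler` over a two-row toy DataFrame. The rows are made-up values used only for illustration, and the names `toyDF`, `toyAssembler`, `assembled`, and `firstVector` are not part of the original notebook.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Toy rows with three of the feature columns (values are made up).
val toyDF = Seq((1949, 2, 11), (1851, 1, 10))
  .toDF("gameDuration", "firstBlood", "t1_towerKills")

val toyAssembler = new VectorAssembler()
  .setInputCols(Array("gameDuration", "firstBlood", "t1_towerKills"))
  .setOutputCol("features")

// Each row gains a "features" column holding its inputs as one numeric vector.
val assembled = toyAssembler.transform(toyDF)
assembled.show(truncate = false)

val firstVector = assembled.select("features").head.getAs[Vector](0)
```

The cell output above reports `handleInvalid=error`, so a null in any input column would make the transformation fail rather than silently produce a partial vector.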


## Indexer to define the target column

In [9]:
val indexer = new StringIndexer()
  .setInputCol("winner")
  .setOutputCol("label")

indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_819a9a0ec61e
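
By default, `StringIndexer` assigns indices by descending frequency, so the most frequent value of `winner` becomes label 0.0. This means label values do not necessarily match the team numbers. The sketch below shows the mapping on toy data. The rows and the names `toyWinners`, `toyIndexerModel`, and `labelOrder` are assumptions for illustration, and `labelsArray` assumes Spark 3.0 or later (the session above reports 3.2.1).

```scala
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Toy data: team 1 wins three times, team 2 wins once.
val toyWinners = Seq(1, 2, 1, 1).toDF("winner")

val toyIndexerModel = new StringIndexer()
  .setInputCol("winner")
  .setOutputCol("label")
  .fit(toyWinners)

// labelsArray(0)(i) is the original value that was mapped to index i.
val labelOrder = toyIndexerModel.labelsArray(0)

println(labelOrder.mkString(", "))
// prints: 1, 2
```

Here `"1"` maps to label 0.0 and `"2"` to label 1.0. On the real data the order depends on which team wins more often in the training split.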


## Define the random forest classifier

In [10]:
val randomForestClassifier = new RandomForestClassifier()
  .setImpurity("gini")
  .setMaxDepth(3)
  .setNumTrees(20)
  .setFeatureSubsetStrategy("auto")

randomForestClassifier: org.apache.spark.ml.classification.RandomForestClassifier = rfc_e715460b59df


## Define the pipeline of operations

In [11]:
val stages = Array(assembler, indexer, randomForestClassifier)
val pipeline = new Pipeline().setStages(stages)

stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(VectorAssembler: uid=vecAssembler_580d5cfb8808, handleInvalid=error, numInputCols=57, strIdx_819a9a0ec61e, rfc_e715460b59df)
pipeline: org.apache.spark.ml.Pipeline = pipeline_9de3fd580e09


## Define the evaluator used in cross-validation

In [12]:
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setMetricName("areaUnderROC")

evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_04394fe481c0, metricName=areaUnderROC, numBins=1000


## Define the parameter grid

The grid lists the hyperparameter values to try across repeated fits. The cross-validator picks the combination with the best score according to the evaluator defined above.

In [13]:
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForestClassifier.maxBins, Array(25, 28, 31))
  .addGrid(randomForestClassifier.maxDepth, Array(4, 6, 8))
  .addGrid(randomForestClassifier.impurity, Array("entropy", "gini"))
  .build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_e715460b59df-impurity: entropy,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 4
}, {
	rfc_e715460b59df-impurity: entropy,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 6
}, {
	rfc_e715460b59df-impurity: entropy,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 8
}, {
	rfc_e715460b59df-impurity: gini,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 4
}, {
	rfc_e715460b59df-impurity: gini,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 6
}, {
	rfc_e715460b59df-impurity: gini,
	rfc_e715460b59df-maxBins: 25,
	rfc_e715460b59df-maxDepth: 8
}, {
	rfc_e715460b59df-impurity: entropy,
	rfc_e715460b59df-maxBins: 28,
	rfc_e715460b59df-maxDepth: 4
}, {
	rfc_e715460b59df-impu...


## Define the cross-validator

In [14]:
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)

cv: org.apache.spark.ml.tuning.CrossValidator = cv_d8ad7c2793dc
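
The cost of the search grows with the product of the grid sizes and the number of folds. The arithmetic for the grid and the fold count used in this notebook can be sketched in plain Scala. The variable names are chosen for this example, and the extra fit at the end reflects that `CrossValidator` refits the best combination on the full training set.

```scala
// Values taken from the parameter grid and cross-validator defined above.
val maxBinsValues = Array(25, 28, 31)
val maxDepthValues = Array(4, 6, 8)
val impurityValues = Array("entropy", "gini")
val numFolds = 5

// Every combination of the three parameters is one candidate.
val combinations = maxBinsValues.length * maxDepthValues.length * impurityValues.length

// Each candidate is fitted once per fold, then the winner is refitted once.
val fitsDuringSearch = combinations * numFolds
val totalFits = fitsDuringSearch + 1

println(s"$combinations combinations, $fitsDuringSearch fits during search, $totalFits in total")
// prints: 18 combinations, 90 fits during search, 91 in total
```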


## Train the model

In [15]:
val cvModel: CrossValidatorModel = cv.fit(dataTrain)

cvModel: org.apache.spark.ml.tuning.CrossValidatorModel = CrossValidatorModel: uid=cv_d8ad7c2793dc, bestModel=pipeline_9de3fd580e09, numFolds=5
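
A fitted `CrossValidatorModel` exposes the average metric per candidate through `avgMetrics`, in the same order as `getEstimatorParamMaps`. One way to see which combinations scored best is to pair the two and sort them. The helper below is a hypothetical sketch written for this purpose. It is generic so that it can be tried without a fitted model.

```scala
// Pairs each candidate with its score and sorts from best to worst,
// assuming a metric where larger is better (as with areaUnderROC).
def rankByMetric[A](candidates: Array[A], metrics: Array[Double]): Seq[(A, Double)] =
  candidates.toSeq.zip(metrics.toSeq).sortBy { case (_, metric) => -metric }

// Small standalone demonstration with made-up scores.
val demoRanking = rankByMetric(Array("a", "b", "c"), Array(0.7, 0.9, 0.8))
println(demoRanking.map(_._1).mkString(", "))
// prints: b, c, a
```

In this notebook, the call would be `rankByMetric(cvModel.getEstimatorParamMaps, cvModel.avgMetrics).take(3).foreach(println)`, which prints the three best parameter combinations with their average area under the ROC curve.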


## Evaluate the predictions

In [16]:
val cvPredictionDf = cvModel.transform(dataTest)
cvPredictionDf.show

+----------+-------------+------------+--------+------+----------+----------+--------------+----------+-----------+---------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-------------+-----------------+-------------+--------------+------------------+-------+-------+-------+-------+-------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-----------+--------------+--------------+-------------+-----------------+-------------+--------------+------------------+-------+-------+-------+-------+-------+--------------------+-----+--------------------+--------------------+----------+
|    gameId| creationTime|gameDuration|seasonId|winner|firstBlood|firstTower|firstInhibitor|firstBaron|firstDragon|firstRiftHera

cvPredictionDf: org.apache.spark.sql.DataFrame = [gameId: bigint, creationTime: bigint ... 64 more fields]
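
The cell above only displays the predictions. To turn them into a single score on the held-out data, the same kind of evaluator used during cross-validation can be applied to the prediction DataFrame. The helper name `areaUnderRoc` is an assumption for this sketch. It expects the `label` and `rawPrediction` columns that the fitted pipeline adds.

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.sql.DataFrame

// Computes the area under the ROC curve for a DataFrame of predictions.
// Expects the "label" and "rawPrediction" columns produced by the pipeline.
def areaUnderRoc(predictions: DataFrame): Double =
  new BinaryClassificationEvaluator()
    .setLabelCol("label")
    .setRawPredictionCol("rawPrediction")
    .setMetricName("areaUnderROC")
    .evaluate(predictions)
```

In this notebook, `areaUnderRoc(cvPredictionDf)` would give the test-set score. Calling `evaluator.evaluate(cvPredictionDf)` with the evaluator defined earlier should be equivalent.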


## Save the model for later use in new predictions

In [17]:
cvModel.write.overwrite().save("model/lol")
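
A saved model can later be read back with `CrossValidatorModel.load` and applied to new data. The sketch below wraps both steps in a hypothetical helper, `predictWithSavedModel`. It assumes an active SparkSession and that the new DataFrame has the same columns as `games.csv`.

```scala
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.sql.DataFrame

// Loads a previously saved CrossValidatorModel from `path` and applies it
// to `newGames`, returning the input rows plus the prediction columns.
def predictWithSavedModel(path: String, newGames: DataFrame): DataFrame =
  CrossValidatorModel.load(path).transform(newGames)
```

For the path used above, the call would be `predictWithSavedModel("model/lol", someNewGamesDF)`, where `someNewGamesDF` stands for whatever DataFrame holds the new matches.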



