In [18]:
!pip install "-r" "/rapids/notebooks/ECD-TCC/jupyter/requirements.txt"
import matplotlib.pyplot as plt
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.sql.types import IntegerType


[0m

In [19]:
# Create PySpark SparkSession
sparkConf = (
    SparkConf()
    .setAppName("ECD_TCC")
    .setMaster("local")
    .setAll([(k, v) for k, v in {
        # "spark.driver.memory": "8g",
        # "spark.executor.memory": "8g",
        "spark.executor.resource.gpu.amount": "1",
        "spark.executor.resource.gpu.discoveryScript": "/opt/sparkRapidsPlugin/getGpusResources.sh",
        "spark.hadoop.parquet.summary.metadata.level": "none",
        "spark.jars": "/opt/sparkRapidsPlugin/rapids-4-spark_2.12-22.08.0.jar",
        "spark.locality.wait": "0s",
        "spark.plugins": "com.nvidia.spark.SQLPlugin",
        "spark.rapids.memory.pinnedPool.size": "2G",
        "spark.rapids.sql.concurrentGpuTasks": "1",
        "spark.sql.files.maxPartitionBytes": "512m",
        "spark.sql.parquet.mergeSchema": "false",
        "spark.sql.cache.serializer": "com.nvidia.spark.ParquetCachedBatchSerializer",
        "spark.rapids.sql.exec.CollectLimitExec": "true"
    }.items()])
)


In [20]:
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()


In [21]:
# Filtrando apenas casos de covid
df_raw = spark.read.parquet("/rapids/notebooks/ECD-TCC/jupyter/datasets/raw").where("CLASSI_FIN == 5")


[Stage 28:>                 (0 + 1) / 1][Stage 29:>                 (0 + 0) / 1]

22/10/05 18:06:03 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#5664) could run on GPU
    !Expression <Max> max(label#5664) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#5664 could run on GPU
  @Expression <AttributeReference> max(label#5664)#5954 could run on GPU
  @Expression <Alias> max(label#5664)#5954 AS max(label)#5955 could run on GPU
    @Expression <AttributeReference> max(label#5664)#5954 could run on GPU

22/10/05 18:06:03 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#5664) could run on GPU
    !Expression <Max> max(label#5664) cannot run on GPU because Max aggregat

                                                                                

In [22]:
# Selecionando apenas as colunas com fatores de risco
select_cols = ["PUERPERA", "CARDIOPATI", "HEMATOLOGI", "SIND_DOWN", "HEPATICA", "ASMA", "DIABETES", "NEUROLOGIC",
               "PNEUMOPATI", "IMUNODEPRE", "RENAL", "OBESIDADE", "VACINA_COV", "VACINA", "EVOLUCAO"]
feature_list = select_cols[:-1]


In [23]:
# Preenchendo valores nulos como ignorados e normalizando os preenchimentos de fatores de risco
df = df_raw.select(select_cols).where("VACINA_COV <> '12/02/2021'").cache()
for column in feature_list:
    df = df.withColumn(
        column,
        when(col(column) == "1", 1)
        .when(col(column) == "2", -1)
        .otherwise(0)
    )
df = df.withColumn("label", col("EVOLUCAO").cast(IntegerType()) - 1).cache()


22/10/05 18:06:03 WARN CacheManager: Asked to cache already cached data.
22/10/05 18:06:03 WARN CacheManager: Asked to cache already cached data.


In [24]:
print("Dataset com os dados relevantes")
df.pandas_api().head()


Dataset com os dados relevantes
22/10/05 18:06:03 WARN GpuOverrides: 
  ! <AttachDistributedSequenceExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.python.AttachDistributedSequenceExec
    @Expression <AttributeReference> __index_level_0__#7204L could run on GPU



[Stage 34:>                 (0 + 0) / 1][Stage 35:>                 (0 + 1) / 1]

22/10/05 18:06:07 ERROR Executor: Exception in task 0.0 in stage 35.0 (TID 27)
scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIt

                                                                                

Unnamed: 0,PUERPERA,CARDIOPATI,HEMATOLOGI,SIND_DOWN,HEPATICA,ASMA,DIABETES,NEUROLOGIC,PNEUMOPATI,IMUNODEPRE,RENAL,OBESIDADE,VACINA_COV,VACINA,EVOLUCAO,label
0,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,1,0,1.0,0.0
1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,1,0,1.0,0.0
2,0,0,0,0,0,0,0,0,0,0,0,0,-1,-1,1.0,0.0
3,0,0,0,0,0,0,1,0,0,0,0,0,-1,1,,
4,0,1,0,0,0,0,0,0,0,0,0,0,1,1,1.0,0.0


In [25]:
# Valores de camadas inciais e finais para o MLP
features_n = len(select_cols) - 1
classes_n = df.select("EVOLUCAO").distinct().count() + 1


22/10/05 18:06:07 WARN GpuOverrides: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(PUERPERA#423, CARDIOPATI#439, HEMATOLOGI#455, SIND_DOWN#471, HEPATICA#487, ASMA#503, DIABETES#519, NEUROLOGIC#535, PNEUMOPATI#551, IMUNODEPRE#567, RENAL#583, OBESIDADE#599, VACINA_COV#615, VACINA#631, EVOLUCAO#109.toString, label#8052, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, CrossValidator_f495e622d061_rand#2384, StructField(PUERPERA,IntegerType,false), StructField(CARDIOPATI,IntegerType,false), StructField(HEMATOLOGI,IntegerType,false), StructField(SIND_DOWN,IntegerType,false), StructField(HEPATICA,IntegerType,false), StructField(ASMA,IntegerType,false), ... 12 more fields) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.express

[Stage 37:>                 (0 + 1) / 1][Stage 39:>                 (0 + 0) / 1]

22/10/05 18:07:21 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#8052) could run on GPU
    !Expression <Max> max(label#8052) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#8052 could run on GPU
  @Expression <AttributeReference> max(label#8052)#8342 could run on GPU
  @Expression <Alias> max(label#8052)#8342 AS max(label)#8343 could run on GPU
    @Expression <AttributeReference> max(label#8052)#8342 could run on GPU

22/10/05 18:07:21 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#8052) could run on GPU
    !Expression <Max> max(label#8052) cannot run on GPU because Max aggregat

                                                                                

In [26]:
# Juntar as variáveis em um vetor de features
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
assembledDF = assembler.transform(df)


22/10/05 18:07:22 WARN GpuOverrides: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(label#8718, 1.0#8719, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, StructField(label,DoubleType,true), StructField(1.0,DoubleType,false), StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
    @Expression <AttributeReference> label#8718 could run on GPU
    @Expression <AttributeReference> 1.0#8719 could run on GPU
    ! <Invoke> newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke


[Stage 45:>                                                         (0 + 1) / 1]

In [27]:
# Modelos a testar: regressão logística, Floresta aleatória e multilayerperceptron
lr = LogisticRegression().setFamily("binomial")
rf = RandomForestClassifier()
mlp = MultilayerPerceptronClassifier()


In [28]:
# Pipelines de transformações para a validação cruzada
pipeline_lr = Pipeline(stages=[rf])
pipeline_rf = Pipeline(stages=[rf])
pipeline_mlp = Pipeline(stages=[mlp])


In [29]:
# Parâmetros para a validação cruzada
paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
    .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
    .build()
)
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 30, 50])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)
paramGrid_mlp = (
    ParamGridBuilder()
    .addGrid(mlp.layers, [
        [features_n, 3, 3, classes_n],
        [features_n, 4, 4, classes_n],
        [features_n, 3, 3, 3, classes_n],
        [features_n, 4, 4, 4, classes_n],
    ]).build()
)

In [30]:
# Validador cruzado
crossval_lr = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=paramGrid_lr,
                             evaluator=MulticlassClassificationEvaluator())
crossval_rf = CrossValidator(estimator=pipeline_rf, estimatorParamMaps=paramGrid_rf,
                             evaluator=MulticlassClassificationEvaluator())
crossval_mlp = CrossValidator(estimator=pipeline_mlp, estimatorParamMaps=paramGrid_mlp,
                              evaluator=MulticlassClassificationEvaluator())


In [31]:
# Separar os dados em treino e teste
taxa_de_treino = 0.00001
(trainingData, testData) = df.randomSplit([taxa_de_treino, 1 - taxa_de_treino])


In [32]:
# Salvando as massa utilizadas em treino e teste para fazer análises depois de gerar os modelos
trainingData.write.mode("overwrite").parquet("/rapids/notebooks/ECD-TCC/jupyter/datasets/training")
testData.write.mode("overwrite").parquet("/rapids/notebooks/ECD-TCC/jupyter/datasets/test")

[Stage 47:>                 (0 + 1) / 1][Stage 48:>                 (0 + 0) / 1]

22/10/05 18:07:27 ERROR Executor: Exception in task 0.0 in stage 47.0 (TID 36)
scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIt

                                                                                

22/10/05 18:07:27 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#9540) could run on GPU
    !Expression <Max> max(label#9540) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#9540 could run on GPU
  @Expression <AttributeReference> max(label#9540)#9830 could run on GPU
  @Expression <Alias> max(label#9540)#9830 AS max(label)#9831 could run on GPU
    @Expression <AttributeReference> max(label#9540)#9830 could run on GPU
  !Exec <ShuffleExchangeExec> cannot run on GPU because Columnar exchange without columnar children is inefficient
    @Partitioning <SinglePartition$> could run on GPU
    !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
      @Expr

[Stage 49:>                 (0 + 1) / 1][Stage 50:>                 (0 + 0) / 1]

22/10/05 18:08:44 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#9540) could run on GPU
    !Expression <Max> max(label#9540) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#9540 could run on GPU
  @Expression <AttributeReference> max(label#9540)#9830 could run on GPU
  @Expression <Alias> max(label#9540)#9830 AS max(label)#9831 could run on GPU
    @Expression <AttributeReference> max(label#9540)#9830 could run on GPU

22/10/05 18:08:44 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#9540) could run on GPU
    !Expression <Max> max(label#9540) cannot run on GPU because Max aggregat

                                                                                

22/10/05 18:08:45 WARN GpuOverrides: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(label#10199, 1.0#10200, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, StructField(label,DoubleType,true), StructField(1.0,DoubleType,false), StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
    @Expression <AttributeReference> label#10199 could run on GPU
    @Expression <AttributeReference> 1.0#10200 could run on GPU
    ! <Invoke> newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Inv

                                                                                

In [33]:
crossval_lr_model = crossval_lr.fit(assembledDF)

22/10/05 18:08:47 WARN GpuOverrides: 
!Exec <FilterExec> cannot run on GPU because unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [features#9009]; unsupported data types in input: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [features#9009]
  @Expression <And> ((CrossValidator_84eacfa9d0dd_rand#10475 >= 0.0) AND (CrossValidator_84eacfa9d0dd_rand#10475 < 0.3333333333333333)) could run on GPU
    @Expression <GreaterThanOrEqual> (CrossValidator_84eacfa9d0dd_rand#10475 >= 0.0) could run on GPU
      @Expression <AttributeReference> CrossValidator_84eacfa9d0dd_rand#10475 could run on GPU
      @Expression <Literal> 0.0 could run on GPU
    @Expression <LessThan> (CrossValidator_84eacfa9d0dd_rand#10475 < 0.3333333333333333) could run on GPU
      @Expression <AttributeReference> CrossValidator_84eacfa9d0dd_rand#10475 could run on GPU
      @Expression <Literal> 0.3333333333333333 could run on GPU
  !Exec <ProjectExec> cannot run on GPU because unsupported d

[Stage 55:>                                                         (0 + 1) / 1]

22/10/05 18:08:47 WARN GpuOverrides: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(PUERPERA#6947, CARDIOPATI#6963, HEMATOLOGI#6979, SIND_DOWN#6995, HEPATICA#7011, ASMA#7027, DIABETES#7043, NEUROLOGIC#7059, PNEUMOPATI#7075, IMUNODEPRE#7091, RENAL#7107, OBESIDADE#7123, VACINA_COV#7139, VACINA#7155, EVOLUCAO#6432.toString, label#11155, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, CrossValidator_84eacfa9d0dd_rand#10475, StructField(PUERPERA,IntegerType,false), StructField(CARDIOPATI,IntegerType,false), StructField(HEMATOLOGI,IntegerType,false), StructField(SIND_DOWN,IntegerType,false), StructField(HEPATICA,IntegerType,false), StructField(ASMA,IntegerType,false), ... 12 more fields) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql

[Stage 55:>                 (0 + 1) / 1][Stage 56:>                 (0 + 0) / 1]

22/10/05 18:08:48 ERROR Executor: Exception in task 0.0 in stage 55.0 (TID 43)
scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIt

[Stage 56:>                 (0 + 1) / 1][Stage 57:>                 (0 + 0) / 1]

22/10/05 18:09:51 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#11155) could run on GPU
    !Expression <Max> max(label#11155) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#11155 could run on GPU
  @Expression <AttributeReference> max(label#11155)#11445 could run on GPU
  @Expression <Alias> max(label#11155)#11445 AS max(label)#11446 could run on GPU
    @Expression <AttributeReference> max(label#11155)#11445 could run on GPU

22/10/05 18:09:51 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#11155) could run on GPU
    !Expression <Max> max(label#11155) cannot run on GPU because 

[Stage 57:>                 (0 + 1) / 1][Stage 59:>                 (0 + 0) / 1]

22/10/05 18:10:53 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#11815) could run on GPU
    !Expression <Max> max(label#11815) cannot run on GPU because Max aggregation on floating point columns that can contain NaNs will compute incorrect results. If it is known that there are no NaNs, set  spark.rapids.sql.hasNans to false.
      @Expression <AttributeReference> label#11815 could run on GPU
  @Expression <AttributeReference> max(label#11815)#12105 could run on GPU
  @Expression <Alias> max(label#11815)#12105 AS max(label)#12106 could run on GPU
    @Expression <AttributeReference> max(label#11815)#12105 could run on GPU

22/10/05 18:10:53 WARN GpuOverrides: 
!Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AggregateExpression> max(label#11815) could run on GPU
    !Expression <Max> max(label#11815) cannot run on GPU because 

                                                                                

22/10/05 18:10:53 WARN GpuOverrides: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(label#12474, 1.0#12477, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, StructField(label,DoubleType,true), StructField(1.0,DoubleType,false), StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
    @Expression <AttributeReference> label#12474 could run on GPU
    @Expression <AttributeReference> 1.0#12477 could run on GPU
    ! <Invoke> newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Inv

[Stage 64:>                 (0 + 1) / 1][Stage 65:>                 (0 + 0) / 1]

22/10/05 18:11:00 ERROR Executor: Exception in task 0.0 in stage 64.0 (TID 50)
scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIt

[Stage 65:>                                                         (0 + 1) / 1]

Py4JJavaError: An error occurred while calling o1869.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 1 times, most recent failure: Lost task 0.0 in stage 64.0 (TID 50) (ec49e0c80609 executor driver): scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1198)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1200)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1193)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:274)
	at org.apache.spark.ml.classification.RandomForestClassifier.$anonfun$train$1(RandomForestClassifier.scala:161)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:138)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: scala.MatchError: [null,1.0,(14,[1,12],[1.0,-1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1198)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
crossval_lr_model.write().save("/rapids/notebooks/ECD-TCC/jupyter/model/lr")

In [None]:
# Avaliando o modelo
predictions = cvModel.transform(testData)
evaluator = MulticlassClassificationEvaluator()
rmse = evaluator.evaluate(predictions)

predictions.select("label", "features", "rawPrediction",
                   "probability", "prediction").show()
predictions.select("prediction").distinct().show()

result = predictions.toPandas()

plt.plot(result.label, result.prediction, 'bo')
plt.xlabel('Sobrevivencia')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()


In [None]:
# Selecionando o melhor modelo
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[2]

importances = bestModel.featureImportances

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation='vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')


In [None]:
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
