Projeto Final de Spark - Nível Básico
Campanha Nacional de Vacinação contra Covid-19: Dados e Referência das Visualizações

Aluno: Vanberto Zuim

Descrição das fórmulas que serão utilizadas para obter os dados do painel
Coeficiente de Incidência: Número de casos confirmados de COVID-19, por 100 mil habitantes, na população residente em determinado espaço geográfico, no período considerado.

Coeficiente de Mortalidade: Número de óbitos por doenças COVID-19, por 100 mil habitantes, na população residente em determinado espaço geográfico, no período considerado.

Taxa de Letalidade: Número de óbitos confirmados de COVID-19 em relação ao total de casos confirmados, na população residente em determinado espaço geográfico, no período considerado.

In [29]:
# Importação dos pacotes necessários

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

1 - Enviar os dados para o hdfs

Etapa realizada no terminal.

In [30]:
# Listagem dos arquivos presentes no hdfs

!hdfs dfs -ls -R /user/vanberto/covid

-rw-r--r--   3 root supergroup   62492959 2022-08-05 03:00 /user/vanberto/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-08-05 03:00 /user/vanberto/covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-08-05 03:00 /user/vanberto/covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-08-05 03:00 /user/vanberto/covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [31]:
# Leitura dos dados e visualizaçõs do schema

df = spark.read.csv("/user/vanberto/covid/*.csv", sep =';', header = True, inferSchema = True)
df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [32]:
# Ajuste do tipo dos dados - Informações são diárias, não necessita do campo hora

df = df.withColumn('data', f.from_unixtime(f.unix_timestamp(df.data), "yyyy-MM-dd"))
df = df.withColumn("obitosAcumulado", col("obitosAcumulado").cast(IntegerType()))

In [33]:
# Visualização de quatro colunas da tabela dados

df.select('data','regiao','casosNovos','casosAcumulado').show(10, truncate = False)

+----------+------+----------+--------------+
|data      |regiao|casosNovos|casosAcumulado|
+----------+------+----------+--------------+
|2021-01-01|Brasil|24605     |7700578       |
|2021-01-02|Brasil|15827     |7716405       |
|2021-01-03|Brasil|17341     |7733746       |
|2021-01-04|Brasil|20006     |7753752       |
|2021-01-05|Brasil|56648     |7810400       |
|2021-01-06|Brasil|63430     |7873830       |
|2021-01-07|Brasil|87843     |7961673       |
|2021-01-08|Brasil|52035     |8013708       |
|2021-01-09|Brasil|62290     |8075998       |
|2021-01-10|Brasil|29792     |8105790       |
+----------+------+----------+--------------+
only showing top 10 rows



2 - Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

In [9]:
# Salvando os dados em tabela Hive particionada por município

df.write.mode('overwrite').partitionBy('municipio').saveAsTable('vanberto.projeto')

In [28]:
# Identificação da tabela Hive criada

!hdfs dfs -ls /user/hive/warehouse/vanberto.db

Found 1 items
drwxr-xr-x   - root supergroup          0 2022-08-05 13:14 /user/hive/warehouse/vanberto.db/projeto


In [34]:
# Visualização das partições criadas - apenas as 10 primeiras em ordem alfabética

spark.sql('show partitions vanberto.projeto').show(10, truncate = False)

+-----------------------------+
|partition                    |
+-----------------------------+
|municipio=Abadia de Goiás    |
|municipio=Abadia dos Dourados|
|municipio=Abadiânia          |
|municipio=Abaetetuba         |
|municipio=Abaeté             |
|municipio=Abaiara            |
|municipio=Abaré              |
|municipio=Abatiá             |
|municipio=Abaíra             |
|municipio=Abdon Batista      |
+-----------------------------+
only showing top 10 rows



3 - Criar as 3 visualizações pelo Spark com os dados enviados para o HDFS
PAINEL 1 - Casos Recuperados

In [35]:
# Os dados estão atualizados até o dia 06/07/2021

data = "2021-07-06"
br = df.where((df.regiao == 'Brasil') & (df.data == data))

In [36]:
# Número de casos recuperados e em acompanhamento - nível nacional (06/07/2021)

recuperados_br = br.select(br['regiao'].alias('Região'),\
                           br['Recuperadosnovos'].alias('Casos_Recuperados'),\
                           br['emAcompanhamentoNovos'].alias('Em_Acompanhamento'))
recuperados_br.show()

+------+-----------------+-----------------+
|Região|Casos_Recuperados|Em_Acompanhamento|
+------+-----------------+-----------------+
|Brasil|         17262646|          1065477|
+------+-----------------+-----------------+



PAINEL 2 - Casos Confirmados

In [37]:
# Número de casos acumulados, casos novos e incidência - nível nacional (06/07/2021)

casos_br = br.select(br['regiao'].alias('Região'),\
                     br['casosAcumulado'].alias('Casos_Acumulados'),\
                     br['casosNovos'].alias('Casos_Novos'),\
                    (f.round(br['casosAcumulado']/br['populacaoTCU2019']*100000,1)).alias('Incidência'))
casos_br.show()

+------+----------------+-----------+----------+
|Região|Casos_Acumulados|Casos_Novos|Incidência|
+------+----------------+-----------+----------+
|Brasil|        18855015|      62504|    8972.3|
+------+----------------+-----------+----------+



PAINEL 3 - Óbitos Confirmados

In [38]:
# Número de óbitos acumulados, óbitos novos, letalidade e mortalidade - nível nacional (06/07/2021)

obitos_br = br.select(br['regiao'].alias('Região'),\
                      br['obitosAcumulado'].alias('Óbitos_Acumulados'),\
                      br['obitosNovos'].alias('Óbitos_Novos'),\
                     (f.round(br['obitosAcumulado']/br['casosAcumulado']*100,1)).alias('Letalidade'),\
                     (f.round(br['obitosAcumulado']/br['populacaoTCU2019']*100000,1)).alias('Mortalidade'))
obitos_br.show()

+------+-----------------+------------+----------+-----------+
|Região|Óbitos_Acumulados|Óbitos_Novos|Letalidade|Mortalidade|
+------+-----------------+------------+----------+-----------+
|Brasil|           526892|        1780|       2.8|      250.7|
+------+-----------------+------------+----------+-----------+



4 - Salvar a primeira visualização como tabela Hive.

In [17]:
# Criação e identificação da tabela Hive com os dados do primeiro painel
# Por padrão o método save() salva no formato parquet e compressão snappy

recuperados_br.write.mode('overwrite').saveAsTable('recuperados_br_covid')


In [21]:
!hdfs dfs -ls /user/hive/warehouse/recuperados_br_covid

Found 3 items
-rw-r--r--   2 root supergroup          0 2022-08-05 13:38 /user/hive/warehouse/recuperados_br_covid/_SUCCESS
-rw-r--r--   2 root supergroup        483 2022-08-05 13:38 /user/hive/warehouse/recuperados_br_covid/part-00000-63f1ad7e-12e1-4ddf-8b56-15d95479a295-c000.snappy.parquet
-rw-r--r--   2 root supergroup        917 2022-08-05 13:38 /user/hive/warehouse/recuperados_br_covid/part-00002-63f1ad7e-12e1-4ddf-8b56-15d95479a295-c000.snappy.parquet


6 - Salvar a terceira visualização em um tópico no Kafka.

In [43]:
# Criação de um tópico no kafka com os dados do terceiro painel
# As colunas serão transformadas em um json para enviar todos os dados em um unico tópico

obitos_br\
    .selectExpr("to_json(struct(*)) AS value")\
    .write\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('topic', 'obitos_brasil')\
    .save()

In [44]:
# Para conferir a criação do tópico, realizamos a leitura dele

obitos_topic = spark.read\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe','obitos_brasil') \
    .load()

topic_string = obitos_topic.select(col('value').cast('string'))
topic_string.show(truncate = False)

+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}|
|{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}|
+-------------------------------------------------------------------------------------------------------+



7 - Criar a visualização pelo Spark com os dados enviados para o HDFS.

PAINEL 4 - Síntese de casos, óbitos, incidência e mortalidade

In [22]:
# Número de casos acumulados, óbitos acumulados, incidência e mortalidade - nível regional (06/07/2021)

sintese = df.groupBy(['regiao', 'estado'])\
            .agg({'casosAcumulado':'max', 'obitosAcumulado':'max', 'populacaoTCU2019':'max'})

sintese = (sintese
       .withColumnRenamed('max(populacaoTCU2019)','População')
       .withColumnRenamed('max(casosAcumulado)', 'Casos_Acumulados')
       .withColumnRenamed('max(obitosAcumulado)','Óbitos_Acumulados'))

sintese = (sintese
           .withColumn('Incidência', f.round(sintese['Casos_Acumulados']/sintese['População']*100000,1))
           .withColumn('Mortalidade', f.round(sintese['Óbitos_Acumulados']/sintese['População']*100000,1)))
                             
                             
sintese.drop('População').sort(col('regiao').asc()).show(30)

+------------+------+----------------+-----------------+----------+-----------+
|      regiao|estado|Casos_Acumulados|Óbitos_Acumulados|Incidência|Mortalidade|
+------------+------+----------------+-----------------+----------+-----------+
|      Brasil|  null|        18855015|           526892|    8972.3|      250.7|
|Centro-Oeste|    GO|          686433|            19485|    9780.5|      277.6|
|Centro-Oeste|    DF|          434708|             9322|   14416.9|      309.2|
|Centro-Oeste|    MS|          339323|             8400|   12210.3|      302.3|
|Centro-Oeste|    MT|          456155|            12000|   13091.1|      344.4|
|    Nordeste|    PI|          299084|             6662|    9137.3|      203.5|
|    Nordeste|    MA|          322052|             9190|    4551.9|      129.9|
|    Nordeste|    PE|          561505|            17953|    5875.3|      187.9|
|    Nordeste|    BA|         1141612|            24428|    7675.7|      164.2|
|    Nordeste|    RN|          347248|  

8 - Salvar a visualização do exercício 6 em um tópico no Elastic.

In [8]:
# Instalação do pacote necessário

!pip install elasticsearch==7.9

Collecting elasticsearch==7.9
  Downloading elasticsearch-7.9.0-py2.py3-none-any.whl (213 kB)
[K     |████████████████████████████████| 213 kB 523 kB/s eta 0:00:01
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.9.0


Found existing installation: acryl-datahub 0.8.27.1
Uninstalling acryl-datahub-0.8.27.1:
  Would remove:
    /opt/anaconda3/bin/datahub
    /opt/anaconda3/lib/python3.6/site-packages/acryl_datahub-0.8.27.1.dist-info/*
    /opt/anaconda3/lib/python3.6/site-packages/datahub/*
    /opt/anaconda3/lib/python3.6/site-packages/datahub_provider/*
Proceed (y/n)? 

In [12]:
from elasticsearch import Elasticsearch

# Configurando a conexão com o Elastic
es = Elasticsearch('192.168.56.140:9200')

In [13]:
# Verificando a configuração do Elastichsearch

es.info(pretty=True)

{'name': 'node1',
 'cluster_name': 'my_cluster',
 'cluster_uuid': '5PaO_OHESkuiq3X1i8a14w',
 'version': {'number': '7.9.2',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': 'd34da0ea4a966c4e49417f2da2f244e3e97b4e6e',
  'build_date': '2020-09-23T00:45:33.626720Z',
  'build_snapshot': False,
  'lucene_version': '8.6.2',
  'minimum_wire_compatibility_version': '6.8.0',
  'minimum_index_compatibility_version': '6.0.0-beta1'},
 'tagline': 'You Know, for Search'}

In [39]:
# A visualização do exercício 6 é o terceito painel, com dataframe de nome obitos_br
# Transformando o tipo da coluna para float

obitos_br = obitos_br.withColumn("Letalidade", col("Letalidade").cast(FloatType()))\
                     .withColumn("Mortalidade", col("Mortalidade").cast(FloatType()))

In [45]:
# Enviando os dados para o Elastic

obitos_br.write.format("org.elasticsearch.spark.sql") \
                .option("es.nodes", "192.168.56.140") \
                .option("es.port", '9200')\
                .option("es.resource", 'br_dashboard/sample') \
                .option("es.nodes.wan.only", "true") \
                .mode('overwrite')\
                .save()

Py4JJavaError: An error occurred while calling o420.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 23.0 failed 1 times, most recent failure: Lost task 2.0 in stage 23.0 (TID 41, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Could not write all entries for bulk operation [1/1]. Error sample (first [5] error messages):
	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: cluster_block_exception: index [br_dashboard] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
	{"index":{}}
{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}

Bailing out...
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:137)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:620)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:108)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Could not write all entries for bulk operation [1/1]. Error sample (first [5] error messages):
	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: cluster_block_exception: index [br_dashboard] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
	{"index":{}}
{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}

Bailing out...
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:137)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [46]:
# Verificando os dados no Elastic

es.search(index="br_dashboard")

{'took': 1,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 0, 'relation': 'eq'},
  'max_score': None,
  'hits': []}}

9 - Criar um dashboard no Elastic para visualização dos novos dados enviados.
O dashboard Não possui muitas informaçoes.