Autor: Tatiana Massae Sakata

# <a name='inicio'> Projeto Final de Spark </a>


Dados: https://bityli.com/eJhiDiYNj

Referência das Visualizações:
* Site: https://covid.saude.gov.br/
* Guia do Site: Painel Geral

* [Nivel Básico](#basico)
    * [Questão 01](#questao01)
    * [Questão 02](#questao02)
    * [Questão 03](#questao03)
    * [Questão 04](#questao04)
    * [Questão 05](#questao05)
    * [Questão 06](#questao06)
    * [Questão 07](#questao07)
    * [Questão 08](#questao08)
    * [Questão 09](#questao09)
* [Nivel Avançado](#avancado)
    * [Questão 01](#questao11)
    * [Questão 02](#questao12)
    * [Questão 03](#questao13)
    * [Questão 04](#questao14)
    * [Questão 05](#questao15)
    * [Questão 06](#questao16)
    * [Questão 07](#questao17)
    * [Questão 08](#questao18)
    * [Questão 09](#questao19)

## <a name="basico"> Nivel Básico </a>

### <a name="terminal"> Preparação de arquivos e estruturação do projeto (terminal WSL2) </a>

[voltar ao ínicio](#inicio)

**Atividades realizadas no terminal**  

Criação da pasta para o projeto:  
!hdfs dfs -mkdir -p /user/tatiana/projeto-final-data-engineer  


Envio dos arquivos para a pasta do projeto:  
!hdfs dfs -put /input/covid19-br/* /user/tatiana/projeto-final-data-engineer

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
spark

In [3]:
sc

In [4]:
spark.sparkContext.setLogLevel("INFO")

### <a name="questao01"> Questão 1 - Enviar os dados para o hdfs </a>

[voltar ao ínicio](#inicio)

In [5]:
#consultando os arquivo na pasta

!hdfs dfs -ls /user/tatiana/projeto-final-data-engineer

Found 5 items
-rw-r--r--   3 root supergroup   12445782 2022-12-06 01:00 /user/tatiana/projeto-final-data-engineer/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar
-rw-r--r--   3 root supergroup   62492959 2022-12-06 01:00 /user/tatiana/projeto-final-data-engineer/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-12-06 01:00 /user/tatiana/projeto-final-data-engineer/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-12-06 01:00 /user/tatiana/projeto-final-data-engineer/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-12-06 01:00 /user/tatiana/projeto-final-data-engineer/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


### <a name="questao02"> Questão 2 - Otimizar todos os dados do hdfs para uma tabela Hive particionada por município. </a>

[voltar ao ínicio](#inicio)

In [6]:
data_covid_bruto = spark.read.csv("/user/tatiana/projeto-final-data-engineer/*csv",\
                                 sep = ";",\
                                 inferSchema= True,\
                                 header= True,\
                                 ignoreLeadingWhiteSpace= True,\
                                 mode= "DROPMALFORMED")

In [7]:
data_covid_bruto.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [8]:
data_covid_bruto.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [9]:
#selecionando as colunas com as informações para análise e geração das consultas

data_covid_limpo = data_covid_bruto.select('regiao','data','casosAcumulado','casosNovos','obitosAcumulado','obitosNovos','Recuperadosnovos','emAcompanhamentoNovos','municipio','populacaoTCU2019')
data_covid_limpo.show(5)

+------+-------------------+--------------+----------+---------------+-----------+----------------+---------------------+---------+----------------+
|regiao|               data|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|municipio|populacaoTCU2019|
+------+-------------------+--------------+----------+---------------+-----------+----------------+---------------------+---------+----------------+
|Brasil|2020-02-25 00:00:00|             0|         0|              0|          0|            null|                 null|     null|       210147125|
|Brasil|2020-02-26 00:00:00|             1|         1|              0|          0|            null|                 null|     null|       210147125|
|Brasil|2020-02-27 00:00:00|             1|         0|              0|          0|            null|                 null|     null|       210147125|
|Brasil|2020-02-28 00:00:00|             1|         0|              0|          0|            null|       

In [10]:
#tratando o formato da coluna data (alterando de timestamp para date)
#preenchendo as colunas com informações nulas por '0'

data_covid_final = data_covid_limpo.withColumn("data",to_date(col("data")))\
                                    .na.fill({'municipio': '0', 'Recuperadosnovos': 0, 'emAcompanhamentoNovos': 0, 'populacaoTCU2019': 0})
data_covid_final.limit(10).toPandas()

Unnamed: 0,regiao,data,casosAcumulado,casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,municipio,populacaoTCU2019
0,Brasil,2020-02-25,0,0,0,0,0,0,0,210147125
1,Brasil,2020-02-26,1,1,0,0,0,0,0,210147125
2,Brasil,2020-02-27,1,0,0,0,0,0,0,210147125
3,Brasil,2020-02-28,1,0,0,0,0,0,0,210147125
4,Brasil,2020-02-29,2,1,0,0,0,0,0,210147125
5,Brasil,2020-03-01,2,0,0,0,0,0,0,210147125
6,Brasil,2020-03-02,2,0,0,0,0,0,0,210147125
7,Brasil,2020-03-03,2,0,0,0,0,0,0,210147125
8,Brasil,2020-03-04,3,1,0,0,0,0,0,210147125
9,Brasil,2020-03-05,7,4,0,0,0,0,0,210147125


In [11]:
data_covid_final.show(5)

+------+----------+--------------+----------+---------------+-----------+----------------+---------------------+---------+----------------+
|regiao|      data|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|municipio|populacaoTCU2019|
+------+----------+--------------+----------+---------------+-----------+----------------+---------------------+---------+----------------+
|Brasil|2020-02-25|             0|         0|              0|          0|               0|                    0|        0|       210147125|
|Brasil|2020-02-26|             1|         1|              0|          0|               0|                    0|        0|       210147125|
|Brasil|2020-02-27|             1|         0|              0|          0|               0|                    0|        0|       210147125|
|Brasil|2020-02-28|             1|         0|              0|          0|               0|                    0|        0|       210147125|
|Brasil|2020-02-29| 

In [21]:
#executar caso o processo seja interrompido no meio do processo de criação das partições e o overwrite apresente erro

spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")

In [22]:
#execução da geração das tabelas

data_covid_final.write.mode("overwrite").partitionBy("municipio").saveAsTable("dados_covid_br")

In [12]:
#consultando as tabelas geradas

!hdfs dfs -ls /user/hive/warehouse/dados_covid_br | head

Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=0
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/warehouse/dados_covid_br/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-12-08 13:45 /user/hive/

In [13]:
#consultando as tabelas geradas

!hdfs dfs -ls /user/hive/warehouse/dados_covid_br/municipio=0 | head

Found 8 items
-rw-r--r--   2 root supergroup      58290 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=0/part-00000-6bcb600a-7417-4048-a21a-ac1f3a2d131e.c000.snappy.parquet
-rw-r--r--   2 root supergroup       7159 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=0/part-00001-6bcb600a-7417-4048-a21a-ac1f3a2d131e.c000.snappy.parquet
-rw-r--r--   2 root supergroup      79000 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=0/part-00002-6bcb600a-7417-4048-a21a-ac1f3a2d131e.c000.snappy.parquet
-rw-r--r--   2 root supergroup       8611 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=0/part-00003-6bcb600a-7417-4048-a21a-ac1f3a2d131e.c000.snappy.parquet
-rw-r--r--   2 root supergroup      99581 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=0/part-00004-6bcb600a-7417-4048-a21a-ac1f3a2d131e.c000.snappy.parquet
-rw-r--r--   2 root supergroup       7184 2022-12-08 13:25 /user/hive/warehouse/dados_covid_br/municipio=

In [14]:
spark.read.table("dados_covid_br").limit(5).toPandas()

Unnamed: 0,regiao,data,casosAcumulado,casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,populacaoTCU2019,municipio
0,Brasil,2021-01-01,7700578,24605,195411,462,6756284,748883,210147125,0
1,Brasil,2021-01-02,7716405,15827,195725,314,6769420,751260,210147125,0
2,Brasil,2021-01-03,7733746,17341,196018,293,6813008,724720,210147125,0
3,Brasil,2021-01-04,7753752,20006,196561,543,6875230,681961,210147125,0
4,Brasil,2021-01-05,7810400,56648,197732,1171,6963407,649261,210147125,0


In [15]:
spark.read.table("dados_covid_br").printSchema()

root
 |-- regiao: string (nullable = true)
 |-- data: date (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- municipio: string (nullable = true)



### <a name="questao03"> Questão 3 - Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS: </a>
**a.) Casos recuperados**  
**b.) Casos confirmados**  
**c.) Óbitos confirmados**  


![](quadros.png)


***Exemplo dos resultados a serem apresentados - data do painel: 06/07/2021

[voltar ao ínicio](#inicio)

**a.) Casos recuperados**

In [24]:
casos_recuperados = data_covid_final.filter("data == '2021-07-06'")\
                                    .select(sum('Recuperadosnovos').alias("Casos_recuperados"),\
                                            sum('emAcompanhamentoNovos').alias("Em_acompanhamento"))\
                                    

casos_recuperados_visualizacao = casos_recuperados.show()

+-----------------+-----------------+
|Casos_recuperados|Em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



**b.) Casos confirmados**

In [26]:
casos_confirmados = data_covid_final.filter("data == '2021-07-06'")\
                                    .filter("populacaoTCU2019 is Not Null")\
                                    .select(max('casosAcumulado').alias("Casos_confirmados"),\
                                            max('casosNovos').alias("Casos_novos"),\
                                            max('populacaoTCU2019').alias("Populacao"))
                                            

casos_confirmados_incidencia = casos_confirmados.withColumn("Incidencia", format_number(100000*col("Casos_confirmados")/col("Populacao"),2))
                                    
casos_confirmados_visualizacao = casos_confirmados_incidencia.drop("Populacao").show()                                

+-----------------+-----------+----------+
|Casos_confirmados|Casos_novos|Incidencia|
+-----------------+-----------+----------+
|         18855015|      62504|  8,972.29|
+-----------------+-----------+----------+



**c.) Óbitos confirmados**

In [30]:
obitos_confirmados = data_covid_final.filter("data == '2021-07-06'")\
                                     .filter("populacaoTCU2019 is Not Null")\
                                     .select(max('obitosAcumulado').alias("Obitos_acumulados"),\
                                             max('obitosNovos').alias("Obitos_novos"),\
                                             max('casosAcumulado').alias("Casos_confirmados"),\
                                             max('populacaoTCU2019').alias("Populacao"))

letalidade = obitos_confirmados.withColumn("Letalidade", format_number(100*col("Obitos_acumulados")/col("Casos_confirmados"),2))
mortalidade = letalidade.withColumn("Mortalidade", format_number(100000.00*col("Obitos_acumulados")/col("Populacao"),2))

obitos_confirmados_visualizacao0 = mortalidade.drop("Casos_confirmados","Populacao").show()

+-----------------+------------+----------+-----------+
|Obitos_acumulados|Obitos_novos|Letalidade|Mortalidade|
+-----------------+------------+----------+-----------+
|           526892|        1780|      2.79|     250.73|
+-----------------+------------+----------+-----------+



In [31]:
#apenas para carregar as informações, por isso a duplicidade

obitos_confirmados = data_covid_final.filter("data == '2021-07-06'")\
                                     .filter("populacaoTCU2019 is Not Null")\
                                     .select(max('obitosAcumulado').alias("Obitos_acumulados"),\
                                             max('obitosNovos').alias("Obitos_novos"),\
                                             max('casosAcumulado').alias("Casos_confirmados"),\
                                             max('populacaoTCU2019').alias("Populacao"))

letalidade = obitos_confirmados.withColumn("Letalidade", format_number(100*col("Obitos_acumulados")/col("Casos_confirmados"),2))
mortalidade = letalidade.withColumn("Mortalidade", format_number(100000.00*col("Obitos_acumulados")/col("Populacao"),2))

obitos_confirmados_visualizacao = mortalidade.drop("Casos_confirmados","Populacao")

### <a name="questao04"> Questão 4 - Salvar a primeira visualização como tabela Hive </a>

[voltar ao ínicio](#inicio)

In [32]:
casos_recuperados.write.mode("overwrite").saveAsTable("tbl_casos_recuperados")

In [33]:
#verificando a criação da tabela

!hdfs dfs -ls /user/hive/warehouse/tbl_casos_recuperados

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-12-08 13:52 /user/hive/warehouse/tbl_casos_recuperados/_SUCCESS
-rw-r--r--   2 root supergroup        751 2022-12-08 13:52 /user/hive/warehouse/tbl_casos_recuperados/part-00000-47e55321-71c3-4936-9712-48cfb640dc01-c000.snappy.parquet


### <a name="questao05"> Questão 5 - Salvar a segunda visualização com formato parquet e compressão snappy </a>

[voltar ao ínicio](#inicio)

In [34]:
casos_confirmados.write.mode("overwrite").saveAsTable("tbl_casos_confirmados", compression='snappy')

In [35]:
#verificando a criação da tabela

!hdfs dfs -ls /user/hive/warehouse/tbl_casos_confirmados

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-12-08 13:52 /user/hive/warehouse/tbl_casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup        930 2022-12-08 13:52 /user/hive/warehouse/tbl_casos_confirmados/part-00000-3b0665f3-6302-4e93-b741-4bc1bb3759ee-c000.snappy.parquet


In [29]:
# verificando as tabelas criadas

spark.catalog.listTables()

[Table(name='dados_covid_br', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='juros', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='juros1', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='tbl_casos_confirmados', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='tbl_casos_recuperados', database='default', description=None, tableType='MANAGED', isTemporary=False)]

### <a name="questao06"> Questão 6 - Salvar a terceira visualização em um tópico no Kafka </a>

[voltar ao ínicio](#inicio)

In [32]:
obitos_confirmados_visualizacao.printSchema()

root
 |-- Obitos_acumulados: integer (nullable = true)
 |-- Obitos_novos: integer (nullable = true)
 |-- Letalidade: string (nullable = true)
 |-- Mortalidade: string (nullable = true)



In [33]:
obitos_confirmados.selectExpr("to_json(struct(*)) As value")\
                                .write\
                                .format("kafka")\
                                .option("kafka.bootstrap.servers","kafka:9092")\
                                .option("topic", "topico_obitos_confirmados")\
                                .save()

In [34]:
obitos_confirmados_visualizacao.selectExpr("to_json(struct(*)) As value")\
                                .write\
                                .format("kafka")\
                                .option("kafka.bootstrap.servers","kafka:9092")\
                                .option("topic", "topico_obitos_novos")\
                                .save()

In [35]:
letalidade.selectExpr('to_json(struct(*)) As value')\
                .write\
                .format("kafka")\
                .option("kafka.bootstrap.servers","kafka:9092")\
                .option("topic","topic-letalidade")\
                .save()

In [36]:
mortalidade.selectExpr('to_json(struct(*)) As value')\
                .write\
                .format("kafka")\
                .option("kafka.bootstrap.servers","kafka:9092")\
                .option("topic","topic-mortalidade")\
                .save()

In [40]:
topico_obitos_confirmados = spark.read\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe','topico_obitos_confirmados') \
    .load()

topic_string = topico_obitos_confirmados.select(col('value').cast('string'))
topic_string.show(truncate = False)

+---------------------------------------------------------------------------------------------------+
|value                                                                                              |
+---------------------------------------------------------------------------------------------------+
|{"Obitos_acumulados":526892,"Obitos_novos":1780,"Casos_confirmados":18855015,"Populacao":210147125}|
|{"Obitos_acumulados":526892,"Obitos_novos":1780,"Casos_confirmados":18855015,"Populacao":210147125}|
|{"Obitos_acumulados":526892,"Obitos_novos":1780,"Casos_confirmados":18855015,"Populacao":210147125}|
+---------------------------------------------------------------------------------------------------+



**Verificando a criação dos tópicos no terminal:**  

PS C:\Users\spark> docker exec -it kafka bash  
bash-4.4# kafka-topics.sh --bootstrap-server kafka:9092 --list  
topic-letalidade  
topic-mortalidade  
topico_obitos_confirmados  
topico_obitos_novos  

### <a name="questao07"> Questão 7 - Criar a visualização pelo Spark com os dados enviados para o HDFS: </a>
  
![](sintese.png)  
***Exemplo do quadro a ser apresentado -data do painel: 06/07/2021*

[voltar ao ínicio](#inicio)

In [41]:
visualizacao_hdfs = data_covid_final.groupBy('data','regiao')\
               .agg(sum('casosAcumulado').alias("Casos"),\
                 sum('obitosAcumulado').alias("Obitos"),\
                 sum('populacaoTCU2019').alias("Populacao"),\
                ).sort(desc("Casos")).filter("data == '2021-07-06'")

visualizacao_hdfs = visualizacao_hdfs.withColumn("Incidencia/100mil hab.",format_number(100000.00*col('Casos')/col('Populacao'),2))
visualizacao_hdfs = visualizacao_hdfs.withColumn("Mortalidade/100mil hab.", format_number(100000.00*col('Obitos')/col('Populacao'),2))
visualizacao_hdfs = visualizacao_hdfs.drop('Populacao').show()

+----------+------------+--------+------+----------------------+-----------------------+
|      data|      regiao|   Casos|Obitos|Incidencia/100mil hab.|Mortalidade/100mil hab.|
+----------+------------+--------+------+----------------------+-----------------------+
|2021-07-06|      Brasil|18855015|526892|              8,972.29|                 250.73|
|2021-07-06|     Sudeste|14277606|490622|              8,078.18|                 277.59|
|2021-07-06|    Nordeste| 8911474|215648|              7,807.27|                 188.93|
|2021-07-06|         Sul| 7222082|161410|             12,046.45|                 269.23|
|2021-07-06|Centro-Oeste| 3833238| 98414|             11,760.51|                 301.94|
|2021-07-06|       Norte| 3465630| 87690|              9,401.64|                 237.89|
+----------+------------+--------+------+----------------------+-----------------------+



### <a name="questao08"> Questão 8 - Salvar a visualização do exercício 6 em um tópico no Elastic </a>

[voltar ao ínicio](#inicio)

In [42]:
topic_elastic = obitos_confirmados_visualizacao.write\
                                               .format('csv')\
                                               .option("inferSchema", "true")\
                                               .option("header","true")\
                                               .save("/user/hive/warehouse/elastic_search/elastic.csv")

In [43]:
#verificando a criação do topico

!hdfs dfs -ls /user/hive/warehouse/elastic_search/elastic.csv

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-12-09 02:57 /user/hive/warehouse/elastic_search/elastic.csv/_SUCCESS
-rw-r--r--   2 root supergroup         78 2022-12-09 02:57 /user/hive/warehouse/elastic_search/elastic.csv/part-00000-35e349d8-e357-4ecf-9dfd-6100b1bfaf7f-c000.csv


**Inicio da sessão no terminal:**  

spark-shell --jars /opt/spark/jars/elasticsearch-spark_2.10-2.4.1.jar \  
        --conf spark.es.nodes="YOUR-IP-ELASTICSEARCH" \  
        --conf spark.es.port="9200" \  
        --conf spark.es.index.auto.create=true \  
        --conf spark.es.nodes.discovery=false  


import org.elasticsearch.spark.sql._  
val topic_elastic =   spark.read.option("inferSchema",true).option("header",true).csv("/user/hive/warehouse/elastic_search/elastic.csv")  
topic_elastic.show()  
topic_elastic.saveToEs("topico_elastic")  

### <a name="questao09"> Questão 9 - Criar um dashboard no Elastic para visualização dos novos dados enviados </a>

[voltar ao ínicio](#inicio)

In [None]:
Localhost:9200
    
    {
  "name" : "node1",
  "cluster_name" : "my_cluster",
  "cluster_uuid" : "tcDNvSItQjWpcx-K4ij7Jg",
  "version" : {
    "number" : "7.9.2",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e",
    "build_date" : "2020-09-23T00:45:33.626720Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
        
        

## Iniciar sessão com atributos abaixo

spark-shell --jars /opt/spark/jars/elasticsearch-spark_2.10-2.4.1.jar \
        --conf spark.es.nodes="YOUR-IP-ELASTICSEARCH" \
        --conf spark.es.port="9200" \
        --conf spark.es.index.auto.create=true \
        --conf spark.es.nodes.discovery=false

##Salvar view no Elastic

import org.elasticsearch.spark.sql._
val view3 = spark.read.option("inferSchema",true).option("header",true).csv("/user/hive/warehouse/elastic_search/view3_elastic.csv")
view3.show()
view3.saveToEs("view3_elastic")

## <a name="avancado"> Nivel Avançado </a>

Após encerrar as atividades, devemos dar um shutdown no arquivo, por que devemos uma unica sessão ativa do Spark.
Atenção, poderão dar erros nas aberturas de novas sessões.
