### 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

In [415]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [416]:
covid_br = spark.read.csv("/user/aluno/lidia/projeto_spark/*.csv", sep = ";", header = "true")

In [None]:
covid_br.write.mode("overwrite").partitionBy("municipio").saveAsTable("covid_br_municipio")

In [12]:
!hdfs dfs -ls /user/hive/warehouse/covid_br_municipio | head -5

Found 5299 items
-rw-r--r--   2 root supergroup          0 2021-07-12 04:38 /user/hive/warehouse/covid_br_municipio/_SUCCESS
drwxr-xr-x   - root supergroup          0 2021-07-12 04:34 /user/hive/warehouse/covid_br_municipio/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2021-07-12 04:34 /user/hive/warehouse/covid_br_municipio/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2021-07-12 04:34 /user/hive/warehouse/covid_br_municipio/municipio=Abadiânia


### 3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

#### Visualização 1

In [418]:
recuperados = covid_br.withColumn("casos_recuperados", col("Recuperadosnovos").cast(IntegerType()))\
.withColumn("recuperados_em_acompanhamento", col("emAcompanhamentoNovos").cast(IntegerType()))

In [577]:
recuperados_visualizacao = recuperados.groupBy("regiao", "data")\
.agg(sum("casos_recuperados").alias("casos_recuperados"),\
     sum("recuperados_em_acompanhamento").alias("em_acompanhamento"))\
.where(col("regiao") == "Brasil").where(col("data") == "2021-07-06")

In [578]:
recuperados_visualizacao.select(col("casos_recuperados"), col("em_acompanhamento")).show()

+-----------------+-----------------+
|casos_recuperados|em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



#### Visualização 2

In [421]:
confirmados = covid_br.withColumn("casos_acumulados", col("casosAcumulado").cast(IntegerType()))\
.withColumn("casos_novos", col("casosNovos").cast(IntegerType()))\
.withColumn("incidencia", col("casosAcumulado")/(col("populacaoTCU2019")/100000).cast(FloatType()))

In [437]:
confirmados_visualizacao = confirmados.groupBy("regiao", "data")\
.agg(format_number(sum("casos_acumulados"), 0).alias("casos_acumulados")\
     , format_number(sum("casos_novos"), 0).alias("casos_novos")\
     , format_number(sum("incidencia"), 1).alias("incidencia"))\
.where(col("regiao") == "Brasil").where(col("data") == "2021-07-06")

In [438]:
print("CASOS CONFIRMADOS")
confirmados_visualizacao.select(col("casos_acumulados"), col("casos_novos"), col("incidencia")).show()

CASOS CONFIRMADOS
+----------------+-----------+----------+
|casos_acumulados|casos_novos|incidencia|
+----------------+-----------+----------+
|      18,855,015|     62,504|   8,972.3|
+----------------+-----------+----------+



#### Visualização 3

In [424]:
obitos = covid_br.withColumn("obitos_acumulados", col("obitosAcumulado").cast(IntegerType()))\
.withColumn("obitos_novos", col("obitosNovos").cast(IntegerType()))\
.withColumn("letalidade", ((col("obitos_acumulados")/col("casosAcumulado"))*100).cast(FloatType()))\
.withColumn("mortalidade", col("obitos_acumulados")/(col("populacaoTCU2019")/100000).cast(FloatType()))

In [579]:
obitos_visualizacao = obitos.groupBy("regiao", "data")\
.agg(format_number(sum("obitos_acumulados"), 0).alias("obitos_acumulados")\
     , format_number(sum("obitos_novos"), 0).alias("obitos_novos")\
     , format_number(sum("letalidade"), 1).alias("letalidade")\
     , format_number(sum("mortalidade"), 1).alias("mortalidade"))\
.where(col("data") == "2021-07-06")

In [581]:
print("OBITOS CONFIRMADOS")
obitos_visualizacao.select(col("obitos_acumulados"), col("obitos_novos"), col("letalidade"), col("mortalidade")).where(col("regiao") == "Brasil").show()

OBITOS CONFIRMADOS
+-----------------+------------+----------+-----------+
|obitos_acumulados|obitos_novos|letalidade|mortalidade|
+-----------------+------------+----------+-----------+
|          526,892|       1,780|       2.8|      250.7|
+-----------------+------------+----------+-----------+



### 4. Salvar a primeira visualização como tabela Hive

In [14]:
recuperados_visualizacao.write.mode("overwrite").format("orc").saveAsTable("recuperados_covid")

In [16]:
!hdfs dfs -ls /user/hive/warehouse/recuperados_covid | head -5

Found 201 items
-rw-r--r--   2 root supergroup          0 2021-07-12 17:00 /user/hive/warehouse/recuperados_covid/_SUCCESS
-rw-r--r--   2 root supergroup        795 2021-07-12 16:59 /user/hive/warehouse/recuperados_covid/part-00000-65f09cbf-eb10-45ea-b112-775c54148fd1-c000.snappy.orc
-rw-r--r--   2 root supergroup        788 2021-07-12 16:59 /user/hive/warehouse/recuperados_covid/part-00001-65f09cbf-eb10-45ea-b112-775c54148fd1-c000.snappy.orc
-rw-r--r--   2 root supergroup        809 2021-07-12 16:59 /user/hive/warehouse/recuperados_covid/part-00002-65f09cbf-eb10-45ea-b112-775c54148fd1-c000.snappy.orc


### 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [18]:
confirmados_visualizacao.write.parquet("/user/aluno/lidia/projeto_spark/confirmados_covid", compression="snappy")

In [19]:
!hdfs dfs -ls /user/aluno/lidia/projeto_spark/confirmados_covid | head -5

Found 201 items
-rw-r--r--   2 root supergroup          0 2021-07-12 17:06 /user/aluno/lidia/projeto_spark/confirmados_covid/_SUCCESS
-rw-r--r--   2 root supergroup       1880 2021-07-12 17:06 /user/aluno/lidia/projeto_spark/confirmados_covid/part-00000-84692cf8-efcb-4a42-af27-d99bcd8bdccf-c000.snappy.parquet
-rw-r--r--   2 root supergroup       1875 2021-07-12 17:06 /user/aluno/lidia/projeto_spark/confirmados_covid/part-00001-84692cf8-efcb-4a42-af27-d99bcd8bdccf-c000.snappy.parquet
-rw-r--r--   2 root supergroup       1880 2021-07-12 17:06 /user/aluno/lidia/projeto_spark/confirmados_covid/part-00002-84692cf8-efcb-4a42-af27-d99bcd8bdccf-c000.snappy.parquet


### 6. Salvar a terceira visualização em um tópico no Kafka

In [586]:
obitos_visualizacao.selectExpr("CAST(regiao AS STRING) AS key", "to_json(struct(*)) AS value")\
.write\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("topic","obitos_vi") \
.save()

### 7. Criar a visualização pelo Spark com os dados enviados para o HDFS:

In [375]:
sintese_casos = covid_br.withColumn("casos_acumulados", col("casosAcumulado").cast(IntegerType()))\
.withColumn("obitos_acumulados", col("obitosAcumulado").cast(IntegerType()))\
.withColumn("populacao", col("populacaoTCU2019").cast(IntegerType()))\

In [412]:
sintese_visualizacao = sintese_casos.groupBy("data", "regiao", "codmun")\
.agg(format_number(sum("casos_acumulados"), 0).alias("casos")\
     , format_number(sum("obitos_acumulados"), 0).alias("obitos")\
     , format_number(sum("populacao"), 0).alias("populacao")\
     , format_number((sum("casos_acumulados")/(sum("populacao") / 100000)), 1).alias("incidencia")\
     , format_number((sum("obitos_acumulados")/(sum("populacao") / 100000)), 1).alias("mortalidade"))\
.where(col("codmun").isNull()).where(col("data") == "2021-07-06")\

In [427]:
print("Sintese de casos, obitos, incidencia e mortalidade")
sintese_visualizacao.select(col("regiao"), col("casos"), col("obitos"), col("incidencia"), col("mortalidade"), col("data")).show()

Sintese de casos, obitos, incidencia e mortalidade
+------------+----------+-------+----------+-----------+----------+
|      regiao|     casos| obitos|incidencia|mortalidade|      data|
+------------+----------+-------+----------+-----------+----------+
|      Brasil|18,855,015|526,892|   8,972.3|      250.7|2021-07-06|
|     Sudeste| 7,138,803|245,311|   8,078.2|      277.6|2021-07-06|
|         Sul| 3,611,041| 80,705|  12,046.4|      269.2|2021-07-06|
|Centro-Oeste| 1,916,619| 49,207|  11,760.5|      301.9|2021-07-06|
|    Nordeste| 4,455,737|107,824|   7,807.3|      188.9|2021-07-06|
|       Norte| 1,732,815| 43,845|   9,401.6|      237.9|2021-07-06|
+------------+----------+-------+----------+-----------+----------+



## 8. Salvar a visualização do exercício 6 em um tópico no Elastic 

***em andamento ***

In [647]:
topic_read = spark.read.format("kafka").option("kafka.bootstrap.servers","kafka:9092").option("subscribe","obitos_vi").load()

In [648]:
topic_read.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [635]:
topic_string = topic_read.select(col("value").cast("string"))
topic_string.show()

+--------------------+
|               value|
+--------------------+
|{"regiao":"Centro...|
|{"regiao":"Sul","...|
|{"regiao":"Nordes...|
|{"regiao":"Sudest...|
|{"regiao":"Norte"...|
|{"regiao":"Brasil...|
+--------------------+

