In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from kafka import KafkaProducer
from pyspark.streaming.kafka import KafkaUtils

In [None]:
dados = spark.read.csv("/user/lucio/covid_dados/", sep=";", header="true")


In [None]:
dados.printSchema()

In [None]:
dados.show(5)

In [None]:
df1 = dados.select(dados.Recuperadosnovos.alias("CasosRecuperados").cast("int"),
                  dados.emAcompanhamentoNovos.alias("EmAcompanhamento").cast("int"))\
            .filter(col("data")=="2021-07-06")\
            .filter(col("regiao")=="Brasil")

In [None]:
df1.show()

In [None]:
df1.write.mode("overwrite").saveAsTable("covid19.Visualizacao1")

In [None]:
spark.catalog.listDatabases()

In [None]:
spark.catalog.listTables(dbName="covid19")

In [None]:
df2 = dados.select(dados.casosAcumulado.alias("CasosConfirmados:Acumulado").cast("int"),
                  dados.casosNovos.alias("CasosNovos").cast("int"),
                  dados.populacaoTCU2019.alias("PopulaçãoTCU2019").cast("int"))\
            .filter(col("data")=="2021-07-06")\
            .filter(col("regiao")=="Brasil")

In [None]:
df2 = df2.withColumn("Incidência", (col("CasosConfirmados:Acumulado")/col("PopulaçãoTCU2019"))*100000)

In [None]:

df2 = df2.drop('PopulaçãoTCU2019')

In [None]:
df2.show()

In [None]:
df2.write.parquet("/user/hive/warehouse/covid19.db/Visualizacao2", mode="overwrite",compression="snappy")

In [None]:
df3 = dados.select(dados.casosAcumulado.alias("CasosConfirmados:Acumulado").cast("int"),
                  dados.obitosAcumulado.alias("Obitosacumulados").cast("int"),
                  dados.obitosNovos.alias("CasosNovos").cast("int"),
                  dados.populacaoTCU2019.alias("PopulaçãoTCU2019").cast("int"))\
            .filter(col("data")=="2021-07-06")\
            .filter(col("regiao")=="Brasil")

In [None]:
df3 = df3.withColumn("Letalidade", (col("Obitosacumulados")/col("CasosConfirmados:Acumulado"))*100)

In [None]:
df3 = df3.withColumn("Mortalidade", (col("Obitosacumulados")/col("PopulaçãoTCU2019"))*100000)

In [None]:

df3 = df3.drop('PopulaçãoTCU2019','CasosConfirmados:Acumulado')

In [None]:
df3.show()

In [None]:
df3.printSchema()

In [None]:
df4 = df3.withColumn("Obitosacumulados",col("Obitosacumulados").cast(StringType()))\
      .withColumn("CasosNovos",col("CasosNovos").cast(StringType()))\
      .withColumn("Letalidade",col("Letalidade").cast(StringType()))\
      .withColumn("Mortalidade",col("Mortalidade").cast(StringType()))

In [None]:
df4.printSchema()

In [None]:
df4.show()

In [None]:
topic_read = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafka:9092")\
    .option("subscribe", "covid19")\
    .option("startingOffsets","earliest")\
    .load()

In [None]:
topic_string = topic_read.select(col("key").cast("string"), col("value").cast("string"))

In [None]:
topic_string.show()

In [None]:
producer = KafkaProducer(bootstrap_servers='kafka:9092')
producer.send('covid19', key=b'001', value=b"Novo Teste")

In [None]:
topic_string.show()

In [None]:
topic_string = topic_read.select(col("key").cast("string"), col("value").cast("string"))

In [None]:
topic_string.show()


In [None]:
visao3 = [('ObitosConfirmados', df4.collect()[0][0]), ("NovosObitosConfirmados",df4.collect()[0][1]), ("Letalidade",df4.collect()[0][2]), ("Mortalidade",df4.collect()[0][3])]

In [None]:
producer.send('covid19', key=b'300', value= visao3[0][0].encode())
producer.send('covid19', key=b'301', value= visao3[0][1].encode())
producer.send('covid19', key=b'302', value= visao3[1][0].encode())
producer.send('covid19', key=b'303', value= visao3[1][1].encode())
producer.send('covid19', key=b'304', value= visao3[2][0].encode())
producer.send('covid19', key=b'305', value= visao3[2][1].encode())
producer.send('covid19', key=b'306', value= visao3[3][0].encode())
producer.send('covid19', key=b'307', value= visao3[3][1].encode())



In [None]:
topic_string.show()

In [None]:
spark.sql("CREATE DATABASE dadoscovidspark")


In [None]:
spark.sql("USE dadoscovidspark")

In [None]:
# Criar a tabela
spark.sql("CREATE TABLE dadoscovidspark (regiao STRING, estado STRING, municipio STRING, coduf INT, codmun INT,codRegiaoSaude INT, nomeRegiaoSaude STRING, data DATE, semanaEpi STRING,populacaoTCU2019 INT, casoAcumulado INT, casosNovos INT, obitosAcumulado INT, obitosNovos INT, Recuperadosnovos INT,emAcompanhamentoNovos INT, interior_metropolitana STRING)")

In [None]:
spark.catalog.listDatabases()

In [None]:
spark.catalog.listTables()

In [None]:
spark.catalog.listColumns("dadoscovidspark")

In [None]:
esquema = "regiao STRING, estado STRING, municipio STRING, coduf INT, codmun INT,codRegiaoSaude INT, nomeRegiaoSaude STRING, data DATE, semanaEpi STRING,populacaoTCU2019 INT, casoAcumulado INT, casosNovos INT, obitosAcumulado INT, obitosNovos INT, Recuperadosnovos INT,emAcompanhamentoNovos INT, interior_metropolitana STRING"

In [None]:
dados_cov = spark.read.csv("/user/lucio/covid_dados", sep=";", schema=esquema, header="false")

In [None]:
dados_cov.show(10)

In [None]:
dados_cov.write.saveAsTable("dadoscovidspark_tb", mode="overwrite")

In [None]:
dados_cov_spark = spark.sql("SELECT regiao,casoAcumulado, obitosAcumulado, data FROM dadoscovidspark_tb WHERE data ='2021-07-06' ORDER BY casoAcumulado ASC")

In [None]:
dados_cov_spark.groupBy('regiao').max().show()