In [0]:
# Data Science com o Registro de Ocupação Hospitalar COVID-19
# https://opendatasus.saude.gov.br/dataset/registro-de-ocupacao-hospitalar

import requests
import json
import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import ArrayType, StructType, DateType, TimestampType
from pyspark.sql.functions import max, col, size, array_contains, map_keys, explode, explode_outer, to_timestamp

def normaliza(nested_df):
  # Script por Amit Rawat
  # https://intellipaat.com/community/13569/how-to-flatten-a-struct-in-a-spark-dataframe
  
  flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
  nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
  flat_df = nested_df.select(flat_cols +
                             [col(nc+'.'+c).alias(nc+'_'+c)
                              for nc in nested_cols
                              for c in nested_df.select(nc+'.*').columns]
                            )
  return flat_df

# Credências de acesso da API

usuario = "user-api-leitos"
senha = "aQbLL3ZStaTr38tj"

# Aquisição da base de dados

url = "https://elastic-leitos.saude.gov.br/leito_ocupacao/_search"
r = requests.post(
  url, 
  auth=(usuario, senha), 
  json={
    "size": 10000, 
    "query": {"match_all": {}}
  }
)

base = r.json()["hits"]["hits"]

f = []
for i in base:
  f.append(json.dumps(i))

base = spark.read.json(sc.parallelize(f))

# Pré processamento

base = normaliza(base) # Normaliza colunas
base = base.withColumn("_source_dataNotificacaoOcupacao", col('_source_dataNotificacaoOcupacao').cast(TimestampType())) # Corrige timestamp da data de notificação da ocupação

base.printSchema()
print(base.count())

In [0]:
### Qual a contagem de leitos ocupados por estado?

base \
  .groupBy(col("_source_estado")) \
  .count() \
  .select(col("_source_estado").alias("Estado"), col("count").alias("Leitos")) \
  .sort(col("Leitos").desc()) \
  .show(truncate=False)

In [0]:
### Qual a contagem de leitos ocupados por cidade?

base \
  .filter(col("_source_nomeCnes").isNotNull()) \
  .groupBy(col("_source_nomeCnes")) \
  .count() \
  .select(col("_source_nomeCnes").alias("Hospital"), col("count").alias("Leitos")) \
  .sort(col("Leitos").desc()) \
  .show(truncate=False)

In [0]:
### Qual a contagem de leitos ocupados por hospital?

base \
  .groupBy(col("_source_municipio")) \
  .count() \
  .select(col("_source_municipio").alias("Cidade"), col("count").alias("Leitos")) \
  .sort(col("Leitos").desc()) \
  .show(truncate=False)

In [0]:
### Quais os hospitais com leitos ocupados em Rolândia - PR

base \
  .filter(col("_source_municipio") == "Rolândia") \
  .groupBy(col("_source_nomeCnes")) \
  .count() \
  .select(col("_source_nomeCnes").alias("Hospital"), col("count").alias("Leitos")) \
  .show(truncate=False)