<a href="https://colab.research.google.com/github/railanderreis/previsaoTempo/blob/main/previsao_tempo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [None]:
# Instalando o PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m16.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=932409109c438c405572e8c9c32f76dac9ca9887f142258fc1a3aae80859cd43
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting unidecode
  Downloading Unidecode-1.3.6-py3-none-any.whl (235 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.9 KB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.9/235.9 KB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.6


In [None]:
# Importando a Lib
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import requests
import json
import unidecode
import os
import pandas as pd

from pyspark.sql.functions import *

# Ignorando avisos
import warnings
warnings.filterwarnings("ignore")

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()



Estou utilizando a api do do site open weather map.

In [None]:
# Buscar cidades do Vale do Paraíba
params = {"accept": "application/json"}
url = "https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios"
data = requests.get(url, params=params).json()

os.makedirs('./data/raw/municipios/', exist_ok=True)
save_path ='data/raw/municipios/municipios.json'
with open(save_path, 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)

df = spark.read.option('multiline', 'true').json('data/raw/municipios/municipios.json')
df_municipios = df.select(
    col("id").alias("id_cidade"),
    col("nome").alias("nome_cidade"),
    col("microrregiao.id").alias("id_microrregiao"),
    col("microrregiao.nome").alias("nome_microrregiao"),
    col("microrregiao.mesorregiao.id").alias("id_mesorregiao"),
    col("microrregiao.mesorregiao.nome").alias("nome_mesorregiao"),
    col("microrregiao.mesorregiao.UF.id").alias("id_UF"),
    col("microrregiao.mesorregiao.UF.sigla").alias("sigla_UF"),
    col("microrregiao.mesorregiao.UF.nome").alias("nome_UF"),
    col("microrregiao.mesorregiao.UF.regiao.id").alias("id_regiao"),
    col("microrregiao.mesorregiao.UF.regiao.sigla").alias("sigla_regiao"),
    col("microrregiao.mesorregiao.UF.regiao.nome").alias("nome_regiao"))

df_municipios.createOrReplaceTempView("municipios")

cities = df_municipios.select('nome_cidade') \
             .rdd.flatMap(lambda x: x).collect()

# Definindo as variaveis
state = "SP"
country_code = "BRA"
api_key = '52bf0898f7a845d2a533e2394ab7c9a9'

# Realizar download dos arquivos
for i in cities:
  url_api = f"http://api.openweathermap.org/data/2.5/forecast?q={i},BR-{state},{country_code}&appid={api_key}&lang=pt_br"
  data = requests.get(url_api, params=params).json()

  os.makedirs('./data/raw/cities/', exist_ok=True)

  save_path = f'data/raw/cities/{i}.json'
  with open(save_path, 'w', encoding='utf-8') as f:
      json.dump(data, f, ensure_ascii=False, indent=4)

# Criar data frame com as cidades
df_cities = spark.read.option('multiline', 'true').json('data/raw/cities/*.json')

df_cities = df_cities.select(
            col("city.id").alias("idCidade"),
            col("city.name").alias("nomeCidade"),
            col("list").alias("listao"),
            col("city.country").alias("pais"),
            col("city.coord.lat").alias("latitude"),
            col("city.coord.lon").alias("longetitude"),
            col("city.sunrise").alias("nascerDoSol"),
            col("city.sunset").alias("porDoSol"))

df_lista = df_cities.select('*',explode(df_cities.listao).alias("lista")) \
                          .drop("listao")

df_lista = df_lista.select('*',explode(df_lista.lista.weather).alias("tempoLista"))


# Essa parte tive que usa o pandas para poder remover os acentos da coluna "nomeCidade".
# Pois a api do IBGE tras os nomes das cidades sem acento.
# OBS: poderia ter usado a "API do Pandas - spark" mas da forma que usei de encode ainda nao esta implementada, por isso fiz dessa forma.
df_pd = df_lista.toPandas()
df_pd.nomeCidade = df_pd.nomeCidade.str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')
df_lista_sp = spark.createDataFrame(df_pd)
# --------------------------------------------

df_lista_final = df_lista_sp.select(
            col("idCidade"),
            col("nomeCidade"),
            col("pais"),
            col("latitude"),
            col("longetitude"),
            col("nascerDoSol"),
            col("porDoSol"),
            col("lista.dt_txt").alias("data"),
            col("lista.main.temp").alias("tempMed"),
            col("lista.main.temp").alias("tempMax"),
            col("lista.main.temp").alias("tempMin"),
            col("lista.clouds.all").alias("chanceChuva"),
            col("tempoLista.description").alias("tempo"),
            col("tempoLista.main").alias("vaiChover"),
            col("lista.wind.speed").alias("velMaxVento"))

# Criar view com as cidades
df_lista_final.createOrReplaceTempView("cities_api")


In [None]:
# Buscar previsão do tempo para as cidades
df_join = spark.sql("""SELECT * FROM cities_api
             FULL OUTER JOIN municipios ON cities_api.nomeCidade = municipios.nome_cidade
             ORDER BY municipios.id_cidade""")

# Criar data frame com as previsões
df_join = df_join.drop("idCidade","nome_cidade","nome_mesorregiao","id_mesorregiao", "id_microrregiao","sigla_regiao","nome_microrregiao","nome_UF","id_regiao","siga_regiao")

df_silver = df_join.withColumn("vaiChover", 
                                  when((col("vaiChover") == "Clear" ) | (col("vaiChover") == "Few clouds") | (col("vaiChover") == "Scattered clouds") | (col("vaiChover") == "Broken clouds") | (col("vaiChover") == "Snow") | (col("vaiChover") == "Mist") | (col("vaiChover") == "Clouds"),"Não")
                                  .when((col("vaiChover") == "Shower rain") | (col("vaiChover") == "Rain") | (col("vaiChover") == "Thunderstorm"), "Sim"))

df_silver_transf = df_silver.withColumn("tempMax", concat((round((col("tempMax")-273.15),2)).cast("double"),lit('°C'))) \
                  .withColumn("tempMin", concat((round((col("tempMin")-273.15),2)).cast("double"),lit('°C'))) \
                  .withColumn("tempMed", concat((round((col("tempMed")-273.15),2)).cast("double"),lit('°C'))) \
                  .withColumn("chanceChuva", concat(((col("chanceChuva"))).cast("double"),lit('%'))) \
                  .withColumn("nascerDoSol",from_unixtime(col("nascerDoSol"))) \
                  .withColumn("porDoSol",from_unixtime(col("porDoSol")))


In [None]:
# Criar DF da Tabela 1

df_gold = df_silver_transf[['id_cidade', 'nomeCidade', 'data','nome_regiao','pais','latitude','longetitude','tempMax','tempMin','tempMed','vaiChover','chanceChuva','nascerDoSol','porDoSol','velMaxVento']]

# Criar view com as previsões
df_gold.createOrReplaceTempView("cities_previsao")

# TODO

In [None]:
# Criar DF da Tabela 2

df_gold_sql = spark.sql("""SELECT nomeCidade AS cidade,COUNT(data) AS totalDiasMapeados,
             COUNT(CASE WHEN vaiChover = 'Não' THEN 'id_cidade' ELSE NULL END) AS QtdDiasNaoChover,
             COUNT(CASE WHEN vaiChover = 'Sim' THEN 'id_cidade' ELSE NULL END) AS QtdDiasVaiChover 
             FROM cities_previsao
             WHERE data LIKE '%12%'
             GROUP BY nomeCidade
             ORDER BY nomeCidade ASC
             """)

# OBS: utilizei o 'WHERE LIKE %12%', pois a API utilizada me retornava 5 dias de consultas
#      a cada 3 horas e como nao conseguir fazer um query mais adequada eu deixei do jeito que eu conseguir
#      trazer solicitado.
# TODO

In [None]:
# Exportar CSVs

# CSV dos dados formatados
df_gold.write.option("header",True) \
       .option('sep', ',') \
   .csv('data/gold/municipios.csv')

# CSV previsao do tempo.
df_gold_sql.write.option("header",True) \
              .option('sep', ',') \
              .option('mode','append') \
              .csv('./data/gold/previsaoTempo.csv')
# TODO