# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [1]:
import findspark
findspark.init()

import requests
import json
import unidecode
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [2]:
# Buscar cidades do Vale do Paraíba
# TODO

url = "https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios"


def get_location():
    request = requests.get(url)
    return request.content


# Criar data frame com as cidades
# TODO

cities_temp = pd.DataFrame(eval(get_location()))

# Criar view com as cidades
# TODO

cities = cities_temp[['id', 'nome']]
cities_spk = spark.createDataFrame(cities)
cities_spk.createOrReplaceTempView('cities_spk')
cities_spk.show()

+-------+------------------+
|     id|              nome|
+-------+------------------+
|3502507|         Aparecida|
|3503158|            Arapeí|
|3503505|            Areias|
|3504909|           Bananal|
|3508504|          Caçapava|
|3508603|Cachoeira Paulista|
|3509700|  Campos do Jordão|
|3509957|             Canas|
|3510500|     Caraguatatuba|
|3513405|          Cruzeiro|
|3513603|             Cunha|
|3518404|     Guaratinguetá|
|3520202|           Igaratá|
|3520400|          Ilhabela|
|3524402|           Jacareí|
|3524907|          Jambeiro|
|3526308|          Lagoinha|
|3526605|         Lavrinhas|
|3527207|            Lorena|
|3531704|   Monteiro Lobato|
+-------+------------------+
only showing top 20 rows



In [3]:
# Buscar previsão do tempo para as cidades
# TODO

def get_forecasts(url):
    request = requests.get(url)
    return request

def pipeline_forecasts():
        temp_forecast_location = pd.DataFrame(forecasts_temp['location'][~forecasts_temp['location'].isnull()]).transpose().drop(columns=['localtime_epoch', 'localtime'])
        temp_forecast_forecast = pd.DataFrame(forecasts_temp['forecast'][~forecasts_temp['forecast'].isnull()]).transpose()

        daycast = pd.DataFrame(temp_forecast_forecast['forecastday'].iloc[0]).reset_index(drop=True)
        forecast_local = pd.DataFrame.from_dict(dict(daycast['day']), orient='index').reset_index(drop=True)
        forecast_local['condition'] = forecast_local['condition'].apply(lambda x: x.get('text'))
        astrocast = pd.DataFrame.from_dict(dict(daycast['astro']), orient='index').reset_index(drop=True)
        f_result = pd.concat([temp_forecast_location, daycast['date'], forecast_local, astrocast], axis=1)
        f_result.fillna(method="ffill", inplace=True)
        f_result.fillna(method="bfill", inplace=True)

        return f_result

# Criar data frame com as previsões
# TODO

API_KEY = "eacb618dcb074af4b3c203226230701"

cities_list = list(cities.nome.unique())

url_forecasts = f"https://api.weatherapi.com/v1/forecast.json?q={cities_list[0]}&key={API_KEY}&lang=pt_br&days=5"
        
forecasts_temp = pd.DataFrame(eval(get_forecasts(url_forecasts).content))

forecasts = pipeline_forecasts().copy()

for city in cities_list[1:]:    
    try:    
        url_forecasts = f"https://api.weatherapi.com/v1/forecast.json?q={city}&key={API_KEY}&lang=pt_br"        
        forecasts_temp = pd.DataFrame(eval(get_forecasts(url_forecasts).content))        
        forecasts = pd.concat([forecasts, pipeline_forecasts()], sort=False).reset_index(drop=True)            
    except Exception as err:        
        print(f'{city} não foi encontrado!')
        

# Criar view com as previsões
# TODO

forecasts_spk = spark.createDataFrame(forecasts)
forecasts_spk.createOrReplaceTempView('forecasts_spk')
forecasts_spk.show()

Guaratinguetá não foi encontrado!
Igaratá não foi encontrado!
Potim não foi encontrado!
+------------------+----------------+----------+------+------+------------------+----------+---------+---------+---------+-----------+-----------+--------------+--------------+------------+-----------+------------------+--------------------+--------------------+--------+--------+
|              name|          region|   country|   lat|   lon|             tz_id|      date|maxtemp_c|mintemp_c|avgtemp_c|maxwind_mph|maxwind_kph|totalprecip_mm|totalprecip_in|totalsnow_cm|avghumidity|daily_will_it_rain|daily_chance_of_rain|           condition| sunrise|  sunset|
+------------------+----------------+----------+------+------+------------------+----------+---------+---------+---------+-----------+-----------+--------------+--------------+------------+-----------+------------------+--------------------+--------------------+--------+--------+
|         Aparecida|       Sao Paulo|    Brazil|-22.83|-45.23| Americ

In [4]:
# Criar DF da Tabela 1
# TODO
from pyspark.sql.types import StringType

table_1_temp = spark.sql('''SELECT f.name as Cidade,
          c.id as CodigoDaCidade, 
          f.date as Data, 
          f.region as Regiao, 
          f.country as Pais, 
          f.lat as Latitude, 
          f.lon as Longitude, 
          f.maxtemp_c as TemperaturaMaxima, 
          f.mintemp_c as TemperaturaMinima, 
          f.avgtemp_c as TemperaturaMedia, 
          f.daily_will_it_rain as VaiChover, 
          f.daily_chance_of_rain as ChanceDeChuva, 
          f.condition as CondicaoDoTempo, 
          f.sunrise as NascerDoSol, 
          f.sunset as PorDoSol, 
          f.maxwind_mph as VelocidadeMaximaDoVento
          FROM forecasts_spk as f
          LEFT JOIN cities_spk as c 
          WHERE f.name = c.nome''')

table_1_temp_2 = table_1_temp.withColumn("VaiChover", table_1_temp.VaiChover.cast(StringType()))
table_1 = table_1_temp_2.replace(['1.0', '0.0'], ['Sim', 'Não'], 'VaiChover')

table_1.show()

+------------------+--------------+----------+----------+----------+--------+---------+-----------------+-----------------+----------------+---------+-------------+--------------------+-----------+--------+-----------------------+
|            Cidade|CodigoDaCidade|      Data|    Regiao|      Pais|Latitude|Longitude|TemperaturaMaxima|TemperaturaMinima|TemperaturaMedia|VaiChover|ChanceDeChuva|     CondicaoDoTempo|NascerDoSol|PorDoSol|VelocidadeMaximaDoVento|
+------------------+--------------+----------+----------+----------+--------+---------+-----------------+-----------------+----------------+---------+-------------+--------------------+-----------+--------+-----------------------+
|         Aparecida|       3502507|2023-01-11| Sao Paulo|    Brazil|  -22.83|   -45.23|             27.0|             14.3|            20.2|      Sim|         86.0|       Moderate rain|   05:27 AM|06:53 PM|                    4.7|
|         Aparecida|       3502507|2023-01-12| Sao Paulo|    Brazil|  -22.83

In [5]:
# Criar DF da Tabela 2
# TODO

table_2 = spark.sql('''SELECT name as Cidade,
          sum(case daily_will_it_rain when '1' then 1 else 0 end) as QtdDiasVaiChover,
          sum(case daily_will_it_rain when '0' then 1 else 0 end) as QtdDiasNaoVaiChover,
          count(daily_will_it_rain) as TotalDiasMapeados
          FROM forecasts_spk
          GROUP BY Cidade''')

table_2.show()

+--------------------+----------------+-------------------+-----------------+
|              Cidade|QtdDiasVaiChover|QtdDiasNaoVaiChover|TotalDiasMapeados|
+--------------------+----------------+-------------------+-----------------+
|           Paraibuna|               2|                  0|                2|
|Sao Luiz Do Parai...|               2|                  0|                2|
|           Aparecida|               6|                  0|                6|
|              Arapeí|               2|                  0|                2|
|        Santa Branca|               2|                  0|                2|
|Santo Antonio Do ...|               2|                  0|                2|
|     Pindamonhangaba|               2|                  0|                2|
| Natividade Da Serra|               2|                  0|                2|
|  Cachoeira Paulista|               2|                  0|                2|
|Monte Da Serra Da...|               2|                  0|     

In [10]:
# Exportar CSVs
# TODO
table_1.toPandas().to_csv("./output/table_1.csv", header=True, index=False)
table_2.toPandas().to_csv("./output/table_2.csv", header=True, index=False)