# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [56]:
# Importar bibliotecas

import findspark
findspark.init()

import os
import requests
import urllib3
import ssl
import json
import unidecode
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

In [10]:
class CustomHttpAdapter (requests.adapters.HTTPAdapter):
    # "Transport adapter" that allows us to use custom ssl_context.

    def __init__(self, ssl_context=None, **kwargs):
        self.ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, connections, maxsize, block=False):
        self.poolmanager = urllib3.poolmanager.PoolManager(
            num_pools=connections, maxsize=maxsize,
            block=block, ssl_context=self.ssl_context)


def get_legacy_session():
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ctx.options |= 0x4  # OP_LEGACY_SERVER_CONNECT
    session = requests.session()
    session.mount('https://', CustomHttpAdapter(ctx))
    return session

In [74]:
# Buscar cidades do Vale do Paraíba
# TODO
IBGE_API = 'https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios'
REQ_CITIES = get_legacy_session().get(IBGE_API)

In [83]:
# Criar data frame com as cidades
# TODO
CITIES_REQ = REQ_CITIES.json()
CITIES_FILE = "./data/cities/cities.json"
os.makedirs(os.path.dirname(CITIES_FILE), exist_ok=True)
with open(CITIES_FILE, 'w') as f:
  json.dump(CITIES_REQ, f)

cities = spark.read.json(CITIES_FILE)

In [84]:
# Criar view com as cidades
# TODO
cities.show()

+-------+--------------------+------------------+--------------------+
|     id|        microrregiao|              nome|     regiao-imediata|
+-------+--------------------+------------------+--------------------+
|3502507|{35051, {{35, São...|         Aparecida|{350052, Guaratin...|
|3503158|{35052, {{35, São...|            Arapeí|{350053, Cruzeiro...|
|3503505|{35052, {{35, São...|            Areias|{350053, Cruzeiro...|
|3504909|{35052, {{35, São...|           Bananal|{350053, Cruzeiro...|
|3508504|{35050, {{35, São...|          Caçapava|{350049, São José...|
|3508603|{35051, {{35, São...|Cachoeira Paulista|{350053, Cruzeiro...|
|3509700|{35049, {{35, São...|  Campos do Jordão|{350050, Taubaté ...|
|3509957|{35051, {{35, São...|             Canas|{350052, Guaratin...|
|3510500|{35054, {{35, São...|     Caraguatatuba|{350051, Caraguat...|
|3513405|{35051, {{35, São...|          Cruzeiro|{350053, Cruzeiro...|
|3513603|{35053, {{35, São...|             Cunha|{350052, Guaratin...|
|35184

In [85]:
# Selecionar colunas relevantes
cities_filtered = cities.select(['nome', 'id'])
cities_filtered.show()

+------------------+-------+
|              nome|     id|
+------------------+-------+
|         Aparecida|3502507|
|            Arapeí|3503158|
|            Areias|3503505|
|           Bananal|3504909|
|          Caçapava|3508504|
|Cachoeira Paulista|3508603|
|  Campos do Jordão|3509700|
|             Canas|3509957|
|     Caraguatatuba|3510500|
|          Cruzeiro|3513405|
|             Cunha|3513603|
|     Guaratinguetá|3518404|
|           Igaratá|3520202|
|          Ilhabela|3520400|
|           Jacareí|3524402|
|          Jambeiro|3524907|
|          Lagoinha|3526308|
|         Lavrinhas|3526605|
|            Lorena|3527207|
|   Monteiro Lobato|3531704|
+------------------+-------+
only showing top 20 rows



In [87]:
# Renomear colunas
cities_renamed = cities_filtered.withColumnRenamed('nome', 'Cidade') \
                                .withColumnRenamed('id', 'CodigoDaCidade')
cities_renamed.show()

+------------------+--------------+
|            Cidade|CodigoDaCidade|
+------------------+--------------+
|         Aparecida|       3502507|
|            Arapeí|       3503158|
|            Areias|       3503505|
|           Bananal|       3504909|
|          Caçapava|       3508504|
|Cachoeira Paulista|       3508603|
|  Campos do Jordão|       3509700|
|             Canas|       3509957|
|     Caraguatatuba|       3510500|
|          Cruzeiro|       3513405|
|             Cunha|       3513603|
|     Guaratinguetá|       3518404|
|           Igaratá|       3520202|
|          Ilhabela|       3520400|
|           Jacareí|       3524402|
|          Jambeiro|       3524907|
|          Lagoinha|       3526308|
|         Lavrinhas|       3526605|
|            Lorena|       3527207|
|   Monteiro Lobato|       3531704|
+------------------+--------------+
only showing top 20 rows



In [88]:
cities_renamed.printSchema()

root
 |-- Cidade: string (nullable = true)
 |-- CodigoDaCidade: long (nullable = true)



In [90]:
cities_list = cities_renamed.rdd.map(lambda x: x.Cidade).collect()
print(cities_list)

['Aparecida', 'Arapeí', 'Areias', 'Bananal', 'Caçapava', 'Cachoeira Paulista', 'Campos do Jordão', 'Canas', 'Caraguatatuba', 'Cruzeiro', 'Cunha', 'Guaratinguetá', 'Igaratá', 'Ilhabela', 'Jacareí', 'Jambeiro', 'Lagoinha', 'Lavrinhas', 'Lorena', 'Monteiro Lobato', 'Natividade da Serra', 'Paraibuna', 'Pindamonhangaba', 'Piquete', 'Potim', 'Queluz', 'Redenção da Serra', 'Roseira', 'Santa Branca', 'Santo Antônio do Pinhal', 'São Bento do Sapucaí', 'São José do Barreiro', 'São José dos Campos', 'São Luiz do Paraitinga', 'São Sebastião', 'Silveiras', 'Taubaté', 'Tremembé', 'Ubatuba']


In [45]:
# Buscar previsão do tempo para as cidades
# TODO

M3O_API_KEY = "ZTc0NTIzMTAtYjY2Ni00MDJkLWI3MjUtOTZlZjUzMzhiZGMx"
M3O_API_WEATHER_FORECAST = "https://api.m3o.com/v1/weather/Forecast"

WEATHER_PATH = "./data/weather/"
os.makedirs(os.path.dirname(WEATHER_PATH), exist_ok=True)

cities_list = cities.rdd.map(lambda x: x.nome).collect()
days = 5

for city in cities_list:
  PARAMS = {"days":days,"location":city}
  headers = {
      'Content-Type': "application/json",
      'Authorization': "Bearer " + M3O_API_KEY
  }
  REQ_WEATHER = requests.get(M3O_API_WEATHER_FORECAST, headers=headers, params=PARAMS)

  WEATHER = REQ_WEATHER.json()

  with open(f'{WEATHER_PATH}{city}.json', 'w') as f:
    json.dump(WEATHER, f)

In [93]:
# Criar data frame com as previsões
# TODO
weather = spark.read.option("multiline", "true").json("./data/weather/*.json")

# Criar view com as previsões
# TODO
weather.show()

+----+----------+------+--------------------+----+--------+----------------+--------------------+---------+----------+------+------------------+
|code|   country|detail|            forecast|  id|latitude|      local_time|            location|longitude|    region|status|          timezone|
+----+----------+------+--------------------+----+--------+----------------+--------------------+---------+----------+------+------------------+
|null|    Brazil|  null|[{20.2, 68.4, 86,...|null|  -22.83|2023-01-11 17:20|           Aparecida|   -45.23| Sao Paulo|  null| America/Sao_Paulo|
|null|    Brazil|  null|[{23.8, 74.9, 79,...|null|  -12.63|2023-01-11 17:21|  Caldeirao Do Bento|   -40.27|     Bahia|  null|     America/Bahia|
|null|    Brazil|  null|[{28.0, 82.3, 82,...|null|   -7.97|2023-01-11 17:21|     Saco Dos Campos|   -38.08|Pernambuco|  null|    America/Recife|
|null|    Brazil|  null|[{26.8, 80.3, 85,...|null|  -10.32|2023-01-11 17:21|            Lagoinha|    -38.9|     Bahia|  null|     

In [95]:
weather_filtered = weather.select(["location","local_time", "region", "country", "latitude", "longitude", "forecast"])
weather_filtered.show()

+--------------------+----------------+----------+----------+--------+---------+--------------------+
|            location|      local_time|    region|   country|latitude|longitude|            forecast|
+--------------------+----------------+----------+----------+--------+---------+--------------------+
|           Aparecida|2023-01-11 17:20| Sao Paulo|    Brazil|  -22.83|   -45.23|[{20.2, 68.4, 86,...|
|  Caldeirao Do Bento|2023-01-11 17:21|     Bahia|    Brazil|  -12.63|   -40.27|[{23.8, 74.9, 79,...|
|     Saco Dos Campos|2023-01-11 17:21|Pernambuco|    Brazil|   -7.97|   -38.08|[{28.0, 82.3, 82,...|
|            Lagoinha|2023-01-11 17:21|     Bahia|    Brazil|  -10.32|    -38.9|[{26.8, 80.3, 85,...|
|              Areias|2023-01-11 17:20|   Paraiba|    Brazil|   -6.45|   -38.43|[{27.7, 81.9, 89,...|
|     Cruzeiro Do Sul|2023-01-11 15:21|      Acre|    Brazil|   -7.63|    -72.6|[{25.3, 77.5, 85,...|
|               Canas|2023-01-11 14:21|Guanacaste|Costa Rica|   10.43|    -85.1|[{

In [97]:
# Renomear colunas
weather_renamed = weather_filtered.withColumnRenamed('location', 'Cidade') \
                                .withColumnRenamed('local_time', 'Data') \
                                .withColumnRenamed('region', 'Regiao') \
                                .withColumnRenamed('country', 'Pais') \
                                .withColumnRenamed('latitude', 'Latitude') \
                                .withColumnRenamed('longitude', 'Longitude') \
                                .withColumnRenamed('forecast', 'Previsao')
weather_renamed.show()

+--------------------+----------------+----------+----------+--------+---------+--------------------+
|              Cidade|            Data|    Regiao|      Pais|Latitude|Longitude|            Previsao|
+--------------------+----------------+----------+----------+--------+---------+--------------------+
|           Aparecida|2023-01-11 17:20| Sao Paulo|    Brazil|  -22.83|   -45.23|[{20.2, 68.4, 86,...|
|  Caldeirao Do Bento|2023-01-11 17:21|     Bahia|    Brazil|  -12.63|   -40.27|[{23.8, 74.9, 79,...|
|     Saco Dos Campos|2023-01-11 17:21|Pernambuco|    Brazil|   -7.97|   -38.08|[{28.0, 82.3, 82,...|
|            Lagoinha|2023-01-11 17:21|     Bahia|    Brazil|  -10.32|    -38.9|[{26.8, 80.3, 85,...|
|              Areias|2023-01-11 17:20|   Paraiba|    Brazil|   -6.45|   -38.43|[{27.7, 81.9, 89,...|
|     Cruzeiro Do Sul|2023-01-11 15:21|      Acre|    Brazil|   -7.63|    -72.6|[{25.3, 77.5, 85,...|
|               Canas|2023-01-11 14:21|Guanacaste|Costa Rica|   10.43|    -85.1|[{

In [98]:
weather_renamed.printSchema()

root
 |-- Cidade: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- Regiao: string (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Previsao: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- avg_temp_c: double (nullable = true)
 |    |    |-- avg_temp_f: double (nullable = true)
 |    |    |-- chance_of_rain: long (nullable = true)
 |    |    |-- condition: string (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- icon_url: string (nullable = true)
 |    |    |-- max_temp_c: double (nullable = true)
 |    |    |-- max_temp_f: double (nullable = true)
 |    |    |-- max_wind_kph: double (nullable = true)
 |    |    |-- max_wind_mph: double (nullable = true)
 |    |    |-- min_temp_c: double (nullable = true)
 |    |    |-- min_temp_f: double (nullable = true)
 |    |    |-- sunrise: string (nullable = true)
 |    |  

In [64]:
# Criar DF da Tabela 1
# TODO
schema1 = StructType([
      StructField("Cidade", StringType(), True),
      StructField("CodigodaCidade", IntegerType(), True),
      StructField("Data", IntegerType(), True),
      StructField("Regiao", StringType(), True),
      StructField("Pais", StringType(), True),
      StructField("Latitude", DoubleType(), True),
      StructField("Longitude", DoubleType(), True),
      StructField("TemperaturaMaxima", DoubleType(), True),
      StructField("TemperaturaMinima", DoubleType(), True),
      StructField("VaiChover", BooleanType(), True),
      StructField("ChanceDeChuva", DoubleType(), True),
      StructField("CondicaoDoTempo", DoubleType(), True),
      StructField("NascerDoSol", StringType(), True),
      StructField("PorDoSol", StringType(), True),
      StructField("VelocidadeMaximaDoVento", StringType(), True)
])



table1 = spark.createDataFrame(data = weather, schema = schema1)
table1.printSchema()
table1.show(truncate=False)


TypeError: data is already a DataFrame

- Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

In [None]:
# Criar DF da Tabela 2
# TODO
schema2 = StructType([
    StructField("Cidade", StringType(), True),
    StructField("QtdDiasVaiChover", IntegerType(), True),
    StructField("QtdDiasNaoVaiChover", IntegerType(), True),
    StructField("TotalDiasMapeados", IntegerType(), True)
])

In [102]:
# Exportar CSVs
# TODO
cities_renamed.write.option("header", True) \
    .csv("./data/csv/cities")

weather_renamed.write.option("header", True) \
    .csv("./data/csv/weather")

AnalysisException: CSV data source does not support array<struct<avg_temp_c:double,avg_temp_f:double,chance_of_rain:bigint,condition:string,date:string,icon_url:string,max_temp_c:double,max_temp_f:double,max_wind_kph:double,max_wind_mph:double,min_temp_c:double,min_temp_f:double,sunrise:string,sunset:string,will_it_rain:boolean>> data type.