# Desafio: Consumo de Dados para Previsão do Tempo das Cidades do Vale do Paraíba.

## Objetivo

Avaliar conhecimentos nas linguagens Python e SQL e na engine de processamento Apache Spark.

## Descrição

Neste desafio, você desenvolverá um notebook que será responsável por extrair dados de previsão do tempo das cidades do Vale do Paraíba, região onde se localiza a Dataside. Para consultar todas as cidades dessa região, utilizaremos a API do IBGE. No caso, basta realizar uma requisição HTTP com o método GET, utilizando a URL abaixo:

```
https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios
```

Com esses dados, gerar um data frame e a partir dele uma temp view. Ex: "cities"

Utilizando os nomes das cidades, deverão ser consultados os dados de previsão de tempo para cada cidade. Para realizar essa consulta, poderá ser utilizada qualquer uma das APIs informadas no link abaixo.

[Public APIs - Wather](https://github.com/public-apis/public-apis#weather)

Obs.: Para algumas, pode ser necessário cadastrar-se para acessar sua API Key. Mas nenhuma delas deve precisar cadastrar cartão de crédito ou adicionar qualquer valor monetário para utilizar. Caso alguma solicite, basta optar por outra.

Com os dados consultados, gerar um data frame e partir dele outra temp view. Ex: "forecasts"

Com as temp views geradas, utilizar Spark SQL para criar queries e gerar data frames das seguintes tabelas:

- Tabela 1: dados de previsão do tempo para os próximos cinco dias, para cada data e cidade consultadas. As colunas dessa tabela serão:
    - Cidade
    - CodigoDaCidade
    - Data
    - Regiao
    - Pais
    - Latitude
    - Longigute
    - TemperaturaMaxima
    - TemperaturaMinima
    - TemperaturaMedia
    - VaiChover
    - ChanceDeChuva
    - CondicaoDoTempo
    - NascerDoSol
    - PorDoSol
    - VelocidadeMaximaDoVento
    
    Obs.: Os valores da coluna "VaiChover" deverá ser "Sim" ou "Não". E a coluna "CodigoDaCidade" é o ID retornado junto com os nomes da cidades na API do IBGE.
    Obs.: Dependendo da API utilizada, algumas colunas podem não existir e ficarão em branco. Você deve optar por uma API que traga o maior número de informações possível.

- Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:
    - Cidade
    - QtdDiasVaiChover
    - QtdDiasNaoVaiChover
    - TotalDiasMapeados

Essas tabelas deverão ser exportadas em formado CSV e entregue no final do desafio.

## To Do

[ ] - Consultar municípios do Vale do Paraíba, gerar um data frame e criar uma temp view com esses dados.
[ ] - Consultar dados do tempo para cada município, gerar um data frame e criar uma outra temp view.
[ ] - Utilizar Spark SQL para gerar os data frames das Tabelas 1 e 2.
[ ] - Exportar os data frames para CSV.

## Atenção

- Existe um limite de requisições de 10000 requests por conta cadastrada na m3o.
- Essa API pode retornar cidades de outras regiões que possuem nome semelhante a alguma cidade do Vale do Paraiba. Pode mantê-las ou filtrar para gerar as tabelas apenas com dados de Regiao = Sao Paulo. Fica a seu critério.

## Entregando o desafio

Concluindo todos os passos informados em To Do, basta salvar o arquivo .ipynb do notebook e enviar para a Dataside juntamente com os CSVs das duas tabelas.


In [None]:
!pip install pyspark
!pip install findspark
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/

Collecting pyspark

  Downloading pyspark-3.3.1.tar.gz (281.4 MB)

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m

[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone

Collecting py4j==0.10.9.5

  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m

[?25hBuilding wheels for collected packages: pyspark

  Building wheel for pyspark (setup.py) ... [?25l[?25hdone

  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a48b7d22dcd2b5a0948cc3b7bf20b941f354fdcfacaf75771af7184f5a5b79df

  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598

Successfully built pyspark

Installing collected packages

In [None]:
import findspark
findspark.init()

import requests
import json
import unidecode
import ast
import pyspark
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType



spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()
sparkContext = spark.sparkContext

In [None]:
#faz request na api e retorna json com dados
r = requests.get('https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios')
#verifica se 200 para sucesso
r.status_code

200

In [None]:
#adiciona os dados retornados na variável cities
raw_cities = r.json()

In [None]:
raw_cities[0].items()

dict_items([('id', 3502507), ('nome', 'Aparecida'), ('microrregiao', {'id': 35051, 'nome': 'Guaratinguetá', 'mesorregiao': {'id': 3513, 'nome': 'Vale do Paraíba Paulista', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}), ('regiao-imediata', {'id': 350052, 'nome': 'Guaratinguetá', 'regiao-intermediaria': {'id': 3511, 'nome': 'São José dos Campos', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}})])

In [None]:
raw_cities[0]['id']

3502507

In [None]:
#cria dataframe com colunas para os dados da consulta
emptyRDD = spark.sparkContext.emptyRDD()

schemaDF1 = (StructType([
    StructField('id', LongType(), True),
    StructField('nome', StringType(), True),
    StructField('microregiao', StringType(), True),
    StructField('mesorregiao', StringType(), True),
    StructField('UF', StringType(), True),
    StructField('regiao', StringType(), True)
]))
df = spark.createDataFrame(emptyRDD, schemaDF1)
df.show()

+---+----+-----------+-----------+---+------+

| id|nome|microregiao|mesorregiao| UF|regiao|

+---+----+-----------+-----------+---+------+

+---+----+-----------+-----------+---+------+




In [None]:
for city in raw_cities:
  newRow = spark.createDataFrame([(city['id'],
                                   city['nome'],
                                   city['microrregiao']['nome'],
                                   city['microrregiao']['mesorregiao']['nome'],
                                   city['microrregiao']['mesorregiao']['UF']['sigla'],
                                   city['microrregiao']['mesorregiao']['UF']['regiao']['nome']
                                   )])
  df = df.union(newRow)

In [None]:
df.show(truncate = False)

+-------+------------------+--------------------+------------------------+---+-------+

|id     |nome              |microregiao         |mesorregiao             |UF |regiao |

+-------+------------------+--------------------+------------------------+---+-------+

|3502507|Aparecida         |Guaratinguetá       |Vale do Paraíba Paulista|SP |Sudeste|

|3503158|Arapeí            |Bananal             |Vale do Paraíba Paulista|SP |Sudeste|

|3503505|Areias            |Bananal             |Vale do Paraíba Paulista|SP |Sudeste|

|3504909|Bananal           |Bananal             |Vale do Paraíba Paulista|SP |Sudeste|

|3508504|Caçapava          |São José dos Campos |Vale do Paraíba Paulista|SP |Sudeste|

|3508603|Cachoeira Paulista|Guaratinguetá       |Vale do Paraíba Paulista|SP |Sudeste|

|3509700|Campos do Jordão  |Campos do Jordão    |Vale do Paraíba Paulista|SP |Sudeste|

|3509957|Canas             |Guaratinguetá       |Vale do Paraíba Paulista|SP |Sudeste|

|3510500|Caraguatatuba     |Cara

In [None]:
#create a temporary table view from original DataFrame
df.createOrReplaceTempView('cities')
#select data from temp view
spark.sql('select nome from cities').show()

+------------------+

|              nome|

+------------------+

|         Aparecida|

|            Arapeí|

|            Areias|

|           Bananal|

|          Caçapava|

|Cachoeira Paulista|

|  Campos do Jordão|

|             Canas|

|     Caraguatatuba|

|          Cruzeiro|

|             Cunha|

|     Guaratinguetá|

|           Igaratá|

|          Ilhabela|

|           Jacareí|

|          Jambeiro|

|          Lagoinha|

|         Lavrinhas|

|            Lorena|

|   Monteiro Lobato|

+------------------+

only showing top 20 rows




In [None]:
#collect data from temp view to loop through
dataCollect = spark.sql('select nome from cities').collect()
cities_name = []

#stores the names of the cities in the cities_name list to pass to API
for city in dataCollect:
  cities_name.append(city['nome'])
  
print(cities_name)

['Aparecida', 'Arapeí', 'Areias', 'Bananal', 'Caçapava', 'Cachoeira Paulista', 'Campos do Jordão', 'Canas', 'Caraguatatuba', 'Cruzeiro', 'Cunha', 'Guaratinguetá', 'Igaratá', 'Ilhabela', 'Jacareí', 'Jambeiro', 'Lagoinha', 'Lavrinhas', 'Lorena', 'Monteiro Lobato', 'Natividade da Serra', 'Paraibuna', 'Pindamonhangaba', 'Piquete', 'Potim', 'Queluz', 'Redenção da Serra', 'Roseira', 'Santa Branca', 'Santo Antônio do Pinhal', 'São Bento do Sapucaí', 'São José do Barreiro', 'São José dos Campos', 'São Luiz do Paraitinga', 'São Sebastião', 'Silveiras', 'Taubaté', 'Tremembé', 'Ubatuba']


In [None]:
API_KEY = ''
latitudes = []
longitudes = []

#get city latitude and longitude from city name and state code
for city in cities_name:
  get_coordinates = requests.get(f'http://api.openweathermap.org/geo/1.0/direct?q={city},BR&limit=5&appid={API_KEY}')
  raw_data = json.loads(get_coordinates.text)
   
  latitudes.append(raw_data[0]['lat'])
  longitudes.append(raw_data[0]['lon'])

#link_api = f'api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={API_KEY}'

In [None]:
#convert list of latitudes to a dataframe
latitudes_data = spark.createDataFrame([(lat,) for lat in latitudes], ['latitudes'])

#convert list longitudes to a dataframe
longitudes_data = spark.createDataFrame([(lon,) for lon in longitudes], ['longitudes'])

In [None]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

#add sequential index and join both dataframes to get intermediary result
latitudes_data = latitudes_data.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))
longitudes_data = longitudes_data.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))

#intermediary result of both latitude and longitude dataframes
intermediary_df = latitudes_data.join(longitudes_data, latitudes_data.index == longitudes_data.index).drop('index')

#intermediary_df.show()

In [None]:
#now, to join the main df to the intermediary_df

#add sequential index in both dataframes
df = df.withColumn('index', row_number().over(Window.orderBy(monotonically_increasing_id())))
intermediary_df = intermediary_df.withColumn('index', row_number().over(Window.orderBy(monotonically_increasing_id())))

#final result
complete_df = df.join(intermediary_df, df.index == intermediary_df.index).drop('index')

#complete_df.show(truncate = False)

In [None]:
#rows = complete_df.count()
#print(rows)

In [None]:
#create temp viw with latitude, longitude
complete_df.createOrReplaceTempView('lat_lon')
#select data from temp view
spark.sql('select latitudes, longitudes from lat_lon').show()

+-----------+-----------+

|  latitudes| longitudes|

+-----------+-----------+

| -22.851552|-45.2340924|

|-22.6738889|-44.4477778|

|-22.5800929|-44.6975127|

|-22.6828195|-44.3221095|

| -23.099204| -45.707645|

| -22.666498| -45.015384|

|-22.7395263|-45.5912829|

|-22.7029347|-45.0526508|

|  -23.62028|  -45.41306|

|-22.5783685|-44.9642044|

|  -23.07753|-44.9567436|

|-22.8057839|-45.1908926|

|-23.2063475| -46.156934|

| -23.816628| -45.368685|

|  -23.30528|  -45.96583|

|-23.2556416|-45.6919926|

|-23.0898337|-45.1903825|

| -22.570047| -44.902359|

|-22.7367652|-45.1070876|

|-22.9553542|-45.8387146|

+-----------+-----------+

only showing top 20 rows




In [None]:
#fazer um collect dos dados de latitude e longitude e buscar a previsão
#collect latitude, longitude data from temp view to loop through
dataCollect = spark.sql('select latitudes, longitudes from lat_lon').collect()

latitudes = []
longitudes = []

for city in dataCollect:
  latitudes.append(city['latitudes'])
  longitudes.append(city['longitudes'])
#print(latitudes, longitudes)

In [None]:
#get data from api using latitude and longitude coordinates
import itertools
raw_forecasts = []

for lat, lon in zip(latitudes, longitudes):

  get_coordinates = requests.get(f'http://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={API_KEY}&units=metric&lang=pt_br')
  raw_data = json.loads(get_coordinates.text)
  raw_forecasts.append(raw_data)



In [None]:
#creates a dataframe with empty columns that will be filled when we get the city forecast
emptyRDD = spark.sparkContext.emptyRDD()

schema = (StructType([
    StructField('cidade', StringType(), True),
    StructField('codCidade', StringType(), True),
    StructField('dia', StringType(), True),
    StructField('regiao', StringType(), True),
    StructField('pais', StringType(), True),
    StructField('latitude', StringType(), True),
    StructField('longitude', StringType(), True),
    StructField('tempMax', StringType(), True),
    StructField('tempMin', StringType(), True),
    StructField('temMedia', StringType(), True),
    StructField('vaiChover', StringType(), True),
    StructField('pctgChuva', StringType(), True),
    StructField('condiTempo', StringType(), True),
    StructField('nascerSol', StringType(), True),
    StructField('porSol', StringType(), True),
    StructField('velMaxVento', StringType(), True)
]))
df_forecast = spark.createDataFrame(emptyRDD, schema)
df_forecast.show()

+------+---------+---+------+----+--------+---------+-------+-------+--------+---------+---------+----------+---------+------+-----------+

|cidade|codCidade|dia|regiao|pais|latitude|longitude|tempMax|tempMin|temMedia|vaiChover|pctgChuva|condiTempo|nascerSol|porSol|velMaxVento|

+------+---------+---+------+----+--------+---------+-------+-------+--------+---------+---------+----------+---------+------+-----------+

+------+---------+---+------+----+--------+---------+-------+-------+--------+---------+---------+----------+---------+------+-----------+




In [None]:
#import datetime to convert the time from unix to tuc and then 
import datetime
from datetime import timedelta

#this 'for range' sets the predictions for the same time each day
for day in range (4, 44, 8):
  for forecast in raw_forecasts:
    vai_chover = ''
    if(str(forecast['list'][4]['weather'][0]['main']) == 'Rain'):
      vai_chover = "Sim"

    #catches the time from unix time and converts to utc
    sunrise = datetime.datetime.fromtimestamp(raw_forecasts[1]['city']['sunrise'])
    #then subtracts -3 hours to match brasilia time
    sunrise -= timedelta(hours = 3)
    sunset = datetime.datetime.fromtimestamp(raw_forecasts[1]['city']['sunset'])
    sunset -= timedelta(hours = 3)

    newRow = spark.createDataFrame([(
        forecast['city']['name'],
        forecast['city']['id'],
        forecast['list'][day]['dt_txt'],
        'Vale do Paraiba',
        forecast['city']['country'],
        forecast['city']['coord']['lat'],
        forecast['city']['coord']['lon'],
        forecast['list'][day]['main']['feels_like'],
        forecast['list'][day]['main']['temp'],
        forecast['list'][day]['main']['temp_min'],
        vai_chover,
        #forecast['list'][4]['weather'][0]['main'],
        int(forecast['list'][day]['pop']*100),
        forecast['list'][day]['weather'][0]['description'],
        sunrise,
        #forecast['city']['sunrise'],
        sunset,
        #forecast['city']['sunset'],
        forecast['list'][day]['wind']['speed'])])

    df_forecast = df_forecast.union(newRow).distinct()

In [None]:
#raw_forecasts[0]['city']
df_forecast.show(truncate = False)
#df_forecast.show()

#dont know why but it is inserting in the dataframe duplicate values
#for city name but with different coordinate values and therefore different frecast for the same location

+------------------+---------+-------------------+---------------+----+--------+---------+-------+-------+--------+---------+---------+--------------+-------------------+-------------------+-----------+

|cidade            |codCidade|dia                |regiao         |pais|latitude|longitude|tempMax|tempMin|temMedia|vaiChover|pctgChuva|condiTempo    |nascerSol          |porSol             |velMaxVento|

+------------------+---------+-------------------+---------------+----+--------+---------+-------+-------+--------+---------+---------+--------------+-------------------+-------------------+-----------+

|Aparecida         |3471949  |2023-01-15 06:00:00|Vale do Paraiba|BR  |-22.8516|-45.2341 |20.69  |20.09  |20.09   |Sim      |96       |chuva leve    |2023-01-14 05:25:02|2023-01-14 18:48:01|0.5        |

|Bananal           |3470992  |2023-01-15 06:00:00|Vale do Paraiba|BR  |-22.6739|-44.4478 |22.41  |21.63  |21.63   |Sim      |82       |chuva leve    |2023-01-14 05:25:02|2023-01-14 18:

In [None]:
#for some reason the API is returning wrong city names for the given coordinates
#so i had to hardcode the correct cityname into the dataframe for those wrong coordinates
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.8377', 'Potim').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.6739', 'Arapeí').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.5801', 'Areias').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.6452', 'São José do Bareiro').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.5372', 'Queluz').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.6638', 'Silveiras').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-22.6885', 'São Bento do Sapucaí').otherwise(df_forecast['cidade']))
df_forecast = df_forecast.withColumn('cidade', pyspark.sql.functions.when(df_forecast['latitude'] == '-23.0898', 'Lagoinha').otherwise(df_forecast['cidade']))
df_forecast.sort(df_forecast.cidade.asc(), df_forecast.dia.asc()).show(truncate = False)

+---------+---------+-------------------+---------------+----+--------+---------+-------+-------+--------+---------+---------+--------------+-------------------+-------------------+-----------+

|cidade   |codCidade|dia                |regiao         |pais|latitude|longitude|tempMax|tempMin|temMedia|vaiChover|pctgChuva|condiTempo    |nascerSol          |porSol             |velMaxVento|

+---------+---------+-------------------+---------------+----+--------+---------+-------+-------+--------+---------+---------+--------------+-------------------+-------------------+-----------+

|Aparecida|3471949  |2023-01-15 06:00:00|Vale do Paraiba|BR  |-22.8516|-45.2341 |20.69  |20.09  |20.09   |Sim      |96       |chuva leve    |2023-01-14 05:25:02|2023-01-14 18:48:01|0.5        |

|Aparecida|3471949  |2023-01-16 06:00:00|Vale do Paraiba|BR  |-22.8516|-45.2341 |20.87  |20.26  |20.26   |Sim      |10       |nublado       |2023-01-14 05:25:02|2023-01-14 18:48:01|0.7        |

|Aparecida|3471949  |2023

In [None]:
#rows = df_forecast.count()
#print(rows)

In [None]:
#export ordered Table1 to csv with headers
df_forecast.orderBy('cidade').write.option('header', True).csv('Tabela_1_sorted.csv')

In [None]:
#this creates a temp view from df_forecast
df_forecast.createOrReplaceTempView('previsao')
#select data from temp view
previsao = spark.sql('select cidade, dia, tempMax, tempMin, temMedia,vaiChover, pctgChuva, velMaxVento from previsao order by cidade asc')

In [None]:
previsao.sort('cidade', 'dia').show(truncate = False)

+---------+-------------------+-------+-------+--------+---------+---------+-----------+

|cidade   |dia                |tempMax|tempMin|temMedia|vaiChover|pctgChuva|velMaxVento|

+---------+-------------------+-------+-------+--------+---------+---------+-----------+

|Aparecida|2023-01-15 06:00:00|20.69  |20.09  |20.09   |Sim      |96       |0.5        |

|Aparecida|2023-01-16 06:00:00|20.87  |20.26  |20.26   |Sim      |10       |0.7        |

|Aparecida|2023-01-17 06:00:00|19.73  |19.24  |19.24   |Sim      |4        |1.09       |

|Aparecida|2023-01-18 06:00:00|19.22  |18.78  |18.78   |Sim      |69       |0.91       |

|Aparecida|2023-01-19 06:00:00|20.35  |19.74  |19.74   |Sim      |100      |0.31       |

|Arapeí   |2023-01-15 06:00:00|22.41  |21.63  |21.63   |Sim      |82       |0.3        |

|Arapeí   |2023-01-16 06:00:00|21.87  |21.14  |21.14   |Sim      |0        |0.34       |

|Arapeí   |2023-01-17 06:00:00|20.43  |19.86  |19.86   |Sim      |0        |0.31       |

|Arapeí   

In [None]:
# Buscar cidades do Vale do Paraíba
# Done

# Criar data frame com as cidades
# Done

# Criar view com as cidades
# Done

In [None]:
# Buscar previsão do tempo para as cidades
# Done

# Criar data frame com as previsões
# Done

# Criar view com as previsões
# Done

In [None]:
# Criar DF da Tabela 1
# Done

In [None]:
# Criar DF da Tabela 2
# TODO

In [None]:
# Exportar CSVs
# TODO

#Ver se essa API vai tá funcionando amanhã, pode ser que ainda estava off

## Parei aqui
Tabela 2: quantidade de dias com chuva e sem chuva para os dias consultados, para cada data consultada. Colunas:

Cidade
QtdDiasVaiChover
QtdDiasNaoVaiChover
TotalDiasMapeados