In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
#Importando Bibliotecas e Iniciando o Spark
import findspark
findspark.init()

import pandas as pd
import requests
import json
import unidecode
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("projeto").setMaster("local[1]")
sc = SparkContext(conf = conf)

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("projeto") \
      .getOrCreate()

In [5]:
# Buscar as cidades do Rio de Janeiro utilizando API do IBGE
URL = "https://servicodados.ibge.gov.br/api/v1/localidades/microrregioes/33001|33002|33003|33004|33005|33006|33007|33008|33009|33010|33011|33012|33013|33014/municipios"
r = requests.get(url = URL) 
data = r.json()

In [6]:
# Verificar retorno do request
data

[{'id': '3300100',
  'microrregiao': {'id': 33013,
   'mesorregiao': {'UF': {'id': 33,
     'nome': 'Rio de Janeiro',
     'regiao': {'id': 3, 'nome': 'Sudeste', 'sigla': 'SE'},
     'sigla': 'RJ'},
    'id': 3305,
    'nome': 'Sul Fluminense'},
   'nome': 'Baía da Ilha Grande'},
  'nome': 'Angra dos Reis',
  'regiao-imediata': {'id': 330002,
   'nome': 'Angra dos Reis',
   'regiao-intermediaria': {'UF': {'id': 33,
     'nome': 'Rio de Janeiro',
     'regiao': {'id': 3, 'nome': 'Sudeste', 'sigla': 'SE'},
     'sigla': 'RJ'},
    'id': 3301,
    'nome': 'Rio de Janeiro'}}},
 {'id': '3300159',
  'microrregiao': {'id': 33002,
   'mesorregiao': {'UF': {'id': 33,
     'nome': 'Rio de Janeiro',
     'regiao': {'id': 3, 'nome': 'Sudeste', 'sigla': 'SE'},
     'sigla': 'RJ'},
    'id': 3301,
    'nome': 'Noroeste Fluminense'},
   'nome': 'Santo Antônio de Pádua'},
  'nome': 'Aperibé',
  'regiao-imediata': {'id': 330012,
   'nome': 'Santo Antônio de Pádua',
   'regiao-intermediaria': {'UF': {'i

In [8]:
# Criar Dataframe com todas as cidades
df = spark.read.json(sc.parallelize(data))
df.show(n=68)

+-------+--------------------+--------------------+--------------------+
|     id|        microrregiao|                nome|     regiao-imediata|
+-------+--------------------+--------------------+--------------------+
|3300100|{33013, {{33, Rio...|      Angra dos Reis|{330002, Angra do...|
|3300159|{33002, {{33, Rio...|             Aperibé|{330012, Santo An...|
|3300209|{33010, {{33, Rio...|            Araruama|{330013, Cabo Fri...|
|3300225|{33005, {{33, Rio...|               Areal|{330007, Petrópol...|
|3300233|{33010, {{33, Rio...|  Armação dos Búzios|{330013, Cabo Fri...|
|3300258|{33010, {{33, Rio...|     Arraial do Cabo|{330013, Cabo Fri...|
|3300308|{33012, {{33, Rio...|      Barra do Piraí|{330004, Volta Re...|
|3300407|{33011, {{33, Rio...|         Barra Mansa|{330004, Volta Re...|
|3300506|{33007, {{33, Rio...|          Bom Jardim|{330008, Nova Fri...|
|3300605|{33001, {{33, Rio...|Bom Jesus do Itab...|{330011, Itaperun...|
|3300704|{33010, {{33, Rio...|           Cabo Frio|

In [9]:
# visualizando o schema
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- microrregiao: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- mesorregiao: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- nome: string (nullable = true)
 |    |    |    |-- regiao: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- nome: string (nullable = true)
 |    |    |    |    |-- sigla: string (nullable = true)
 |    |    |    |-- sigla: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- regiao-imediata: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- regiao-intermediaria: struct (nullable = true)
 |    |    |-- UF: struct (nullable = true)
 |    |    |    |-- id: long 

In [10]:
# Criar view com as cidades
df.createOrReplaceTempView("cities")

In [11]:
# Extrai nomes das cidades em uma lista para passar para a API de previsão do tempo
nomes_cidades = df.select("nome").rdd.flatMap(lambda x: x).collect()
nomes_cidades

['Angra dos Reis',
 'Aperibé',
 'Araruama',
 'Areal',
 'Armação dos Búzios',
 'Arraial do Cabo',
 'Barra do Piraí',
 'Barra Mansa',
 'Bom Jardim',
 'Bom Jesus do Itabapoana',
 'Cabo Frio',
 'Cambuci',
 'Carapebus',
 'Comendador Levy Gasparian',
 'Campos dos Goytacazes',
 'Cantagalo',
 'Cardoso Moreira',
 'Carmo',
 'Casimiro de Abreu',
 'Conceição de Macabu',
 'Cordeiro',
 'Duas Barras',
 'Engenheiro Paulo de Frontin',
 'Iguaba Grande',
 'Italva',
 'Itaocara',
 'Itaperuna',
 'Itatiaia',
 'Laje do Muriaé',
 'Macaé',
 'Macuco',
 'Mendes',
 'Miguel Pereira',
 'Miracema',
 'Natividade',
 'Nova Friburgo',
 'Paracambi',
 'Paraíba do Sul',
 'Paraty',
 'Paty do Alferes',
 'Pinheiral',
 'Piraí',
 'Porciúncula',
 'Porto Real',
 'Quatis',
 'Quissamã',
 'Resende',
 'Rio Claro',
 'Rio das Flores',
 'Rio das Ostras',
 'Santa Maria Madalena',
 'Santo Antônio de Pádua',
 'São Francisco de Itabapoana',
 'São Fidélis',
 'São João da Barra',
 'São José de Ubá',
 'São Pedro da Aldeia',
 'São Sebastião do A

In [12]:
# verificar quantas cidades
len(nomes_cidades)

68

In [13]:
# Buscar as previsões na API da WeatherAPI
lista_previsoes = []
datas = ["2022-07-13", "2022-07-14", "2022-07-15", "2022-07-16", "2022-07-17"]

for cidade in nomes_cidades:
  api_key = "27b873d37ea2471385912343220707"
  
  for dt in datas:
    link = f"http://api.weatherapi.com/v1/history.json?key={api_key}&q={cidade}&dt={dt}&hour=12"

    requisicao = requests.get(link)
    requisicao_dic = requisicao.json()
    lista_previsoes.append(requisicao_dic)

In [14]:
# Verificar se retornou 5 previsões por cidade (5 * 68)
len(lista_previsoes)

340

In [15]:
# Criar Dataframe com as previsões
df_previsoes = spark.read.json(sc.parallelize(lista_previsoes))
df_previsoes.show()

+--------------------+--------------------+--------------------+
|               error|            forecast|            location|
+--------------------+--------------------+--------------------+
|                null|{[{{97, Waxing Gi...|{Brazil, -23.0, 2...|
|                null|{[{{97, Full Moon...|{Brazil, -23.0, 2...|
|                null|{[{{90, Full Moon...|{Brazil, -23.0, 2...|
|                null|{[{{83, Full Moon...|{Brazil, -23.0, 2...|
|                null|{[{{76, Full Moon...|{Brazil, -23.0, 2...|
|{1006, No matchin...|                null|                null|
|{1006, No matchin...|                null|                null|
|{1006, No matchin...|                null|                null|
|{1006, No matchin...|                null|                null|
|{1006, No matchin...|                null|                null|
|                null|{[{{97, Waxing Gi...|{Brazil, -22.88, ...|
|                null|{[{{97, Full Moon...|{Brazil, -22.88, ...|
|                null|{[{

In [16]:
# Criar view com as previsões
df_previsoes.createOrReplaceTempView("forecasts")

In [17]:
# Verificar schema do Dataframe das previsões
df_previsoes.printSchema()

root
 |-- error: struct (nullable = true)
 |    |-- code: long (nullable = true)
 |    |-- message: string (nullable = true)
 |-- forecast: struct (nullable = true)
 |    |-- forecastday: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- astro: struct (nullable = true)
 |    |    |    |    |-- moon_illumination: string (nullable = true)
 |    |    |    |    |-- moon_phase: string (nullable = true)
 |    |    |    |    |-- moonrise: string (nullable = true)
 |    |    |    |    |-- moonset: string (nullable = true)
 |    |    |    |    |-- sunrise: string (nullable = true)
 |    |    |    |    |-- sunset: string (nullable = true)
 |    |    |    |-- date: string (nullable = true)
 |    |    |    |-- date_epoch: long (nullable = true)
 |    |    |    |-- day: struct (nullable = true)
 |    |    |    |    |-- avghumidity: double (nullable = true)
 |    |    |    |    |-- avgtemp_c: double (nullable = true)
 |    |    |    |    |-- avgtemp_f: 

In [25]:
# Criar DF da Tabela 1
df_1 = spark.sql("""SELECT
                    A.nome as cidade,
                    A.id as codigo_da_cidade,
                    B.location.region as regiao,
                    B.forecast.forecastday[0].date as data,
                    B.location.country as pais,
                    B.location.lat as latitude,
                    B.location.lon as longitude,
                    B.forecast.forecastday[0].day.maxtemp_c as temperatura_maxima,
                    B.forecast.forecastday[0].day.mintemp_c as temperatura_minima,
                    B.forecast.forecastday[0].day.maxtemp_c as temperatura_media,
                    CASE WHEN B.forecast.forecastday[0].hour[0].will_it_rain = 0 THEN 'nao'
                    ELSE 'sim' END AS vai_chover,
                    B.forecast.forecastday[0].hour[0].chance_of_rain as chance_de_chuva,
                    B.forecast.forecastday[0].day.condition.text as condicao_do_tempo,
                    B.forecast.forecastday[0].astro.sunrise as nascer_do_sol,
                    B.forecast.forecastday[0].astro.sunset as por_do_sol,
                    B.forecast.forecastday[0].day.maxwind_kph as velocidade_max_vento

                  FROM cities as A
                  INNER JOIN forecasts as B
                  ON A.nome = B.location.name
                  WHERE B.location.region ='Rio de Janeiro'
                  """)

In [26]:
# Visualizar df_1
df_1.show()

+-----------+----------------+--------------+----------+------+--------+---------+------------------+------------------+-----------------+----------+---------------+--------------------+-------------+----------+--------------------+
|     cidade|codigo_da_cidade|        regiao|      data|  pais|latitude|longitude|temperatura_maxima|temperatura_minima|temperatura_media|vai_chover|chance_de_chuva|   condicao_do_tempo|nascer_do_sol|por_do_sol|velocidade_max_vento|
+-----------+----------------+--------------+----------+------+--------+---------+------------------+------------------+-----------------+----------+---------------+--------------------+-------------+----------+--------------------+
|   Araruama|         3300209|Rio de Janeiro|2022-07-13|Brazil|  -22.88|   -42.33|              23.7|              19.8|             23.7|       nao|              0|Patchy rain possible|     06:30 AM|  05:21 PM|                26.3|
|   Araruama|         3300209|Rio de Janeiro|2022-07-14|Brazil|  -22

In [27]:
# Criar view do df_1 para manipular na criação do df_2
df_1.createOrReplaceTempView("df_1")

In [28]:
# Verificar se há dias com previsão de chuva
df_distinct = spark.sql("""SELECT
                      DISTINCT vai_chover
                    FROM df_1""")

In [29]:
# Todas as previsões são de dias sem chuva
df_distinct.show()

+----------+
|vai_chover|
+----------+
|       nao|
+----------+



In [30]:
df_2 = spark.sql("""
                  WITH nao_chove AS(
                    SELECT
                      cidade as nome_cidade,
                      count(vai_chover) AS qtd_dias_nao_vai_chover,
                      count(cidade) as total_dias_mapeados
                    FROM df_1
                    WHERE vai_chover = 'nao'
                    GROUP BY cidade)
                    SELECT
                          B.nome_cidade,
                          CASE WHEN A.vai_chover = 'sim' THEN 1 ELSE null END AS qtd_dias_vai_chover,
                          B.qtd_dias_nao_vai_chover,
                          B.total_dias_mapeados
                    FROM df_1 as A
                    RIGHT JOIN nao_chove as B
                    ON A.cidade = B.nome_cidade
                    GROUP BY B.nome_cidade, B.qtd_dias_nao_vai_chover, B.total_dias_mapeados, A.vai_chover
                    """)

In [31]:
# Criar DF da Tabela 2
df_2 = spark.sql("""SELECT 
                      cidade, 
                      SUM(IF(vai_chover='sim',1,0)) as qtd_dias_vai_chover, 
                      SUM(IF(vai_chover='nao',1,0)) as qtd_dias_nao_vai_chover,
                      count(cidade) as TotalDiasMapeados
                    FROM df_1 
                    GROUP BY cidade""")

In [32]:
# Visualizar df_2
df_2.show()

+---------------+-------------------+-----------------------+-----------------+
|         cidade|qtd_dias_vai_chover|qtd_dias_nao_vai_chover|TotalDiasMapeados|
+---------------+-------------------+-----------------------+-----------------+
|       Araruama|                  0|                      5|                5|
|         Paraty|                  0|                      5|                5|
|    Barra Mansa|                  0|                      5|                5|
|     Porto Real|                  0|                      5|                5|
|         Mendes|                  0|                      5|                5|
|Cardoso Moreira|                  0|                      5|                5|
|          Areal|                  0|                      5|                5|
|      Itaperuna|                  0|                      5|                5|
|      Paracambi|                  0|                      5|                5|
|        Resende|                  0|   

In [35]:
# Exportar CSVs
df_1.write.format("csv").save("/content/drive/final/tabela_1.csv")

In [36]:
df_2.write.format("csv").save("/content/drive/final/tabela_2.csv")