In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.functions import year
from pyspark.sql.functions import expr
from pyspark.sql.functions import col

import pandas as pd
import time

In [2]:
# Record the wall-clock start so the last cell can report total run time.
start_time = time.time()

In [3]:
# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("Temperatura_Media")
    .getOrCreate()
)

In [4]:
# Read the CSV file into a Spark DataFrame; the first row holds the column names.
dados = spark.read.csv(
    path="GlobalLandTemperaturesByState.csv",
    sep=",",
    header=True,
)

In [5]:
# Preview the first rows to check the column names and value formats.
dados.show(n=5)

+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
+----------+------------------+-----------------------------+-----+-------+
only showing top 5 rows



In [6]:
# Discard every row that has a missing value in any column.
dados = dados.dropna()

In [7]:
# Preview the first 10 rows of the current DataFrame.
dados.show(n=10)

+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
|1855-10-01|25.441999999999997|                        1.179| Acre| Brazil|
|1855-11-01|              25.4|                        1.064| Acre| Brazil|
|1855-12-01|              24.1|           1.7180000000000002| Acre| Brazil|
|1856-01-01|            25.814|                        1.159| Acre| Brazil|
|1856-02-01|            24.658|                        1.147| Acre| Brazil|
+----------+

In [8]:
# Parse the "dt" column into a date column using the yyyy-MM-dd pattern.
dados = dados.withColumn("dt", to_date(col("dt"), "yyyy-MM-dd"))

In [9]:
# Add "AverageTemperature_numeric": the "AverageTemperature" column cast to double.
#
# The cast is a lazy column expression: building it does not inspect the data,
# so a try/except around it cannot detect non-numeric values. A bare `except:`
# here would only hide genuine errors (e.g. a missing column) and leave
# "AverageTemperature_numeric" undefined for the cells below, so any error is
# allowed to propagate instead. `col` is already imported in the first cell.
dados = dados.withColumn(
    "AverageTemperature_numeric",
    col("AverageTemperature").cast("double"),
)

dados.show()

+----------+------------------+-----------------------------+-----+-------+--------------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|AverageTemperature_numeric|
+----------+------------------+-----------------------------+-----+-------+--------------------------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|                    25.544|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|                    24.228|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|                    24.371|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|                    25.427|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|                    25.675|
|1855-10-01|25.441999999999997|                        1.179| Acre| Brazil|        25.441999999999997|
|1855-11-01|              25.4|                        1.064| Acre| Brazi

In [10]:
# Derive a "year" column holding the calendar year of "dt".
dados = dados.withColumn("year", year(col("dt")))

In [11]:
# Mean of "AverageTemperature_numeric" for every (year, State) pair.
dados_agrupados = (
    dados
    .groupBy("year", "State")
    .mean("AverageTemperature_numeric")
)
dados_agrupados.show(n=5)

+----+-------+-------------------------------+
|year|  State|avg(AverageTemperature_numeric)|
+----+-------+-------------------------------+
|1825| Adygey|              9.383166666666666|
|2005| Adygey|                       11.46325|
|1751|Alabama|                        18.3635|
|1870|Alabama|              16.72216666666667|
|1881|Alabama|              17.61691666666667|
+----+-------+-------------------------------+
only showing top 5 rows



In [12]:
# Collect the distinct values of the "State" column into a Python list.
# Rows are collected directly from the DataFrame; going through the RDD API
# is not needed just to unwrap a single column.
estados = [linha["State"] for linha in dados.select("State").distinct().collect()]
print(estados)


['Bryansk', 'Hawaii', 'Manitoba', 'Kostroma', 'Nagaland', 'Karnataka', 'Belgorod', 'Guangdong', 'Minnesota', 'Kursk', 'Hunan', 'Kerala', 'Kaluga', 'Hubei', 'Kamchatka', 'Lipetsk', 'Beijing', 'Arkansas', 'Chita', 'Dagestan', 'Chhattisgarh', 'District Of Columbia', 'Andhra Pradesh', 'Georgia (State)', 'Madhya Pradesh', 'Mato Grosso', 'Kalmyk', 'Khakass', 'Kurgan', 'Mariy El', 'Andaman And Nicobar', 'Buryat', 'Krasnodar', 'Leningrad', 'Manipur', 'Connecticut', 'Nebraska', 'Heilongjiang', 'Chuvash', 'Daman And Diu', 'Alberta', 'Goa', 'Karachay Cherkess', 'Liaoning', 'Henan', 'Nevada', 'Mizoram', 'Australian Capital Territory', 'Anhui', 'Illinois', 'Alagoas', 'Mordovia', 'Fujian', 'Koryak', 'Bahia', 'Jiangxi', 'Jilin', 'Chongqing', 'Distrito Federal', 'New Brunswick', 'Himachal Pradesh', 'Ingush', 'Delaware', 'Haryana', 'Alaska', 'Jharkhand', 'Amazonas', 'Arunachal Pradesh', 'Chelyabinsk', 'Moskva', 'Acre', 'Gujarat', 'Mato Grosso Do Sul', 'Maga Buryatdan', 'Missouri', 'Gorno Altay', 'Komi 

In [13]:
# Spark's max/min are reached through a namespace so they do not replace
# Python's built-in max()/min() for the rest of the notebook.
from pyspark.sql import functions as F

# Highest and lowest "AverageTemperature_numeric" for each State.
dados_agrupados = dados.groupBy("State").agg(
    F.max("AverageTemperature_numeric").alias("max_temp"),
    F.min("AverageTemperature_numeric").alias("min_temp"),
)

# Temperature range per State: max_temp minus min_temp.
dados_diferenca = dados_agrupados.withColumn("diferenca_temp", expr("max_temp - min_temp"))

# Keep only the "State" and "diferenca_temp" columns.
dados_final = dados_diferenca.select("State", "diferenca_temp")

# Show the result.
dados_final.show()


+---------+------------------+
|    State|    diferenca_temp|
+---------+------------------+
|  Bryansk|            42.665|
|   Hawaii| 6.640000000000001|
| Manitoba|51.946000000000005|
| Kostroma|            46.388|
| Nagaland|            15.428|
|Karnataka|10.232999999999997|
| Belgorod|43.644000000000005|
|Guangdong|21.157000000000004|
|Minnesota|             47.94|
|    Kursk|            43.453|
|    Hunan|             28.87|
|   Kerala|7.2769999999999975|
|   Kaluga|44.031000000000006|
|    Hubei|30.555999999999997|
|Kamchatka|34.623999999999995|
|  Lipetsk|            45.458|
|  Beijing|39.629000000000005|
| Arkansas|            33.009|
|    Chita|             55.96|
| Dagestan|            35.495|
+---------+------------------+
only showing top 20 rows



In [14]:
# Sort by "diferenca_temp" in DESCENDING order, so the States with the
# largest temperature range come first.
dados_final = dados_final.orderBy(col("diferenca_temp").desc())

# Show the result.
dados_final.show()


+---------------+------------------+
|          State|    diferenca_temp|
+---------------+------------------+
|          Sakha|63.041999999999994|
|          Evenk|61.961000000000006|
|           Amur|57.096000000000004|
|         Taymyr|            56.369|
| Maga Buryatdan|            56.329|
|           Tuva| 56.19799999999999|
|        Irkutsk| 56.04600000000001|
|          Chita|             55.96|
|     Khabarovsk|            55.447|
|          Tomsk|            54.813|
|    Krasnoyarsk|            54.443|
|    Novosibirsk|54.028000000000006|
|         Yevrey|53.778999999999996|
|         Buryat|53.748000000000005|
|     Aga Buryat|53.668000000000006|
|Ust Orda Buryat|            53.592|
|   Heilongjiang|53.162000000000006|
|           Omsk|52.995999999999995|
|          Altay|            52.906|
|   Saskatchewan|            52.723|
+---------------+------------------+
only showing top 20 rows



In [15]:
# Build a new DataFrame holding only the first 10 rows of "dados_final".
top_states = dados_final.limit(num=10)

# Show the new DataFrame.
top_states.show()

+--------------+------------------+
|         State|    diferenca_temp|
+--------------+------------------+
|         Sakha|63.041999999999994|
|         Evenk|61.961000000000006|
|          Amur|57.096000000000004|
|        Taymyr|            56.369|
|Maga Buryatdan|            56.329|
|          Tuva| 56.19799999999999|
|       Irkutsk| 56.04600000000001|
|         Chita|             55.96|
|    Khabarovsk|            55.447|
|         Tomsk|            54.813|
+--------------+------------------+



In [16]:
# Take the end timestamp and derive the elapsed wall-clock time in seconds
# since `start_time` was recorded.
end_time = time.time()
execution_time = end_time - start_time

# Report the total run time with two decimal places.
print(
    "Tempo de execução: {:.2f} segundos".format(execution_time)
)

Tempo de execução: 47.82 segundos
