# **TDE 03: SPARK SQL**
### **Nomes:** Marcella Resende, João Xavier, Sofia Castilho, Bruno Imai

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
from pyspark.sql.functions import * 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
spark = SparkSession.builder.master('local').appName('tdeSparkSQL').getOrCreate()

In [None]:
#Lê a base de dados csv, considerando que há cabeçalho, e inferschema = True, onde automaticamente identifica os tipos das colunas, separando por ;
df = spark.read.csv(SparkFiles.get("/content/transactions_amostra.csv"), header=True, inferSchema=True, sep=';')

In [None]:
df.show(5)

+---------------+----+---------+--------------------+---------+---------+---------+-------------------+--------+--------------------+
|country_or_area|year|comm_code|           commodity|     flow|trade_usd|weight_kg|      quantity_name|quantity|            category|
+---------------+----+---------+--------------------+---------+---------+---------+-------------------+--------+--------------------+
|        Belgium|2016|   920510|Brass-wind instru...|   Export|   571297|   3966.0|    Number of items|  4135.0|92_musical_instru...|
|      Guatemala|2008|   660200|Walking-sticks, s...|   Export|    35022|   5575.0|    Number of items| 10089.0|66_umbrellas_walk...|
|       Barbados|2006|   220210|Beverage waters, ...|Re-Export|    81058|  44458.0|   Volume in litres| 24113.0|22_beverages_spir...|
|        Tunisia|2016|   780411|Lead foil of a th...|   Import|     4658|    121.0|Weight in kilograms|   121.0|78_lead_and_artic...|
|      Lithuania|1996|   560110|Sanitary towels, ...|   Export

# **Questão 1**
### Número de transações envolvendo o Brasil

In [None]:
#Só resulta onde país possui 'Brazil'
df_brasil = df.filter("country_or_area == 'Brazil'")

#Realiza a contagem através da função count() de quantas vezes ocorreu transações envolvendo o Brasil
df_brasil.groupBy('country_or_area').count().show()

+---------------+-----+
|country_or_area|count|
+---------------+-----+
|         Brazil|54762|
+---------------+-----+



#Questão 2
### Número de transações por ano

In [None]:
#Realiza o agrupamento por ano através da função groupBy(), a contagem através da função count() de quantas vezes ocorreu transações 
#envolvendo o ano e ordena os resultados utilizando a função orderBy() pelo ano
df_transacoes_ano = df.groupBy('year').count().orderBy('year').show(5)

+----+-----+
|year|count|
+----+-----+
|1988| 8568|
|1989|16916|
|1990|18188|
|1991|22652|
|1992|32860|
+----+-----+
only showing top 5 rows



# Questão 3
### Número de transações por tipo de fluxo e por ano

In [None]:
#Realiza o agrupamento por ano e tipo de fluxo pela função groupBy(), a contagem das ocorrências pela função count() e ordena pelo ano e tipo de fluxo 
#através da função orderBy()
df_fluxo_ano = df.groupBy('year', 'flow').count().orderBy('year', 'flow').show(5)

+----+---------+-----+
|year|     flow|count|
+----+---------+-----+
|1988|   Export| 3542|
|1988|   Import| 4268|
|1988|Re-Export|  758|
|1989|   Export| 7092|
|1989|   Import| 8845|
+----+---------+-----+
only showing top 5 rows



# Questão 4
### O preço médio das commodities por ano

In [None]:
#Cria um dataframe, onde adiciona uma coluna chamada 'soma_preco' que possui a soma dos preços agrupadas por código e ano, e outra coluna chamada 'id_code_year'
#que possui a concatenação do código e ano, depois exclui a coluna 'comm_code' e 'year'
df_soma_preco = df.withColumn('soma_preco', col('trade_usd')).groupBy('comm_code','year').sum('soma_preco') \
                              .withColumn('id_code_year', concat(col('comm_code'), lit(' '),col('year'))) \
                              .drop('comm_code') \
                              .drop('year') 

#Cria um dataframe, onde faz a contagem dos registros agrupados por código e ano, e cria uma coluna chamada 'id_code_year'
#que possui a concatenação do código e ano, depois exclui a coluna 'comm_code' e 'year'
df_soma_qntd = df.groupBy('comm_code','year').count() \
                          .withColumn('id_code_year', concat(col('comm_code'), lit(' '),col('year'))) \
                          .drop('comm_code') \
                          .drop('year') 

#Cria um dataframe onde realiza um join entre as os dois dataframes criados acima através do id_code_year, adiciona uma coluna chamada 'preco_medio' 
#com a divisão entre a coluna 'sum(soma_preco)' e 'count' resultando na média dos preços das commodities por ano, excluindo as colunas 'sum(soma_preco)', 'count'
#e 'id_code_year' do df_soma_preco
df_resultado = df_soma_preco.join(df_soma_qntd, df_soma_preco.id_code_year == df_soma_qntd.id_code_year, 'inner') \
                                  .withColumn('preco_medio', (col('sum(soma_preco)')) / (col('count'))) \
                                  .drop('sum(soma_preco)') \
                                  .drop('count') \
                                  .drop(df_soma_preco.id_code_year)

#Mostra os 10 primeiros resultados do df_resultado
df_resultado.show(10)

+------------+-------------------+
|id_code_year|        preco_medio|
+------------+-------------------+
| 880110 1993|  363275.6818181818|
| 293722 2002|       1.46001495E7|
| 721632 2006|         8737203.75|
| 160411 2010|         2820821.62|
| 600121 2016| 1001889.1463414634|
| 700210 2009| 1038238.7368421053|
| 381720 1999|             9956.0|
| 420222 1994|4.886976628205128E7|
| 900662 2006|             2173.5|
| 853223 2004|  457601.3333333333|
+------------+-------------------+
only showing top 10 rows



In [None]:
#Realiza o agrupamento por código e ano através da função groupBy(), realiza a média dos preços e ordena de forma descentende através da coluna que possui as médias
df.groupBy('comm_code','year').avg('trade_usd').orderBy('avg(trade_usd)', ascending = False).show(10)

+---------+----+--------------------+
|comm_code|year|      avg(trade_usd)|
+---------+----+--------------------+
|   270900|2014|2.315410248039394E10|
|   270900|2013|2.091229040030303...|
|   270900|2012|1.558291985015789...|
|   270900|2007|1.476528541635897...|
|   270900|2011|1.236943290368181...|
|   270900|2006|1.216849675776470...|
|   270900|2010| 8.596000197073172E9|
|   270900|2009| 8.245109901478261E9|
|   270900|2008| 7.869201615581395E9|
|   271000|2012| 7.131366563296875E9|
+---------+----+--------------------+
only showing top 10 rows



# Questão 5
### O preço médio das transações que envolvem o Brasil por tipo de unidade, ano, categoria e fluxo do tipo 'Export'

In [None]:
#Só resulta onde país possui 'Brazil'
df_brasil = df.filter("country_or_area == 'Brazil'")

#A partir do dataframe com o filtro do Brasil, filtra os resultados que possuem Export no tipo de fluxo
df_brasil_export = df_brasil.filter("flow == 'Export'")

df_brasil_export.show(5)

+---------------+----+---------+--------------------+------+---------+---------+-------------------+--------+--------------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|      quantity_name|quantity|            category|
+---------------+----+---------+--------------------+------+---------+---------+-------------------+--------+--------------------+
|         Brazil|2016|   621320|Handkerchiefs, of...|Export|    47155|    944.0|Weight in kilograms|   944.0|62_articles_of_ap...|
|         Brazil|2014|   482040|Business forms, i...|Export|    34742|   1977.0|Weight in kilograms|  1977.0|48_paper_paperboa...|
|         Brazil|1997|   730531|Steel pipes nes, ...|Export|   221733| 344867.0|Weight in kilograms|344867.0|73_articles_of_ir...|
|         Brazil|2012|   284160|Manganites, manga...|Export|     1993|    139.0|Weight in kilograms|   139.0|28_inorganic_chem...|
|         Brazil|2013|   300432|Adrenal cortical ...|Export| 24425430| 361150.0|Wei

In [None]:
#Realiza o agrupamento dos dados por ano, código, tipo de unidade, categoria (quantity), realiza a média dos preços através da função avg() e 
#ordena pela média através da função orderBy()
df_brasil_export.groupBy('year','comm_code','quantity_name', 'quantity').avg('trade_usd').orderBy('avg(trade_usd)').show(10)

+----+---------+-------------------+--------+--------------+
|year|comm_code|      quantity_name|quantity|avg(trade_usd)|
+----+---------+-------------------+--------+--------------+
|1994|    30270|Weight in kilograms|     3.0|           1.0|
|2004|   280470|Weight in kilograms|     1.0|           1.0|
|2013|   252921|Weight in kilograms|     1.0|           1.0|
|2007|   110510|Weight in kilograms|     1.0|           1.0|
|2002|   721011|Weight in kilograms|     2.0|           1.0|
|1993|    80290|Weight in kilograms|     3.0|           2.0|
|1994|   950612|Weight in kilograms|     4.0|           2.0|
|1993|   470692|Weight in kilograms|     1.0|           2.0|
|1998|   481720|Weight in kilograms|     4.0|           2.0|
|2010|   400241|Weight in kilograms|     4.0|           2.0|
+----+---------+-------------------+--------+--------------+
only showing top 10 rows



# Questão 6
### Preço máximo, mínimo e média dos preços por tipo de unidade e ano  

In [None]:
#Cria um dataframe, adiciona uma coluna chamada 'soma_preco_ano' somando os preços e agrupando por tipo de unidade e ano, e outra coluna chamada 'id_Q6' 
#com a concatenação de tipo de unidade e ano, exclui a coluna  'quantity_name' e 'year'
df_soma_preco_ano = df.withColumn('soma_preco_ano', col('trade_usd')).groupBy('quantity_name', 'year').sum('soma_preco_ano') \
                                  .withColumn('id_Q6', concat(col('quantity_name'), lit(' '), col('year'))) \
                                  .drop('quantity_name') \
                                  .drop('year')
                      
#Cria um dataframe, agrupanda por tipo de unidade e ano e realiza a contagem de ocorrências, e outra coluna chamada 'id_Q6' com a concatenação de 
#tipo de unidade e ano, exclui a coluna  'quantity_name' e 'year'
df_soma_qntd_ano = df.groupBy('quantity_name', 'year').count() \
                              .withColumn('id_Q6', concat(col('quantity_name'), lit(' '), col('year'))) \
                              .drop('quantity_name') \
                              .drop('year')

#Cria um dataframe onde realiza um join entre as os dois dataframes criados acima através do id_Q6, adiciona uma coluna chamada 'preco_medio' 
#com a divisão entre a coluna 'sum(soma_preco_ano)' e 'count' resultando na média dos preços das commodities por ano, excluindo as colunas 'sum(soma_preco)', 'count'
#e 'id_Q6' do df_soma_preco_ano
df_resultado_ano = df_soma_preco_ano.join(df_soma_qntd_ano, df_soma_preco_ano.id_Q6 == df_soma_qntd_ano.id_Q6, 'inner') \
                                  .withColumn('preco_medio', (col('sum(soma_preco_ano)')) / (col('count'))) \
                                  .drop('sum(soma_preco_ano)') \
                                  .drop('count') \
                                  .drop(df_soma_preco_ano.id_Q6)

df_resultado_ano.show(10)

+--------------------+--------------------+
|               id_Q6|         preco_medio|
+--------------------+--------------------+
|Electrical energy...|           4122301.5|
|Number of pairs 2008|3.9991725244176015E7|
|Weight in carats ...| 9.718167391666667E7|
|Thousands of item...|   6971520.423076923|
|Volume in cubic m...|   2.1613620009375E7|
|Area in square me...|1.0891966478880407E7|
|Length in metres ...| 1.493895061967213E7|
|Number of pairs 2013| 3.524038769922879E7|
|Volume in litres ...| 8.903355118110237E7|
|Number of pairs 1990|   6225074.855421687|
+--------------------+--------------------+
only showing top 10 rows



In [None]:
#Realiza o agrupamento por tipo de unidade e ano através da função groupBy(), pega o valor máximo de preço através da função max() e ordena de forma descendente
#pela função orderBy()
df.groupBy('quantity_name','year').max('trade_usd').orderBy(max('trade_usd'), ascending = False ).show(10)

+-------------------+----+--------------+
|      quantity_name|year|max(trade_usd)|
+-------------------+----+--------------+
|Weight in kilograms|2013|  401459981239|
|Weight in kilograms|2014|  359475936313|
|Weight in kilograms|2007|  290899699040|
|Weight in kilograms|2006|  262924347374|
|Weight in kilograms|2012|  220793843089|
|Weight in kilograms|2008|  155473624044|
|Weight in kilograms|2010|  105814257878|
|Weight in kilograms|2000|   98728826923|
|Weight in kilograms|2009|   89255586690|
|Weight in kilograms|2011|   82986100000|
+-------------------+----+--------------+
only showing top 10 rows



In [None]:
#Realiza o agrupamento por tipo de unidade e ano através da função groupBy(), pega o valor mínimo de preço através da função min() e ordena de forma ascendente
#pela função orderBy()
df.groupBy('quantity_name','year').min('trade_usd').orderBy(min('trade_usd'), asceding = True).show(10)

+-------------------+----+--------------+
|      quantity_name|year|min(trade_usd)|
+-------------------+----+--------------+
|   Volume in litres|2009|             1|
|    Number of pairs|2004|             1|
|Weight in kilograms|2013|             1|
|    Number of items|2002|             1|
|Weight in kilograms|1998|             1|
|    Number of items|2011|             1|
|Weight in kilograms|1990|             1|
|   Volume in litres|2001|             1|
|Weight in kilograms|2008|             1|
|    Number of items|2014|             1|
+-------------------+----+--------------+
only showing top 10 rows



# Questão 7
### A commodity mais comercializada em 2016 por tipo de fluxo

In [None]:
#Partição da base de dados por tipo de fluxo
w = Window.partitionBy('flow')

#Filtra a base de dados pelo ano 2016
df_filter = df.filter("year = '2016'")

#Agrupa os dados por commodity e tipo de fluxo, somando as quantidades comercializadas
df_groupBy = df_filter.groupBy('commodity', 'flow').agg(sum('quantity').alias('sum_quantity'))

#Verifica a commodity mais comercializada, através da função max() na partição da base de dados
df_partition = df_groupBy.withColumn('max_quantity', max('sum_quantity').over(w))

#Filtrar somente os valores que correspondem a quantidade máxima em cada partição 
df_result = df_partition.where(col('max_quantity') == col('sum_quantity')).drop('sum_quantity')

df_result.show()

+--------------------+---------+-----------------+
|           commodity|     flow|     max_quantity|
+--------------------+---------+-----------------+
|Ice, snow and pot...|   Export| 6.99847368575E11|
|Iron ore, concent...|   Import|1.063049523354E12|
|Safety razor blad...|Re-Export|       1.261968E9|
|Chem wood pulp, s...|Re-Import|      3.8774873E7|
+--------------------+---------+-----------------+

