**Student: Leonardo Ribeiro Schaedler**

# Spark SQL Assignment

Using the dataset described below, extract the requested information.

### BOVESPA dataset

- Data on BOVESPA, the Brazilian stock exchange
- ~1.3 GB
- 8.1M instances


| #  | Field name                                                     | Description                                                                          |
|----|----------------------------------------------------------------|--------------------------------------------------------------------------------------|
| 0  | RegisterType                                                   | Fixed value '1'                                                                      |
| 1  | TradingDate                                                    | Trading session date                                                                 |
| 2  | BDICode                                                        | Used to classify securities in the daily information bulletin (BDI)                  |
| 3  | NegociationCode                                                | Trading code of the security                                                         |
| 4  | MarketType                                                     | Code of the market in which the security is registered                               |
| 5  | TradeName                                                      | Abbreviated name of the issuing company                                              |
| 6  | Specification                                                  | Security specification                                                               |
| 7  | ForwardMarketTermInDays                                        | Term, in days, for the forward market                                                |
| 8  | Currency                                                       | Reference currency                                                                   |
| 9  | OpeningPrice                                                   | Opening price of the security in the session                                         |
| 10 | MaxPrice                                                       | Maximum price of the security in the session                                         |
| 11 | MinPrice                                                       | Minimum price of the security in the session                                         |
| 12 | MeanPrice                                                      | Average price of the security in the session                                         |
| 13 | LastTradePrice                                                 | Price of the last trade of the security in the session                               |
| 14 | BestPurshaseOrderPrice                                         | Price of the best buy offer for the security                                         |
| 15 | BestPurshaseSalePrice                                          | Price of the best sell offer for the security                                        |
| 16 | NumborOfTrades                                                 | Number of trades executed with the security in the session                           |
| 17 | NumberOfTradedStocks                                           | Total quantity of securities traded                                                  |
| 18 | VolumeOfTradedStocks                                           | Total volume traded in this security                                                 |
| 19 | PriceForOptionsMarketOrSecondaryTermMarket                     | Strike price for the options market, or contract value for the secondary term market |
| 20 | PriceCorrectionsForOptionsMarketOrSecondaryTermMarket          | Correction indicator for strike prices or contract values                            |
| 21 | DueDateForOptionsMarketOrSecondaryTermMarket                   | Expiration date for the options markets                                              |
| 22 | FactorOfPaperQuotatuion                                        | Quotation factor of the security                                                     |
| 23 | PointsInPriceForOptionsMarketReferencedInDollarOrSecondaryTerm | Strike price in points for dollar-referenced options                                 |
| 24 | ISINOrInternCode                                               | Security code in the ISIN system                                                     |
| 25 | DistributionNumber                                             | Distribution number of the security                                                  |

Information to extract:

1. Number of days with PETR4 (NegociationCode) trades
2. Historical maximum value (MaxPrice) per stock (NegociationCode)
3. Historical maximum value (MaxPrice) of PETR4 (NegociationCode)
4. Day ('TradingDate') with the largest number of securities (NegociationCode) traded
5. Day of the week (from 'TradingDate') with the largest number of securities (NegociationCode) traded
6. Largest historical profit of a security (NegociationCode) on BOVESPA (MaxPrice - OpeningPrice)
7. Largest historical loss of a security (NegociationCode) on BOVESPA (OpeningPrice - LastTradePrice)
8. Currency (Currency field) with the most trades
9. Security (NegociationCode) traded in CZ$ (Currency) with the most trades
10. Security (NegociationCode) traded in CZ$ (Currency) with the highest average trade price (MeanPrice)
11. Annual (TradingDate) average of the mean (MeanPrice), minimum (MinPrice) and maximum (MaxPrice) prices of PETR4 (NegociationCode)
12. Annual (TradingDate) mean price (MeanPrice) of PETR4 (NegociationCode)
13. Annual (TradingDate) mean price (MeanPrice) of the 10 stocks (NegociationCode) with the most trades on BOVESPA
14. Annual standard deviation of the mean price (MeanPrice) of PETR4 (NegociationCode)
15. Annual standard deviation of the mean price (MeanPrice) of the 10 stocks (NegociationCode) with the most trades on BOVESPA
16. Annual (TradingDate) mean price (MeanPrice) of the stocks (NegociationCode) with the most trades in each currency (Currency)


**Tips:**
- *Create a cell (Insert -> Insert Cell Below) for each requested item*

In [88]:
# Install PySpark on Google Colab
!pip install pyspark==3.3.1



In [89]:
# Open a Spark SQL session
from pyspark.sql import SparkSession

sc = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

In [90]:
# Download the BOVESPA dataset file (bovespa.csv) from Google Drive
!gdown 1FbkYKnij5N6A2P3VLgfMLYhoPSb4_wur

Downloading...
From: https://drive.google.com/uc?id=1FbkYKnij5N6A2P3VLgfMLYhoPSb4_wur
To: /content/bovespa.csv
100% 1.29G/1.29G [00:15<00:00, 83.7MB/s]


In [91]:
df = sc.read \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .option('inferschema', 'true') \
    .csv('file:////content/bovespa.csv')

In [92]:
df.printSchema()

root
 |-- RegisterType: integer (nullable = true)
 |-- TradingDate: integer (nullable = true)
 |-- BDICode: double (nullable = true)
 |-- NegociationCode: string (nullable = true)
 |-- MarketType: integer (nullable = true)
 |-- TradeName: string (nullable = true)
 |-- Specification: string (nullable = true)
 |-- ForwardMarketTermInDays: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- OpeningPrice: double (nullable = true)
 |-- MaxPrice: double (nullable = true)
 |-- MinPrice: double (nullable = true)
 |-- MeanPrice: double (nullable = true)
 |-- LastTradePrice: double (nullable = true)
 |-- BestPurshaseOrderPrice: double (nullable = true)
 |-- BestPurshaseSalePrice: double (nullable = true)
 |-- NumborOfTrades: double (nullable = true)
 |-- NumberOfTradedStocks: double (nullable = true)
 |-- VolumeOfTradedStocks: double (nullable = true)
 |-- PriceForOptionsMarketOrSecondaryTermMarket: double (nullable = true)
 |-- PriceCorrectionsForOptionsMarketOrSecondaryTermMa
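The schema shows that TradingDate was inferred as an integer in yyyymmdd form, so every date-based question (year, weekday) has to parse or slice it first. A minimal plain-Python sketch of that parsing, handy for checking individual values by hand; the helper name `parse_trading_date` is hypothetical and not part of the notebook:

```python
from datetime import date, datetime


def parse_trading_date(value: int) -> date:
    """Convert an integer TradingDate such as 20191216 into a datetime.date."""
    # str() first, because the inferred schema stores TradingDate as an integer.
    return datetime.strptime(str(value), "%Y%m%d").date()


trading_date = 20191216
d = parse_trading_date(trading_date)

# Three equivalent ways to get the year, mirroring the approaches used later:
year_from_date = d.year                      # like func.year(func.to_date(...))
year_from_text = int(str(trading_date)[:4])  # like substring(TradingDate, 1, 4)
year_from_int = trading_date // 10000        # integer arithmetic, no parsing
print(d, year_from_date, year_from_text, year_from_int)  # 2019-12-16 2019 2019 2019
```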

In [93]:
df.createOrReplaceTempView('bovespa')

In [94]:
sc.sql('select tradingDate, NegociationCode from bovespa where NegociationCode=="PETR4"').show(10)

+-----------+---------------+
|tradingDate|NegociationCode|
+-----------+---------------+
|   19980316|          PETR4|
|   19980317|          PETR4|
|   19980318|          PETR4|
|   19980319|          PETR4|
|   19980320|          PETR4|
|   19980323|          PETR4|
|   19980324|          PETR4|
|   19980325|          PETR4|
|   19980326|          PETR4|
|   19980327|          PETR4|
+-----------+---------------+
only showing top 10 rows



In [95]:
df.select('TradingDate', "NegociationCode")\
  .where(df.NegociationCode == 'PETR4')\
  .show(10)

+-----------+---------------+
|TradingDate|NegociationCode|
+-----------+---------------+
|   19980316|          PETR4|
|   19980317|          PETR4|
|   19980318|          PETR4|
|   19980319|          PETR4|
|   19980320|          PETR4|
|   19980323|          PETR4|
|   19980324|          PETR4|
|   19980325|          PETR4|
|   19980326|          PETR4|
|   19980327|          PETR4|
+-----------+---------------+
only showing top 10 rows



In [96]:
import pyspark.sql.functions as func

df.select('TradingDate', "NegociationCode")\
  .where(df.TradingDate >= 20100000)\
  .groupBy(df.TradingDate)\
  .agg(func.count('TradingDate').alias('qt'))\
  .orderBy('qt', ascending=False)\
  .show(10)

+-----------+----+
|TradingDate|  qt|
+-----------+----+
|   20191216|4652|
|   20191212|4241|
|   20191213|4223|
|   20191118|4208|
|   20190819|4135|
|   20191114|4131|
|   20191021|3975|
|   20190916|3954|
|   20191113|3930|
|   20190715|3916|
+-----------+----+
only showing top 10 rows



In [97]:
# Item 1
# Number of days with PETR4 (NegociationCode) trades


In [98]:
sc.sql('select count(*) from bovespa where NegociationCode == "PETR4" group by NegociationCode').show(10)

+--------+
|count(1)|
+--------+
|    5391|
+--------+



In [99]:
df.select(df.NegociationCode)\
  .where(df.NegociationCode == 'PETR4')\
  .count()

5391
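Both cells above count PETR4 rows, which equals the number of trading days only if the dataset never has more than one PETR4 row for the same TradingDate (not checked here). Item 1 asks for days, so counting distinct dates is the safer reading; in PySpark that is `func.countDistinct('TradingDate')`, or `count(distinct TradingDate)` in SQL. A plain-Python sketch of the difference, on hypothetical rows:

```python
# Hypothetical (NegociationCode, TradingDate) rows; not taken from the dataset.
rows = [
    ("PETR4", 19980316),
    ("PETR4", 19980316),  # a second record for the same session
    ("PETR4", 19980317),
    ("VALE5", 19980316),
]

petr4_days = [day for code, day in rows if code == "PETR4"]
row_count = len(petr4_days)           # what count(*) returns
distinct_days = len(set(petr4_days))  # what count(distinct TradingDate) returns
print(row_count, distinct_days)  # 3 2
```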

In [100]:
# Item 2
# Historical maximum value (MaxPrice) per stock (NegociationCode)

In [101]:
sc.sql('select NegociationCode, max(MaxPrice) from bovespa group by NegociationCode').show(10)

+---------------+-------------+
|NegociationCode|max(MaxPrice)|
+---------------+-------------+
|          FGO 4|     240000.0|
|          VAG 3|    5395600.0|
|          EST 1|      18000.0|
|          BIO 1|     160000.0|
|         OEA 10|        270.0|
|          OPC 9|       2000.0|
|         OBE 18|        200.0|
|         MAG 13|       6000.0|
|         OPU 23|        700.0|
|         OCZ 20|       9800.0|
+---------------+-------------+
only showing top 10 rows



In [102]:
df.select('NegociationCode', 'MaxPrice')\
  .groupBy('NegociationCode')\
  .agg(func.max('MaxPrice').alias('MaiorValor'))\
  .show(10)

+---------------+----------+
|NegociationCode|MaiorValor|
+---------------+----------+
|          FGO 4|  240000.0|
|          VAG 3| 5395600.0|
|          EST 1|   18000.0|
|          BIO 1|  160000.0|
|         OEA 10|     270.0|
|          OPC 9|    2000.0|
|         OBE 18|     200.0|
|         MAG 13|    6000.0|
|         OPU 23|     700.0|
|         OCZ 20|    9800.0|
+---------------+----------+
only showing top 10 rows



In [103]:
# Item 3
# Historical maximum value (MaxPrice) of PETR4 (NegociationCode)

In [104]:
df.select('NegociationCode', 'MaxPrice')\
  .where(df.NegociationCode == 'PETR4')\
  .groupBy('NegociationCode')\
  .agg(func.max('MaxPrice').alias('MaiorValor'))\
  .show(10)

+---------------+----------+
|NegociationCode|MaiorValor|
+---------------+----------+
|          PETR4|   52100.0|
+---------------+----------+



In [105]:
# Item 4
# Day ('TradingDate') with the largest number of securities (NegociationCode) traded

In [106]:
sc.sql('select TradingDate, count(*) as qt from bovespa group by TradingDate order by qt desc').show(10)

+-----------+----+
|TradingDate|  qt|
+-----------+----+
|   20191216|4652|
|   20191212|4241|
|   20191213|4223|
|   20191118|4208|
|   20190819|4135|
|   20191114|4131|
|   20191021|3975|
|   20190916|3954|
|   20191113|3930|
|   20190715|3916|
+-----------+----+
only showing top 10 rows



In [107]:
df.select('TradingDate')\
  .groupBy('TradingDate')\
  .agg(func.count('TradingDate').alias('qt'))\
  .orderBy('qt', ascending=False)\
  .show(10)

+-----------+----+
|TradingDate|  qt|
+-----------+----+
|   20191216|4652|
|   20191212|4241|
|   20191213|4223|
|   20191118|4208|
|   20190819|4135|
|   20191114|4131|
|   20191021|3975|
|   20190916|3954|
|   20191113|3930|
|   20190715|3916|
+-----------+----+
only showing top 10 rows
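Item 4 asks for the number of securities traded per day, while both cells count rows per day; the two agree only if each security appears at most once per session. Counting distinct codes per day (in PySpark, `func.countDistinct('NegociationCode')` inside `agg`) follows the literal wording. A plain-Python sketch on hypothetical rows where the two criteria pick different days:

```python
from collections import defaultdict

# Hypothetical (TradingDate, NegociationCode) rows; not taken from the dataset.
rows = [
    (20191216, "PETR4"), (20191216, "VALE3"), (20191216, "VALE3"), (20191216, "VALE3"),
    (20191212, "PETR4"), (20191212, "BBAS3"), (20191212, "ITUB4"),
]

rows_per_day = defaultdict(int)
codes_per_day = defaultdict(set)
for day, code in rows:
    rows_per_day[day] += 1       # what count('TradingDate') measures
    codes_per_day[day].add(code)  # distinct securities per day

best_by_rows = max(rows_per_day, key=rows_per_day.get)
best_by_codes = max(codes_per_day, key=lambda d: len(codes_per_day[d]))
print(best_by_rows, best_by_codes)  # 20191216 20191212
```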



In [108]:
# Item 5
# Day of the week (from 'TradingDate') with the largest number of securities (NegociationCode) traded

In [109]:
import datetime

dia = '19860102'

dt = datetime.datetime.strptime(dia, '%Y%m%d')

dt.strftime('%A')

'Thursday'

In [110]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# UDF: convert the integer TradingDate (yyyymmdd) into a weekday name such as 'Monday'
func_udf = udf(lambda tradingDate: datetime.datetime.strptime(str(tradingDate), '%Y%m%d').strftime('%A'), StringType())

df.select('TradingDate')\
  .withColumn('diaSemana', func_udf('TradingDate'))\
  .groupBy(func.col('diaSemana'))\
  .agg(func.count('diaSemana').alias('qt'))\
  .orderBy(func.col('qt'), ascending=False)\
  .show(10)

+---------+-------+
|diaSemana|     qt|
+---------+-------+
|Wednesday|1688444|
|   Monday|1626494|
|  Tuesday|1613747|
|   Friday|1599650|
| Thursday|1597353|
| Saturday|      3|
+---------+-------+
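The UDF above calls Python once per row. Spark also has built-ins for this: `func.dayofweek` returns 1 for Sunday through 7 for Saturday, and `func.date_format(..., 'EEEE')` returns the full day name, either of which avoids the Python round-trip. Because `dayofweek` numbers days differently from Python, a small plain-Python sketch that mimics it can help when cross-checking results; the helper name `spark_dayofweek` is hypothetical:

```python
from datetime import datetime


def spark_dayofweek(trading_date: int) -> int:
    """Mimic Spark's dayofweek() numbering: 1 = Sunday ... 7 = Saturday."""
    d = datetime.strptime(str(trading_date), "%Y%m%d").date()
    # Python's isoweekday() is 1 = Monday ... 7 = Sunday; shift so Sunday becomes 1.
    return d.isoweekday() % 7 + 1


for day in (20191215, 20191216, 19860102):
    print(day, spark_dayofweek(day))  # 1 (Sunday), 2 (Monday), 5 (Thursday)
```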



In [111]:
# Item 6
# Largest historical profit of a security (NegociationCode) on BOVESPA (MaxPrice - OpeningPrice)

In [112]:
df.select('NegociationCode', 'OpeningPrice', 'MaxPrice') \
  .withColumn('Profit', func.col('MaxPrice') - func.col('OpeningPrice')) \
  .groupBy('NegociationCode') \
  .agg(func.max('Profit').alias('MaxProfit')) \
  .orderBy('MaxProfit', ascending=False) \
  .show(1)

+---------------+---------+
|NegociationCode|MaxProfit|
+---------------+---------+
|          ANT 3|  4.411E8|
+---------------+---------+
only showing top 1 row
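The cell computes a per-row difference, keeps the maximum per security, and then sorts to find the overall top. A plain-Python sketch of the same three steps on hypothetical rows; note that on the real data this ranking compares nominal values across the CZ$, NCZ$, CR$ and R$ periods listed in item 8, so it is not currency-adjusted:

```python
# Hypothetical (NegociationCode, OpeningPrice, MaxPrice) rows; not taken from the dataset.
rows = [
    ("AAA3", 10.0, 14.0),
    ("AAA3", 12.0, 12.5),
    ("BBB4", 50.0, 51.0),
    ("CCC3", 5.0, 9.5),
]

# Step 1 + 2: per-row "profit" as defined in item 6, then the maximum per security.
max_profit = {}
for code, opening, high in rows:
    profit = high - opening
    max_profit[code] = max(profit, max_profit.get(code, float("-inf")))

# Step 3: the security with the largest per-security maximum.
top_code = max(max_profit, key=max_profit.get)
print(top_code, max_profit[top_code])  # CCC3 4.5
```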



In [113]:
# Item 7
# Largest historical loss of a security (NegociationCode) on BOVESPA (OpeningPrice - LastTradePrice)

In [114]:
df.select('NegociationCode', 'OpeningPrice', 'LastTradePrice') \
  .withColumn('Loss', func.col('OpeningPrice') - func.col('LastTradePrice')) \
  .groupBy('NegociationCode') \
  .agg(func.max('Loss').alias('MaxLoss')) \
  .orderBy('MaxLoss', ascending=False) \
  .show(1)

+---------------+-------+
|NegociationCode|MaxLoss|
+---------------+-------+
|          ANT 3| 1.45E8|
+---------------+-------+
only showing top 1 row



In [115]:
# Item 8
# Currency (Currency field) with the most trades

In [116]:
df.select('Currency')\
  .groupBy('Currency')\
  .agg(func.count('Currency').alias('qt'))\
  .orderBy('qt', ascending=False)\
  .show(10)

+--------+-------+
|Currency|     qt|
+--------+-------+
|      R$|6995662|
|     CR$| 498562|
|     CZ$| 458842|
|    NCZ$| 172625|
+--------+-------+



In [117]:
# Item 9
# Security (NegociationCode) traded in CZ$ (Currency) with the most trades

In [135]:
df.select('NegociationCode', 'Currency')\
  .where(df.Currency == 'CZ$')\
  .groupBy('NegociationCode')\
  .agg(func.count('NegociationCode').alias('qt'))\
  .orderBy('qt', ascending=False)\
  .show(1)

+---------------+----+
|NegociationCode|  qt|
+---------------+----+
|          PMA 2|2002|
+---------------+----+
only showing top 1 row



In [119]:
# Item 10
# Security (NegociationCode) traded in CZ$ (Currency) with the highest average trade price (MeanPrice)

In [120]:
df.select('NegociationCode', 'Currency', 'MeanPrice') \
  .where(df.Currency == 'CZ$') \
  .groupBy('NegociationCode') \
  .agg(func.max('MeanPrice').alias('MaxMeanPrice')) \
  .orderBy('MaxMeanPrice', ascending=False) \
  .show(1)

+---------------+------------+
|NegociationCode|MaxMeanPrice|
+---------------+------------+
|          TLS 6|    3.7975E7|
+---------------+------------+
only showing top 1 row
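Item 10 asks for the highest *average* MeanPrice, but the cell above ranks each security by its single largest MeanPrice (`func.max`). Replacing `func.max` with `func.avg` in that cell gives the literal reading; whether this changes the winner on the real data was not checked here. A plain-Python sketch on hypothetical rows showing how the two criteria can disagree:

```python
from collections import defaultdict
from statistics import mean

# Hypothetical CZ$ rows (NegociationCode, MeanPrice); not taken from the dataset.
rows = [
    ("AAA3", 100.0), ("AAA3", 900.0),                  # one spike, low typical value
    ("BBB4", 600.0), ("BBB4", 600.0), ("BBB4", 600.0),  # steady value
]

prices = defaultdict(list)
for code, price in rows:
    prices[code].append(price)

by_max = max(prices, key=lambda c: max(prices[c]))   # what func.max ranks by
by_avg = max(prices, key=lambda c: mean(prices[c]))  # what func.avg would rank by
print(by_max, by_avg)  # AAA3 BBB4
```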



In [121]:
# Item 11
# Annual (TradingDate) average of the mean (MeanPrice), minimum (MinPrice) and maximum (MaxPrice) prices of PETR4 (NegociationCode)

In [122]:
sc.sql('''select substring(TradingDate, 1, 4) as ano, avg(MeanPrice) as media,
                 max(MaxPrice) as max, min(MinPrice) as min
          from bovespa
          where NegociationCode == "PETR4"
          group by ano''').show(10)

+----+------------------+-------+------+
| ano|             media|    max|   min|
+----+------------------+-------+------+
|1999| 26273.98775510204|46500.0|7900.0|
|1998|20242.401015228428|29700.0|9000.0|
|2005| 8094.469879518072|13000.0|2925.0|
|2000|24186.137096774193|52100.0|4400.0|
|2002|4796.7389558232935| 6079.0|3440.0|
|2004| 8383.638554216868| 9800.0|6704.0|
|2001|  5297.70325203252| 6288.0|4460.0|
|2003|          5613.908| 7890.0|4091.0|
|2006|4344.0609756097565| 4996.0|3632.0|
|2008| 4806.373493975903| 8774.0|1675.0|
+----+------------------+-------+------+
only showing top 10 rows



In [123]:
df.select('MeanPrice', 'MinPrice', 'MaxPrice', 'TradingDate', 'NegociationCode')\
  .where(df.NegociationCode == 'PETR4')\
  .withColumn('ano', df.TradingDate[0:4])\
  .groupBy(func.col('ano'))\
  .agg(func.avg('MeanPrice').alias('media'),
    func.min('MinPrice').alias('minimo'),
    func.max('MaxPrice').alias('maximo'))\
  .show(10)

+----+------------------+------+-------+
| ano|             media|minimo| maximo|
+----+------------------+------+-------+
|1999| 26273.98775510204|7900.0|46500.0|
|1998|20242.401015228428|9000.0|29700.0|
|2005| 8094.469879518072|2925.0|13000.0|
|2000|24186.137096774193|4400.0|52100.0|
|2002|4796.7389558232935|3440.0| 6079.0|
|2004| 8383.638554216868|6704.0| 9800.0|
|2001|  5297.70325203252|4460.0| 6288.0|
|2003|          5613.908|4091.0| 7890.0|
|2006|4344.0609756097565|3632.0| 4996.0|
|2008| 4806.373493975903|1675.0| 8774.0|
+----+------------------+------+-------+
only showing top 10 rows
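Both cells report the yearly average of MeanPrice together with the lowest MinPrice and highest MaxPrice of the year. Item 11 can also be read as asking for the yearly *average* of each of the three columns, which in PySpark would be `func.avg('MinPrice')` and `func.avg('MaxPrice')` in the same `agg`. A plain-Python sketch, on hypothetical rows, that computes both readings side by side:

```python
from collections import defaultdict
from statistics import mean

# Hypothetical PETR4 rows (TradingDate, MeanPrice, MinPrice, MaxPrice); not from the dataset.
rows = [
    (19980316, 20.0, 18.0, 22.0),
    (19980317, 24.0, 21.0, 30.0),
    (19990104, 30.0, 28.0, 33.0),
]

by_year = defaultdict(list)
for trading_date, mean_p, min_p, max_p in rows:
    by_year[trading_date // 10000].append((mean_p, min_p, max_p))

summary = {}
for year, vals in sorted(by_year.items()):
    means, mins, maxs = zip(*vals)
    summary[year] = {
        "avg_mean": mean(means),
        "avg_min": mean(mins),     # literal reading: average of daily minimums
        "avg_max": mean(maxs),     # literal reading: average of daily maximums
        "lowest_min": min(mins),   # what the notebook cells report as min
        "highest_max": max(maxs),  # what the notebook cells report as max
    }

print(summary[1998])
```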



In [124]:
# Item 12
# Annual (TradingDate) mean price (MeanPrice) of PETR4 (NegociationCode)

In [125]:
df.select('NegociationCode', 'TradingDate', 'MeanPrice') \
  .where(df.NegociationCode == 'PETR4') \
  .withColumn('Year', func.year(func.to_date('TradingDate', 'yyyyMMdd'))) \
  .groupBy('Year') \
  .agg(func.avg('MeanPrice').alias('AnnualMeanPrice')) \
  .orderBy('Year') \
  .show()

+----+------------------+
|Year|   AnnualMeanPrice|
+----+------------------+
|1998|20242.401015228428|
|1999| 26273.98775510204|
|2000|24186.137096774193|
|2001|  5297.70325203252|
|2002|4796.7389558232935|
|2003|          5613.908|
|2004| 8383.638554216868|
|2005| 8094.469879518072|
|2006|4344.0609756097565|
|2007| 5442.975510204082|
|2008| 4806.373493975903|
|2009| 3183.650406504065|
|2010| 2975.072874493927|
|2011|2369.9236947791164|
|2012| 2134.691056910569|
|2013|1825.3064516129032|
|2014|1669.7298387096773|
|2015| 980.4349593495934|
|2016|1077.4939759036145|
|2017| 1459.060975609756|
+----+------------------+
only showing top 20 rows



In [126]:
# Item 13
# Annual (TradingDate) mean price (MeanPrice) of the 10 stocks (NegociationCode) with the most trades on BOVESPA

In [127]:
# Top 10 securities by number of rows (records) in the dataset
top10 = df.select('NegociationCode')\
  .groupBy('NegociationCode')\
  .agg(func.count('*').alias('qt'))\
  .orderBy('qt', ascending=False)\
  .take(10)

lista10 = [x['NegociationCode'] for x in top10]

In [128]:
df.select('MeanPrice', 'TradingDate', 'NegociationCode')\
  .filter(func.col('NegociationCode').isin(lista10))\
  .withColumn('ano', df.TradingDate[0:4])\
  .groupBy('NegociationCode', 'ano')\
  .agg(func.avg('MeanPrice').alias('media'))\
  .show(10)

+---------------+----+------------------+
|NegociationCode| ano|             media|
+---------------+----+------------------+
|         VALE5T|1999|3489.3505154639174|
|         BBAS3T|1999| 697.3055555555555|
|         BBDC4T|1999| 925.7908163265306|
|         USIM5T|1999|506.05732484076435|
|         BBDC4T|1998|  872.018018018018|
|         CSNA3T|1998|2523.1486486486488|
|         PETR4T|1999|26129.480662983424|
|         BBAS3T|1998| 944.0075757575758|
|         CSNA3T|1999|4042.7311827956987|
|         PETR4T|1998| 20028.90404040404|
+---------------+----+------------------+
only showing top 10 rows



In [129]:
# Item 14
# Annual standard deviation of the mean price (MeanPrice) of PETR4 (NegociationCode)

In [130]:
df.select('NegociationCode', 'TradingDate', 'MeanPrice') \
  .where(df.NegociationCode == 'PETR4') \
  .withColumn('Year', func.year(func.to_date('TradingDate', 'yyyyMMdd'))) \
  .groupBy('Year') \
  .agg(func.stddev('MeanPrice').alias('AnnualStdDev')) \
  .orderBy('Year') \
  .show()

+----+------------------+
|Year|      AnnualStdDev|
+----+------------------+
|1998|5739.2884608935565|
|1999| 7692.556188216364|
|2000|20189.956356803512|
|2001| 333.4576639125791|
|2002| 613.8605232990918|
|2003| 863.8748535443933|
|2004| 757.8230426632923|
|2005|3377.0688879989852|
|2006|242.19193000733262|
|2007| 1200.445391820076|
|2008|2311.3643124656405|
|2009| 414.3029746894998|
|2010|392.22804616214563|
|2011|304.10310718438365|
|2012|202.65900303596553|
|2013|136.15365084480885|
|2014|319.84012263210207|
|2015|212.59195810379697|
|2016|379.52406656319016|
|2017|131.33322761191312|
+----+------------------+
only showing top 20 rows
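`func.stddev` in PySpark is an alias for `stddev_samp`, the sample standard deviation (denominator n - 1); `func.stddev_pop` uses denominator n. The distinction matters most for short years with few sessions. A plain-Python sketch of both, using the standard library's `statistics` module on hypothetical values:

```python
from statistics import pstdev, stdev

# Hypothetical MeanPrice values for one year of PETR4; not taken from the dataset.
prices = [10.0, 12.0, 14.0, 16.0]

sample_sd = stdev(prices)       # n - 1 denominator, like func.stddev / stddev_samp
population_sd = pstdev(prices)  # n denominator, like func.stddev_pop
print(round(sample_sd, 4), round(population_sd, 4))  # 2.582 2.2361
```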



In [131]:
# Item 15
# Annual standard deviation of the mean price (MeanPrice) of the 10 stocks (NegociationCode) with the most trades on BOVESPA

In [132]:
# Top 10 securities by total NumborOfTrades (item 13 ranked by row count instead)
top10_stocks = df.groupBy('NegociationCode') \
                 .agg(func.sum('NumborOfTrades').alias('TotalTrades')) \
                 .orderBy('TotalTrades', ascending=False) \
                 .limit(10) \
                 .select('NegociationCode')

filtered_df = df.join(top10_stocks, 'NegociationCode')

filtered_df.select('NegociationCode', 'TradingDate', 'MeanPrice') \
           .withColumn('Year', func.year(func.to_date('TradingDate', 'yyyyMMdd'))) \
           .groupBy('NegociationCode', 'Year') \
           .agg(func.stddev('MeanPrice').alias('AnnualStdDev')) \
           .orderBy('NegociationCode', 'Year') \
           .show()

+---------------+----+------------------+
|NegociationCode|Year|      AnnualStdDev|
+---------------+----+------------------+
|          BBAS3|1998| 191.0232089288056|
|          BBAS3|1999| 71.34827969702083|
|          BBAS3|2000| 75.82122753206457|
|          BBAS3|2001|  97.5732118997873|
|          BBAS3|2002|147.50261524927728|
|          BBAS3|2003|433.52504457423527|
|          BBAS3|2004| 385.5920887080102|
|          BBAS3|2005| 507.4842826373836|
|          BBAS3|2006| 533.7911685739903|
|          BBAS3|2007|2075.1843619944507|
|          BBAS3|2008| 557.3112212675186|
|          BBAS3|2009| 613.3250335246126|
|          BBAS3|2010|222.82637293543215|
|          BBAS3|2011|240.25052354303372|
|          BBAS3|2012|  276.610203947405|
|          BBAS3|2013|208.67184429609298|
|          BBAS3|2014| 387.4057204353948|
|          BBAS3|2015| 339.7799999006763|
|          BBAS3|2016|468.92560147381585|
|          BBAS3|2017|277.91281758768434|
+---------------+----+------------
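Items 13 and 15 both ask for the "10 stocks with the most trades", but item 13 ranks by row count (`func.count('*')`) and item 15 by the sum of NumborOfTrades. The outputs reflect this: item 13 lists codes such as PETR4T and BBAS3T, while item 15 lists BBAS3. A plain-Python sketch, on hypothetical rows, of how the two criteria can select different securities:

```python
from collections import Counter

# Hypothetical (NegociationCode, NumborOfTrades) rows; not taken from the dataset.
rows = [
    ("AAA3T", 1.0), ("AAA3T", 2.0), ("AAA3T", 1.0),  # many rows, few trades
    ("BBB4", 500.0), ("BBB4", 700.0),                # fewer rows, many trades
]

rows_per_code = Counter(code for code, _ in rows)  # criterion used in item 13
trades_per_code = Counter()
for code, trades in rows:
    trades_per_code[code] += trades                # criterion used in item 15

print(rows_per_code.most_common(1), trades_per_code.most_common(1))
```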

In [133]:
# Item 16
# Annual (TradingDate) mean price (MeanPrice) of the stocks (NegociationCode) with the most trades in each currency (Currency)

In [134]:
from pyspark.sql.window import Window

# Rank securities within each currency by total number of trades, highest first
windowSpec = Window.partitionBy('Currency').orderBy(func.desc('TotalTrades'))

most_traded_stocks = df.groupBy('NegociationCode', 'Currency') \
                       .agg(func.sum('NumborOfTrades').alias('TotalTrades')) \
                       .withColumn('rank', func.rank().over(windowSpec)) \
                       .filter(func.col('rank') == 1) \
                       .select('NegociationCode', 'Currency')

df.join(most_traded_stocks, ['NegociationCode', 'Currency']) \
  .select('NegociationCode', 'Currency', 'TradingDate', 'MeanPrice') \
  .withColumn('Year', func.year(func.to_date('TradingDate', 'yyyyMMdd'))) \
  .groupBy('Currency', 'Year', 'NegociationCode') \
  .agg(func.avg('MeanPrice').alias('AnnualMeanPrice')) \
  .orderBy('Currency', 'Year', 'NegociationCode') \
  .show()

+--------+----+---------------+------------------+
|Currency|Year|NegociationCode|   AnnualMeanPrice|
+--------+----+---------------+------------------+
|     CR$|1990|          TEL 4|24479.032786885247|
|     CR$|1991|          TEL 4| 282141.6230636833|
|     CR$|1992|          TEL 4|232933.65988909427|
|     CR$|1993|          TEL 4| 286500.4512195122|
|     CR$|1994|          TEL 4|  4429.55785123967|
|     CZ$|1986|          PMA 2|1718.6313672922251|
|     CZ$|1987|          PMA 2|1507.8791018998272|
|     CZ$|1988|          PMA 2| 4754.275229357798|
|     CZ$|1989|          PMA 2|16335.695652173914|
|    NCZ$|1989|          PMA 2|41372.546845124285|
|    NCZ$|1990|          PMA 2| 62042.07608695652|
|      R$|1998|          PETR4|20242.401015228428|
|      R$|1999|          PETR4| 26273.98775510204|
|      R$|2000|          PETR4|24186.137096774193|
|      R$|2001|          PETR4|  5297.70325203252|
|      R$|2002|          PETR4|4796.7389558232935|
|      R$|2003|          PETR4|
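`func.rank()` gives tied rows the same rank, so the `rank == 1` filter in item 16 may keep more than one security per currency if two share the top TotalTrades; `func.row_number()` over the same window keeps exactly one. A plain-Python sketch of the two behaviours on hypothetical totals:

```python
from collections import defaultdict

# Hypothetical (Currency, NegociationCode, TotalTrades) totals; not taken from the dataset.
totals = [
    ("R$", "PETR4", 900.0),
    ("R$", "VALE3", 900.0),  # ties with PETR4 for the top spot
    ("CZ$", "PMA 2", 300.0),
    ("CZ$", "TEL 4", 100.0),
]

by_currency = defaultdict(list)
for currency, code, total in totals:
    by_currency[currency].append((code, total))

rank_1 = {}        # rank() == 1: every code sharing the top total
row_number_1 = {}  # row_number() == 1: exactly one code per currency
for currency, items in by_currency.items():
    top = max(total for _, total in items)
    winners = sorted(code for code, total in items if total == top)
    rank_1[currency] = winners
    # Alphabetical first here; in Spark, add a tiebreaker to orderBy to make this deterministic.
    row_number_1[currency] = winners[0]

print(rank_1)
print(row_number_1)
```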