# Membros do Grupo
* Daniele Montenegro da Silva Barros
* Rodrigo Dantas da Silva
* Thiago Bruschi Martins


# Dados de Entrada

*   [Pacote de dados](https://drive.google.com/drive/folders/1YT8iYFBM9rCHWW0LBx1pVVckbeBANzC9?usp=sharing)
  *   Selecione "Adicionar ao Drive"
*   [Descrição dos dados](https://drive.google.com/file/d/1wu1yfR1A0A0Tl8Jl1ZNNuowr88YEFtXS/view?usp=sharing)






In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Instalação de pacotes

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install findspark pyspark 

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 55.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=42af7024b81ef65fd84471c21389b87e619f3f75e6aeb7e5a4b0ce8663227f81
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark, findspark
Successfully installed findspark-1.4.2 py4j-0.10.9.2 pyspark-3.2.0


# Preparação do ambiente

In [3]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/default-java
%env SPARK_HOME=/content/spark-3.2.0-bin-hadoop3.2

env: PYTHONHASHSEED=1234
env: JAVA_HOME=/usr/lib/jvm/default-java
env: SPARK_HOME=/content/spark-3.2.0-bin-hadoop3.2


In [4]:
import findspark
findspark.init("/content/spark-3.2.0-bin-hadoop3.2")

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window


from datetime import datetime

appName = 'Big Data'
master = 'local'

spark = SparkSession.builder     \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Leitura dos dados e criação de dataframes

In [6]:
def read_line(line) :
  tipo = line[0:2]
  if tipo == '01' :
    yield Row(data=datetime.strptime(line[2:10], '%Y%m%d'),
               codbdi=int(line[10:12]),
               codneg=line[12:24].strip(),
               tpmerc=int(line[24:27]),
               especi=line[39:49].replace('*', '').replace(' ',''),
               preult=float(line[108:121])/100)




# Implementação

In [7]:
# Modifique esta linha para incluir dados de outros anos na solução final
input_data = spark.sparkContext.textFile('/content/drive/My Drive/b3/COTAHIST*')
cotacoes = input_data.flatMap(read_line).toDF()

## Filtros

In [8]:
# Tipo de Registro é igual a 01 - Cotação Histórica --> Esse já foi feito na função read_line
# Código do Boletim Diário de Informações (BDI) é igual a 02 - Lote Padrão
# Tipo de Mercado é igual a 010 - Mercado à Vista
cotacoes_sel = cotacoes.filter((cotacoes.codbdi == 2) & (cotacoes.tpmerc == 10))

# Agora vamos selecionar o codigo das empresas que possuem 
# alguma cotacao com especificação diferente de ON e ONNM
cotacoes_dif = cotacoes_sel.filter((cotacoes_sel.especi != 'ON') & (cotacoes_sel.especi != 'ONNM')).select('codneg').distinct()

# Utilizando o join lef_anti, podemos filtrar quais cotacoes
# estao na lista anterior
cotacoes_ONNM = cotacoes_sel.join(cotacoes_dif, 'codneg', 'left_anti')
cotacoes_ONNM.show()

+------+-------------------+------+------+------+------+
|codneg|               data|codbdi|tpmerc|especi|preult|
+------+-------------------+------+------+------+------+
| ROS 3|1995-06-07 00:00:00|     2|    10|    ON|   1.1|
| ROS 3|1995-07-13 00:00:00|     2|    10|    ON|   1.1|
| ROS 3|1995-10-17 00:00:00|     2|    10|    ON|   1.0|
| SNS 3|1995-01-18 00:00:00|     2|    10|    ON| 650.0|
| SNS 3|1995-02-21 00:00:00|     2|    10|    ON| 500.0|
| SNS 3|1995-02-24 00:00:00|     2|    10|    ON| 750.0|
| SNS 3|1995-03-07 00:00:00|     2|    10|    ON| 400.0|
| SNS 3|1995-03-09 00:00:00|     2|    10|    ON| 400.0|
| SNS 3|1995-04-26 00:00:00|     2|    10|    ON| 450.0|
| SNS 3|1995-05-24 00:00:00|     2|    10|    ON| 450.0|
| SNS 3|1995-06-13 00:00:00|     2|    10|    ON| 400.0|
| SNS 3|1995-07-11 00:00:00|     2|    10|    ON| 430.0|
| ZAN 3|1995-01-13 00:00:00|     2|    10|    ON|  50.0|
| ZAN 3|1995-02-09 00:00:00|     2|    10|    ON|  45.0|
| ZAN 3|1995-02-16 00:00:00|   

## Janela

In [9]:
# Criando as janelas por empresas
from pyspark.sql import Window
w = Window.partitionBy('codneg').orderBy('data')

# Aplicando o lag para obter o valor do dia anterior
cotacoes_com_anterior = \
 cotacoes_ONNM.withColumn('preult_ant', lag('preult', 1).over(w))

## Volatilidade
Calculando a variacao e a volatilidade (desvio padrao da variacao)

In [10]:
cotacoes_com_var = cotacoes_com_anterior.withColumn('variacao', ((col('preult') / col('preult_ant')) - 1)*100)
cotacoes_com_volat = cotacoes_com_var.groupBy('codneg').agg(stddev_samp('variacao').alias('volat'))

# Resultado

In [11]:
cotacoes_final = cotacoes_com_volat.na.drop().select('codneg', round('volat',2).alias('volat'))
cotacoes_final.show()

+------+-------+
|codneg|  volat|
+------+-------+
| AERI3|   3.53|
| AESL3|12094.5|
| AGR 3|  40.18|
| AHE 3|   11.3|
| ALPK3|   1.79|
| ALSO3|   4.01|
| AMBP3|   2.12|
| AMCE3|  43.26|
| AMPI3|  33.99|
| AORE3|  10.89|
| ARA 3|  58.78|
| ARL 3|  49.04|
| ARP 3|  77.78|
| ARTE3|2861.94|
| ASS 3|  71.64|
| AVLL3|   2.76|
| BAN 3|  62.72|
| BAQU3|   7.78|
| BAR 3|    8.7|
| BBVT3|   8.36|
+------+-------+
only showing top 20 rows



In [13]:
# Dataframe completo
cotacoes_final.show(cotacoes_final.count())

+------+-------+
|codneg|  volat|
+------+-------+
| AERI3|   3.53|
| AESL3|12094.5|
| AGR 3|  40.18|
| AHE 3|   11.3|
| ALPK3|   1.79|
| ALSO3|   4.01|
| AMBP3|   2.12|
| AMCE3|  43.26|
| AMPI3|  33.99|
| AORE3|  10.89|
| ARA 3|  58.78|
| ARL 3|  49.04|
| ARP 3|  77.78|
| ARTE3|2861.94|
| ASS 3|  71.64|
| AVLL3|   2.76|
| BAN 3|  62.72|
| BAQU3|   7.78|
| BAR 3|    8.7|
| BBVT3|   8.36|
| BDE 3|  16.61|
| BDEP3|  47.15|
| BDL 3|  12.52|
| BEE 3|  22.71|
| BEG 3|  22.66|
| BER 3| 106.96|
| BES 3|   5.74|
| BHEQ3| 200.43|
| BIO 3|   8.69|
| BIV 3|  14.88|
| BME 3|  21.72|
| BMI 3|  28.78|
| BNET3|   14.8|
| BOAS3|   2.39|
| BPAR3|  367.9|
| BRD 3|  16.57|
| BSEG3|   0.71|
| BZN 3|   0.23|
| CAF 3|  42.32|
| CAFE3|  35.19|
| CAL 3|  10.98|
| CALI3|  30.51|
| CAMB3|   4.15|
| CASH3|   3.92|
| CBA 3|  31.95|
| CBM 3|    4.1|
| CBMA3|  19.44|
| CCHI3|  13.79|
| CDO 3|  16.35|
| CECI3|   5.74|
| CEE 6|   5.71|
| CEL 3|  22.61|
| CELM3|  45.51|
| CNF 3|    0.0|
| CNTO3|   4.25|
| COB 3|  14.4