<a href="https://colab.research.google.com/github/esuda/mineiracao_dados_complexos/blob/master/Big_Data_010_Trabalho_3_B3_Spark_Template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Input Data

*   [Data package](https://tinyurl.com/bd009-b3)
  *   Select "Add to Drive" ("Adicionar ao Drive" in the Portuguese interface)
*   [Data description](https://drive.google.com/file/d/1wu1yfR1A0A0Tl8Jl1ZNNuowr88YEFtXS/view?usp=sharing)






In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Package installation

In [2]:
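# Older Spark releases are removed from downloads.apache.org and kept in the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# Pin pyspark to the same version as the downloaded Spark distribution
!pip install findspark pyspark==3.2.0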



# Environment setup

In [3]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/default-java
%env SPARK_HOME=/content/spark-3.2.0-bin-hadoop3.2

env: PYTHONHASHSEED=1234
env: JAVA_HOME=/usr/lib/jvm/default-java
env: SPARK_HOME=/content/spark-3.2.0-bin-hadoop3.2


In [4]:
import findspark
findspark.init("/content/spark-3.2.0-bin-hadoop3.2")

In [5]:
from datetime import datetime

from pyspark.sql import Row, SparkSession, Window
# Import only the functions used in this notebook; `import *` would shadow
# Python built-ins such as sum, min, max, round and abs
from pyspark.sql.functions import col, expr, lag, stddev

appName = 'Big Data'
master = 'local[*]'  # use all local cores ('local' runs a single worker thread)

spark = (SparkSession.builder
         .master(master)
         .appName(appName)
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")

# Reading the data and creating DataFrames

In [6]:
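def read_line(line):
  # COTAHIST is a fixed-width file. Only type-01 records (daily quotes) are
  # parsed; the header (00) and trailer (99) lines yield nothing.
  tipo = line[0:2]
  if tipo == '01':
    yield Row(tipreg=int(line[0:2]),                                  # record type
              data=datetime.strptime(line[2:10], '%Y%m%d'),           # trading date
              codbdi=int(line[10:12]),                                # BDI code
              codneg=line[12:24].strip(),                             # ticker
              tpmerc=int(line[24:27]),                                # market type
              especi=line[39:49].replace('*', '').replace(' ', ''),   # specification, '*' and blanks removed
              preult=float(line[108:121]) / 100)                      # closing price, 2 implied decimals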
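The slicing in `read_line` can be checked without Spark. Below is a minimal plain-Python sketch of the same parser, assuming the field offsets used above; `parse_line`, `make_line` and the 245-character record length are assumptions introduced here for illustration, and the sample values mirror the first row of the output below (ABCB4, PNEJN2, 11.85).

```python
from datetime import datetime

# 0-based, end-exclusive slices used by read_line (COTAHIST 1-based
# positions 01-02, 03-10, 11-12, 13-24, 25-27, 40-49 and 109-121).
FIELDS = {
    "tipreg": slice(0, 2),
    "data": slice(2, 10),
    "codbdi": slice(10, 12),
    "codneg": slice(12, 24),
    "tpmerc": slice(24, 27),
    "especi": slice(39, 49),
    "preult": slice(108, 121),
}


def parse_line(line):
    """Plain-Python version of read_line: a dict for type-01 records, else None."""
    if line[FIELDS["tipreg"]] != "01":  # header (00) and trailer (99) are skipped
        return None
    return {
        "tipreg": int(line[FIELDS["tipreg"]]),
        "data": datetime.strptime(line[FIELDS["data"]], "%Y%m%d"),
        "codbdi": int(line[FIELDS["codbdi"]]),
        "codneg": line[FIELDS["codneg"]].strip(),
        "tpmerc": int(line[FIELDS["tpmerc"]]),
        # '*' and blanks are removed, so a specification of ' ' or '*' becomes ''
        "especi": line[FIELDS["especi"]].replace("*", "").replace(" ", ""),
        # the price is stored with two implied decimal places
        "preult": float(line[FIELDS["preult"]]) / 100,
    }


def make_line(values, length=245):
    """Hypothetical test helper: left-justify each value into its fixed-width slot."""
    buf = [" "] * length
    for name, text in values.items():
        sl = FIELDS[name]
        width = sl.stop - sl.start
        buf[sl] = text.ljust(width)[:width]
    return "".join(buf)


sample = {"tipreg": "01", "data": "20120102", "codbdi": "02", "codneg": "ABCB4",
          "tpmerc": "010", "especi": "PN EJ N2", "preult": "0000000001185"}
rec = parse_line(make_line(sample))
print(rec["codneg"], rec["especi"], rec["preult"])
```

The sketch also makes the behaviour of the later "invalid specification" check visible: a field containing only `*` or blanks is parsed as an empty string.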




In [7]:
# Modify this line to include data from other years in the final solution

input_data = spark.sparkContext.textFile('/content/drive/My Drive/b3/COTAHIST*2012*')

cotacoes = input_data.flatMap(read_line).toDF()
cotacoes.printSchema()
cotacoes.show()


root
 |-- tipreg: long (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- codbdi: long (nullable = true)
 |-- codneg: string (nullable = true)
 |-- tpmerc: long (nullable = true)
 |-- especi: string (nullable = true)
 |-- preult: double (nullable = true)

+------+-------------------+------+-------+------+------+------+
|tipreg|               data|codbdi| codneg|tpmerc|especi|preult|
+------+-------------------+------+-------+------+------+------+
|     1|2012-01-02 00:00:00|     2|  ABCB4|    10|PNEJN2| 11.85|
|     1|2012-01-02 00:00:00|    96| ABCB4F|    20|PNEJN2| 11.61|
|     1|2012-01-02 00:00:00|    12| ABCP11|    10|  CIER|   8.0|
|     1|2012-01-02 00:00:00|     2| ABRE11|    10| UNTN2| 20.85|
|     1|2012-01-02 00:00:00|    96|ABRE11F|    20| UNTN2| 20.55|
|     1|2012-01-02 00:00:00|     2|  AEDU3|    10|  ONNM| 20.46|
|     1|2012-01-02 00:00:00|    96| AEDU3F|    20|  ONNM|  20.3|
|     1|2012-01-02 00:00:00|    12| AEFI11|    10|  CIER| 100.8|
|     1|2012-01-02
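One way to read several years at once, assuming the other files follow the same `COTAHIST*<year>*` naming as 2012: build one glob per year and join them with commas, since `SparkContext.textFile` accepts a comma-separated list of paths. The helper `cotahist_paths` and the file names in the preview are hypothetical; Python's `fnmatch` is used only to preview which names each pattern would select.

```python
from fnmatch import fnmatch


def cotahist_paths(years, base="/content/drive/My Drive/b3"):
    """Hypothetical helper: one glob per year, joined with commas for textFile."""
    return ",".join(f"{base}/COTAHIST*{year}*" for year in years)


paths = cotahist_paths([2012, 2013])
print(paths)

# Preview which (assumed) file names each year's pattern would select
files = ["COTAHIST_A2012.TXT", "COTAHIST_A2013.TXT", "COTAHIST_A2014.TXT"]
for pattern in paths.split(","):
    name_pattern = pattern.rsplit("/", 1)[1]
    print(name_pattern, [f for f in files if fnmatch(f, name_pattern)])
```

With this helper, the reading line would become `input_data = spark.sparkContext.textFile(cotahist_paths([2012, 2013]))`.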

# Implementation

## Removing records with NA (null) values

In [8]:
cotacoes = cotacoes.na.drop()

## First filter
* Record type (TIPREG) = 01 (historical quote)
* BDI code (CODBDI) = 02 (standard lot)
* Market type (TPMERC) = 010 (cash market)
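The three conditions are combined with AND; in PySpark the chained `filter` calls could equally be written as a single `filter((col('tipreg') == 1) & (col('codbdi') == 2) & (col('tpmerc') == 10))`. As a Spark-free sketch, here is the same filter applied to a few rows copied from the `cotacoes` output above (`first_filter` is a hypothetical name):

```python
def first_filter(rows):
    """Keep historical quotes (tipreg 1) of standard lots (codbdi 2) in the cash market (tpmerc 10)."""
    return [r for r in rows
            if r["tipreg"] == 1 and r["codbdi"] == 2 and r["tpmerc"] == 10]


# First rows of the cotacoes output above
rows = [
    {"tipreg": 1, "codbdi": 2, "codneg": "ABCB4", "tpmerc": 10},
    {"tipreg": 1, "codbdi": 96, "codneg": "ABCB4F", "tpmerc": 20},
    {"tipreg": 1, "codbdi": 12, "codneg": "ABCP11", "tpmerc": 10},
    {"tipreg": 1, "codbdi": 2, "codneg": "ABRE11", "tpmerc": 10},
    {"tipreg": 1, "codbdi": 96, "codneg": "ABRE11F", "tpmerc": 20},
]
print([r["codneg"] for r in first_filter(rows)])
```

Only ABCB4 and ABRE11 survive, which matches the first rows of `cotacoes_aux_01` shown below.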

In [25]:
cotacoes.printSchema()

root
 |-- tipreg: long (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- codbdi: long (nullable = true)
 |-- codneg: string (nullable = true)
 |-- tpmerc: long (nullable = true)
 |-- especi: string (nullable = true)
 |-- preult: double (nullable = true)



In [9]:
cotacoes_aux_01 = cotacoes.filter(cotacoes.tipreg == 1)\
                          .filter(cotacoes.codbdi == 2)\
                          .filter(cotacoes.tpmerc == 10)

In [27]:
cotacoes_aux_01.show()

+------+-------------------+------+------+------+------+------+
|tipreg|               data|codbdi|codneg|tpmerc|especi|preult|
+------+-------------------+------+------+------+------+------+
|     1|2012-01-02 00:00:00|     2| ABCB4|    10|PNEJN2| 11.85|
|     1|2012-01-02 00:00:00|     2|ABRE11|    10| UNTN2| 20.85|
|     1|2012-01-02 00:00:00|     2| AEDU3|    10|  ONNM| 20.46|
|     1|2012-01-02 00:00:00|     2| AELP3|    10|    ON|  45.5|
|     1|2012-01-02 00:00:00|     2| AFLT3|    10|    ON|   3.5|
|     1|2012-01-02 00:00:00|     2|AGEN11|    10|   DR3|  0.15|
|     1|2012-01-02 00:00:00|     2| AGRO3|    10|  ONNM|  9.74|
|     1|2012-01-02 00:00:00|     2| ALLL3|    10|  ONNM|  9.35|
|     1|2012-01-02 00:00:00|     2| ALPA4|    10|  PNN1| 12.02|
|     1|2012-01-02 00:00:00|     2| ALSC3|    10|  ONNM| 14.15|
|     1|2012-01-02 00:00:00|     2| AMAR3|    10|  ONNM| 17.69|
|     1|2012-01-02 00:00:00|     2| AMBV3|    10|    ON|  54.4|
|     1|2012-01-02 00:00:00|     2| AMBV

## Second filter
* Keep only registered common shares (ON) or registered common shares listed in the Novo Mercado segment (ONNM); drop tickers that also appear with any other specification.
* Disregard shares whose specification is " " or "*".
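The joins that follow implement an anti-join: a ticker is dropped if it appears even once with a specification other than ON or ONNM (AFLT3, for example, also appears as ONED). PySpark can express this directly with the `left_anti` join type, e.g. `cotacoes_aux_01.join(cotacoes_nao_on, cotacoes_aux_01.codneg == cotacoes_nao_on.codneg_nao_on, "left_anti")`. The plain-Python sketch below uses sets instead; `second_filter` is a hypothetical name and the rows are taken from outputs in this notebook.

```python
VALID_ESPECI = {"ON", "ONNM"}
BLANK_ESPECI = {"", " ", "*"}


def second_filter(rows):
    """Keep rows whose ticker only ever appears as ON or ONNM."""
    # cotacoes_nao_on: tickers seen at least once with another specification
    nao_on = {r["codneg"] for r in rows if r["especi"] not in VALID_ESPECI}
    # cotacoes_invalidas: tickers with a blank specification; already contained
    # in nao_on (a blank is neither ON nor ONNM), kept only to mirror the notebook
    invalidos = {r["codneg"] for r in rows if r["especi"] in BLANK_ESPECI}
    excluded = nao_on | invalidos
    # Anti-join: keep only rows whose ticker is in neither set
    return [r for r in rows if r["codneg"] not in excluded]


rows = [
    {"codneg": "AFLT3", "especi": "ON"},
    {"codneg": "AFLT3", "especi": "ONED"},
    {"codneg": "AGRO3", "especi": "ONNM"},
    {"codneg": "AMBV4", "especi": "PN"},
]
print([r["codneg"] for r in second_filter(rows)])
```

AFLT3 is removed even though one of its rows is ON, which is the behaviour the `flag_nao_on is null` filter below relies on.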

In [10]:
cotacoes_nao_on = cotacoes_aux_01.filter((col("especi") != "ON") & (col("especi") != "ONNM"))\
                          .select(col("codneg").alias("codneg_nao_on"),
                                  expr("1").alias("flag_nao_on"))\
                          .distinct()

In [29]:
cotacoes_nao_on.show()

+-------------+-----------+
|codneg_nao_on|flag_nao_on|
+-------------+-----------+
|      SLBG11B|          1|
|        ITEC3|          1|
|        FRAS3|          1|
|        EUCA3|          1|
|        SHOW3|          1|
|        BRSR5|          1|
|        CTAX4|          1|
|        TNLP4|          1|
|      MRCK11B|          1|
|        BRML3|          1|
|        HOOT4|          1|
|        ENGI4|          1|
|        NETC3|          1|
|        DUQE4|          1|
|        TCSA3|          1|
|        ITSA4|          1|
|        ODPV3|          1|
|      AMZO11B|          1|
|        MWET4|          1|
|        CTNM4|          1|
+-------------+-----------+
only showing top 20 rows



In [11]:
cotacoes_aux_02 = cotacoes_aux_01.join(cotacoes_nao_on, 
                                       on=[cotacoes_aux_01.codneg == cotacoes_nao_on.codneg_nao_on], 
                                       how="left")

In [12]:
cotacoes_aux_02.filter("flag_nao_on = 1 and (especi = 'ON' or especi = 'ONNM')").select("codneg").distinct().show()

+------+
|codneg|
+------+
| MILS3|
| LIGT3|
| HRTP3|
| MYPK3|
| ENBR3|
| MAGG3|
| RSID3|
| RJCP3|
| OHLB3|
| LUPA3|
| RDNI3|
| TCNO3|
| SBSP3|
| ETER3|
| BRGE3|
| GRND3|
| MRVE3|
| GETI3|
| ARZZ3|
| LPSB3|
+------+
only showing top 20 rows



In [32]:
cotacoes_aux_02.filter("codneg = 'AFLT3'").select("especi").distinct().show()

+------+
|especi|
+------+
|  ONED|
|    ON|
+------+



In [13]:
cotacoes_invalidas = cotacoes_aux_02.filter(col("especi").isin("", " ", "*"))\
                          .select(col("codneg").alias("codneg_invalido"),
                                  expr("1").alias("flag_invalido"))\
                          .distinct()

In [14]:
# In 2012 no share has a specification equal to " " or "*" (read_line already turns both into "")
# but the join is kept for the other years
cotacoes_invalidas.show()

+---------------+-------------+
|codneg_invalido|flag_invalido|
+---------------+-------------+
+---------------+-------------+



In [15]:
cotacoes_aux_03 = cotacoes_aux_02.join(cotacoes_invalidas, 
                                       on=[cotacoes_aux_02.codneg == cotacoes_invalidas.codneg_invalido], 
                                       how="left")

In [36]:
cotacoes_aux_03.show()

+------+-------------------+------+------+------+------+------+-------------+-----------+---------------+-------------+
|tipreg|               data|codbdi|codneg|tpmerc|especi|preult|codneg_nao_on|flag_nao_on|codneg_invalido|flag_invalido|
+------+-------------------+------+------+------+------+------+-------------+-----------+---------------+-------------+
|     1|2012-01-02 00:00:00|     2| AMBV4|    10|    PN|  67.6|        AMBV4|          1|           null|         null|
|     1|2012-01-02 00:00:00|     2| ARZZ3|    10|ONEJNM|  23.7|        ARZZ3|          1|           null|         null|
|     1|2012-01-02 00:00:00|     2| AGRO3|    10|  ONNM|  9.74|         null|       null|           null|         null|
|     1|2012-01-02 00:00:00|     2| ALLL3|    10|  ONNM|  9.35|        ALLL3|          1|           null|         null|
|     1|2012-01-02 00:00:00|     2| AFLT3|    10|    ON|   3.5|        AFLT3|          1|           null|         null|
|     1|2012-01-02 00:00:00|     2| BBDC

In [16]:
cotacoes_aux_04 = cotacoes_aux_03.filter("flag_nao_on is null and flag_invalido is null")\
                        .drop("codneg_nao_on", "flag_nao_on", "codneg_invalido", "flag_invalido")

In [38]:
cotacoes_aux_04.select("codneg").distinct().show()

+------+
|codneg|
+------+
| IENG3|
| POSI3|
| CMGR3|
| AORE3|
| RNAR3|
| BNBR3|
| GSHP3|
| VULC3|
| SOND3|
| PTNT3|
| RHDS3|
| DTCY3|
| LUXM3|
| MLFT3|
| AGRO3|
| VSPT3|
| LFFE3|
| GLOB3|
| AMCE3|
| SULT3|
+------+
only showing top 20 rows



## Computing the volatility of each ticker

In [17]:
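janela_cotacao = Window.partitionBy('codneg').orderBy("data")

# Relative change of each closing price against the previous available quote of the
# same ticker; the first quote of each ticker has no predecessor (null) and is dropped.
# Note: this overwrites cotacoes_aux_04, so re-running the cell drops one more row per ticker.
cotacoes_aux_04 = cotacoes_aux_04.withColumn("preult_ant", lag("preult", 1).over(janela_cotacao))\
                                  .withColumn("dif_rel", (col("preult")/col("preult_ant"))-1)\
                                  .filter("dif_rel is not null")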
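`lag` returns the previous `preult` of the same ticker in date order, so `dif_rel` is the relative change between consecutive available quotes (not necessarily consecutive calendar days, as the AFLU3 dates show). A plain-Python sketch of the same computation, with `relative_returns` as a hypothetical name and the first four AFLU3 quotes from the outputs below as test data:

```python
from itertools import groupby
from operator import itemgetter


def relative_returns(rows):
    """Per ticker, in date order: preult / previous preult - 1 (first quote dropped)."""
    result = []
    # ISO date strings sort chronologically, like orderBy("data")
    ordered = sorted(rows, key=itemgetter("codneg", "data"))
    for codneg, quotes in groupby(ordered, key=itemgetter("codneg")):
        previous = None  # plays the role of lag("preult", 1) within the partition
        for q in quotes:
            if previous is not None:  # same effect as filter("dif_rel is not null")
                result.append({"codneg": codneg, "data": q["data"],
                               "dif_rel": q["preult"] / previous - 1})
            previous = q["preult"]
    return result


aflu3 = [
    {"codneg": "AFLU3", "data": "2012-01-19", "preult": 4.93},
    {"codneg": "AFLU3", "data": "2012-02-01", "preult": 5.0},
    {"codneg": "AFLU3", "data": "2012-02-13", "preult": 5.27},
    {"codneg": "AFLU3", "data": "2012-02-14", "preult": 5.0},
]
for r in relative_returns(aflu3):
    print(r["data"], r["dif_rel"])
```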


In [51]:
cotacoes_aux_04.show()

+------+-------------------+------+------+------+------+------+----------+--------------------+
|tipreg|               data|codbdi|codneg|tpmerc|especi|preult|preult_ant|             dif_rel|
+------+-------------------+------+------+------+------+------+----------+--------------------+
|     1|2012-02-01 00:00:00|     2| AFLU3|    10|    ON|   5.0|      4.93|0.014198782961460488|
|     1|2012-02-13 00:00:00|     2| AFLU3|    10|    ON|  5.27|       5.0|0.053999999999999826|
|     1|2012-02-14 00:00:00|     2| AFLU3|    10|    ON|   5.0|      5.27|-0.05123339658444015|
|     1|2012-02-17 00:00:00|     2| AFLU3|    10|    ON|   5.0|       5.0|                 0.0|
|     1|2012-03-01 00:00:00|     2| AFLU3|    10|    ON|   5.0|       5.0|                 0.0|
|     1|2012-03-13 00:00:00|     2| AFLU3|    10|    ON|   5.2|       5.0|0.040000000000000036|
|     1|2012-03-14 00:00:00|     2| AFLU3|    10|    ON|   5.2|       5.2|                 0.0|
|     1|2012-05-09 00:00:00|     2| AFLU

In [48]:
cotacoes.filter("codneg = 'AFLU3'").sort("data").show()

+------+-------------------+------+------+------+------+------+
|tipreg|               data|codbdi|codneg|tpmerc|especi|preult|
+------+-------------------+------+------+------+------+------+
|     1|2012-01-19 00:00:00|     2| AFLU3|    10|    ON|  4.93|
|     1|2012-02-01 00:00:00|     2| AFLU3|    10|    ON|   5.0|
|     1|2012-02-13 00:00:00|     2| AFLU3|    10|    ON|  5.27|
|     1|2012-02-14 00:00:00|     2| AFLU3|    10|    ON|   5.0|
|     1|2012-02-17 00:00:00|     2| AFLU3|    10|    ON|   5.0|
|     1|2012-03-01 00:00:00|     2| AFLU3|    10|    ON|   5.0|
|     1|2012-03-13 00:00:00|     2| AFLU3|    10|    ON|   5.2|
|     1|2012-03-14 00:00:00|     2| AFLU3|    10|    ON|   5.2|
|     1|2012-05-09 00:00:00|     2| AFLU3|    10|    ON|  4.83|
|     1|2012-06-11 00:00:00|     2| AFLU3|    10|    ON|  4.61|
|     1|2012-06-29 00:00:00|     2| AFLU3|    10|    ON|   6.1|
|     1|2012-08-06 00:00:00|     2| AFLU3|    10|    ON|  6.15|
|     1|2012-08-16 00:00:00|     2| AFLU

In [18]:
cotacoes_var = cotacoes_aux_04.select("codneg", "dif_rel")\
                              .groupby("codneg")\
                              .agg(stddev("dif_rel"))
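`stddev` in Spark is the sample standard deviation (`stddev_samp`, divisor n - 1). That is why tickers with a single daily return, such as CEPE3 and DOHL3, get `null`, and a ticker whose returns are all equal, such as CTPC3, gets 0.0. A plain-Python equivalent, assuming `statistics.stdev` for the n - 1 estimator and `None` in place of `null` (`volatility` is a hypothetical name):

```python
import statistics


def volatility(returns):
    """Sample standard deviation (divisor n - 1), or None with fewer than 2 returns."""
    if len(returns) < 2:
        return None  # Spark's stddev_samp returns null here
    return statistics.stdev(returns)


print(volatility([0.22650000000000015]))  # CEPE3: a single return
print(volatility([0.0, 0.0]))             # MERC3: two zero returns
```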

In [19]:
cotacoes_var.show(cotacoes_var.count())  # show every ticker, not just the first 20

+------+--------------------+
|codneg|stddev_samp(dif_rel)|
+------+--------------------+
| AFLU3| 0.08087051237373724|
| AGRO3| 0.02208735046287768|
| AHEB3|                null|
| AMCE3|   0.255495562335909|
| AMPI3| 0.02800414677881901|
| AORE3| 0.09888682662996408|
| ARTR3|0.014347739490924773|
| BHGR3| 0.01772160841885122|
| BIOM3|  0.1617974325481534|
| BNBR3| 0.05031393685985516|
| BTOW3| 0.04113677097185169|
| BTTL3| 0.05684431029345428|
| CAFE3| 0.16476368593970986|
| CASN3|  0.3559851823321133|
| CBMA3| 0.08516947123388358|
| CCIM3| 0.01953387259244671|
| CCXC3|0.042165459296822375|
| CEBR3| 0.09727952050060319|
| CEGR3| 0.11031016645119282|
| CEPE3|                null|
| CIQU3|0.045051245625291336|
| CMGR3| 0.07378962369446043|
| CSAB3|0.002216187049174592|
| CTKA3| 0.39964237257248564|
| CTNM3| 0.05858669222045167|
| CTPC3|                 0.0|
| DOHL3|                null|
| DTCY3|0.052163333954856735|
| EBTP3| 0.03692121318602152|
| ECPR3|  0.2572178996119165|
| FHER3|0.

In [22]:
cotacoes_aux_04.filter("codneg in ('MERC3', 'MWET3', 'CEPE3', 'DOHL3')").sort("codneg", "data").show()

+------+-------------------+------+------+------+------+------+----------+--------------------+
|tipreg|               data|codbdi|codneg|tpmerc|especi|preult|preult_ant|             dif_rel|
+------+-------------------+------+------+------+------+------+----------+--------------------+
|     1|2012-09-21 00:00:00|     2| CEPE3|    10|    ON| 24.53|      20.0| 0.22650000000000015|
|     1|2012-06-12 00:00:00|     2| DOHL3|    10|    ON|   3.0|      2.01|  0.4925373134328359|
|     1|2012-06-21 00:00:00|     2| MERC3|    10|    ON|   6.5|       6.5|                 0.0|
|     1|2012-08-06 00:00:00|     2| MERC3|    10|    ON|   6.5|       6.5|                 0.0|
|     1|2012-04-16 00:00:00|     2| MWET3|    10|    ON|   4.0|      3.99|0.002506265664160...|
+------+-------------------+------+------+------+------+------+----------+--------------------+



## --------- Done up to here -----------

In [9]:
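# Note: this runs on the unfiltered cotacoes, so candidate_names also includes tickers
# from other markets, such as fractional-lot (F suffix) and option series (e.g. ABEVA100)
cotacoes_on = cotacoes.filter("especi == 'ON' or especi == 'ONNM'")
candidate_names = cotacoes_on.select("codneg").distinct().rdd.flatMap(lambda x: x).collect()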


In [None]:
cotacoes.filter(cotacoes.codneg.isin(candidate_names)).show(20)

+-------------------+------+------+------+-------+------+
|               data|codbdi|codneg|tpmerc| especi|preult|
+-------------------+------+------+------+-------+------+
|2020-01-02 00:00:00|     2| AALR3|    10|   ONNM|  19.0|
|2020-01-02 00:00:00|     2| ABEV3|    10|   ONEJ|  19.2|
|2020-01-02 00:00:00|     2| ADHM3|    10|     ON|  2.48|
|2020-01-02 00:00:00|     2| AFLT3|    10|     ON| 11.23|
|2020-01-02 00:00:00|     2| AGRO3|    10|   ONNM| 19.25|
|2020-01-02 00:00:00|     2| ALSO3|    10|   ONNM| 50.01|
|2020-01-02 00:00:00|     2| AMAR3|    10|   ONNM| 13.46|
|2020-01-02 00:00:00|     2| ANIM3|    10|   ONNM|  29.6|
|2020-01-02 00:00:00|     2| APER3|    10|   ONNM|  30.7|
|2020-01-02 00:00:00|     2| ARZZ3|    10|   ONNM| 63.91|
|2020-01-02 00:00:00|     2| ATOM3|    10|     ON|  1.72|
|2020-01-02 00:00:00|     2| B3SA3|    10|ONEDJNM| 45.11|
|2020-01-02 00:00:00|     2| BAZA3|    10|     ON|  39.0|
|2020-01-02 00:00:00|     2| BBAS3|    10|   ONNM|  53.8|
|2020-01-02 00

In [None]:
print(*sorted(candidate_names))

AALR3 AALR3F AALR3T ABEV3 ABEV3F ABEV3T ABEVA100 ABEVA105 ABEVA105E ABEVA110 ABEVA110E ABEVA115 ABEVA120 ABEVA120E ABEVA125 ABEVA130 ABEVA132 ABEVA135 ABEVA135E ABEVA140 ABEVA142 ABEVA142E ABEVA145 ABEVA147 ABEVA147E ABEVA150 ABEVA152 ABEVA152E ABEVA155 ABEVA157 ABEVA157E ABEVA15E ABEVA160 ABEVA162 ABEVA162E ABEVA165 ABEVA167 ABEVA167E ABEVA17 ABEVA170 ABEVA172 ABEVA172E ABEVA175 ABEVA17E ABEVA18 ABEVA180 ABEVA182 ABEVA182E ABEVA185 ABEVA187 ABEVA187E ABEVA18E ABEVA19 ABEVA190 ABEVA192 ABEVA192E ABEVA195 ABEVA197 ABEVA19E ABEVA200 ABEVA202 ABEVA205 ABEVA210 ABEVA220 ABEVA23 ABEVA47 ABEVA47E ABEVA48 ABEVA48E ABEVA51 ABEVA52 ABEVA53 ABEVA980 ABEVA980E ABEVB125 ABEVB132 ABEVB135 ABEVB140 ABEVB145 ABEVB147 ABEVB150 ABEVB155 ABEVB160 ABEVB165 ABEVB17 ABEVB170 ABEVB175 ABEVB17E ABEVB18 ABEVB180 ABEVB185 ABEVB19 ABEVB190 ABEVB195 ABEVB198 ABEVB20 ABEVB21 ABEVB212 ABEVB22 ABEVB23 ABEVB46 ABEVB46E ABEVB47 ABEVB50 ABEVB51 ABEVB53 ABEVB86 ABEVB86E ABEVB87 ABEVB92 ABEVC127 ABEVC13 ABEVC137 ABEVC14

In [None]:
candidate_types = cotacoes.select("especi").distinct()

Py4JError: ignored