<a href="https://colab.research.google.com/github/vivirocha/Bootcamp_DataScience/blob/main/DesenvolvendoSolu%C3%A7%C3%B5esUtilizandoApacheSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Bootcamp Cientista de Dados - IGTI**

Trabalho 1 - Módulo 2

*Desenvolvendo Soluções Utilizando Apache Spark*


**Enunciado** <br> 
Dados do mercado financeiro são interessantes e ricos: cada ação negociada na bolsa de valores tem um preço que varia a cada dia. Você foi contratado como cientista de dados de uma empresa de Wall Street para criar modelos preditivos que, a partir da variação diária do preço das ações, consigam subsidiar e melhorar decisões de compra e venda de ações. Você disse que, como todo bom cientista de dados, gostaria de explorar os dados para entender suas características antes de criar qualquer modelo preditivo. <br>

Os dados estão disponíveis em https://www.kaggle.com/camnugent/sandp500/ por meio do arquivo all_stocks_5yr.csv. O arquivo contém, para cada dia e ação do S&P 500 (lista de 500 maiores empresas americanas), os seguintes dados: <br> 
● Date - no formato yy-mm-dd <br>
● Open - Preço da ação na abertura do mercado no dia, em dólares. <br>
● High - Maior preço alcançado naquele dia. <br>
● Low - Menor preço alcançado naquele dia. <br>
● Close - Preço da ação no fechamento do mercado no dia. <br>
● Volume - Número de ações vendidas / compradas.<br>
● Name - O nome da ação. <br>

Apesar do volume de dados ser pequeno, você decidiu usar o Apache Spark para processar os dados para aprender a ferramenta, e tendo em vista que a sua empresa disse que, em breve, obterá dados por minuto, e não por dia, e de todas as ações do planeta, não apenas dos Estados Unidos. Neste caso, uma ferramenta desenhada para lidar com big data será necessária, e você já quer estar com o código pronto.


In [69]:
#Instalando spark
!pip install spark



In [70]:
#Instalando pyspark
!pip install pyspark



In [71]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('SolucoesSpark') \
                    .getOrCreate()

In [72]:
#Lendo o arquivo .csv e criando um dataframe dele.
df = spark.read.csv('all_stocks_5yr.csv', sep=',', header=True)

Para visualizarmos os dados do nosso dataframe, utilizaremos o comando **.show()**.

In [73]:
df.show(truncate=False)

+----------+-----+-----+-----+-----+--------+----+
|date      |open |high |low  |close|volume  |Name|
+----------+-----+-----+-----+-----+--------+----+
|2013-02-08|15.07|15.12|14.63|14.75|8407500 |AAL |
|2013-02-11|14.89|15.01|14.26|14.46|8882000 |AAL |
|2013-02-12|14.45|14.51|14.1 |14.27|8126000 |AAL |
|2013-02-13|14.3 |14.94|14.25|14.66|10259500|AAL |
|2013-02-14|14.94|14.96|13.16|13.99|31879900|AAL |
|2013-02-15|13.93|14.61|13.93|14.5 |15628000|AAL |
|2013-02-19|14.33|14.56|14.08|14.26|11354400|AAL |
|2013-02-20|14.17|14.26|13.15|13.33|14725200|AAL |
|2013-02-21|13.62|13.95|12.9 |13.37|11922100|AAL |
|2013-02-22|13.57|13.6 |13.21|13.57|6071400 |AAL |
|2013-02-25|13.6 |13.76|13.0 |13.02|7186400 |AAL |
|2013-02-26|13.14|13.42|12.7 |13.26|9419000 |AAL |
|2013-02-27|13.28|13.62|13.18|13.41|7390500 |AAL |
|2013-02-28|13.49|13.63|13.39|13.43|6143600 |AAL |
|2013-03-01|13.37|13.95|13.32|13.61|7376800 |AAL |
|2013-03-04|13.5 |14.07|13.47|13.9 |8174800 |AAL |
|2013-03-05|14.01|14.05|13.71|1

In [76]:
df.printSchema()


root
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- Name: string (nullable = true)



Iremos renomear as colunas. <br>
Para renomearmos as colunas, usaremos o comando**.select(col('old_name').alias('new_name'))**. 

In [77]:
#Importando col a partir do pyspark.sql.functions
from pyspark.sql.functions import col

In [78]:
df = df.select(col("date").alias("data"), col("open").alias("abertura"), col("high").alias('maxima'), col('low').alias('minima'), col('close').alias('fechamento'), col('volume').alias('volume'), col('Name').alias('nome'))
df.show()

+----------+--------+------+------+----------+--------+----+
|      data|abertura|maxima|minima|fechamento|  volume|nome|
+----------+--------+------+------+----------+--------+----+
|2013-02-08|   15.07| 15.12| 14.63|     14.75| 8407500| AAL|
|2013-02-11|   14.89| 15.01| 14.26|     14.46| 8882000| AAL|
|2013-02-12|   14.45| 14.51|  14.1|     14.27| 8126000| AAL|
|2013-02-13|    14.3| 14.94| 14.25|     14.66|10259500| AAL|
|2013-02-14|   14.94| 14.96| 13.16|     13.99|31879900| AAL|
|2013-02-15|   13.93| 14.61| 13.93|      14.5|15628000| AAL|
|2013-02-19|   14.33| 14.56| 14.08|     14.26|11354400| AAL|
|2013-02-20|   14.17| 14.26| 13.15|     13.33|14725200| AAL|
|2013-02-21|   13.62| 13.95|  12.9|     13.37|11922100| AAL|
|2013-02-22|   13.57|  13.6| 13.21|     13.57| 6071400| AAL|
|2013-02-25|    13.6| 13.76|  13.0|     13.02| 7186400| AAL|
|2013-02-26|   13.14| 13.42|  12.7|     13.26| 9419000| AAL|
|2013-02-27|   13.28| 13.62| 13.18|     13.41| 7390500| AAL|
|2013-02-28|   13.49| 13

In [79]:
#Alterando o tipo das colunas
from pyspark.sql.types import StringType, DateType, FloatType
from pyspark.sql.types import BooleanType, IntegerType


In [80]:
df = df \
  .withColumn("data" ,
              df["data"]
              .cast(DateType()))   \
  .withColumn("abertura",
              df["abertura"]
              .cast(FloatType()))    \
   .withColumn("maxima",df["maxima"]
              .cast(FloatType()))    \
              .withColumn("minima", df["minima"].cast(FloatType()))    \
              .withColumn("fechamento",
              df["fechamento"]
              .cast(FloatType()))    \
              .withColumn("volume",
              df["volume"]
              .cast(IntegerType()))    \
              .withColumn("nome",
              df["nome"]
              .cast(BooleanType()))    \


In [81]:
#Visualizando os tipos de dados de cada coluna após a alteração.
df.printSchema()

root
 |-- data: date (nullable = true)
 |-- abertura: float (nullable = true)
 |-- maxima: float (nullable = true)
 |-- minima: float (nullable = true)
 |-- fechamento: float (nullable = true)
 |-- volume: integer (nullable = true)
 |-- nome: boolean (nullable = true)



Limitaremos a visualização de apenas 5 observações.

In [82]:
df.limit(5).toPandas()

Unnamed: 0,data,abertura,maxima,minima,fechamento,volume,nome
0,2013-02-08,15.07,15.12,14.63,14.75,8407500,
1,2013-02-11,14.89,15.01,14.26,14.46,8882000,
2,2013-02-12,14.45,14.51,14.1,14.27,8126000,
3,2013-02-13,14.3,14.94,14.25,14.66,10259500,
4,2013-02-14,14.94,14.96,13.16,13.99,31879900,


Iremos alterar a posição das colunas com o comando **.select()**.

In [83]:
df = df.select('nome', 'data', 'abertura', 'maxima', 'minima', 'fechamento', 'volume')
df.show()

+----+----------+--------+------+------+----------+--------+
|nome|      data|abertura|maxima|minima|fechamento|  volume|
+----+----------+--------+------+------+----------+--------+
|null|2013-02-08|   15.07| 15.12| 14.63|     14.75| 8407500|
|null|2013-02-11|   14.89| 15.01| 14.26|     14.46| 8882000|
|null|2013-02-12|   14.45| 14.51|  14.1|     14.27| 8126000|
|null|2013-02-13|    14.3| 14.94| 14.25|     14.66|10259500|
|null|2013-02-14|   14.94| 14.96| 13.16|     13.99|31879900|
|null|2013-02-15|   13.93| 14.61| 13.93|      14.5|15628000|
|null|2013-02-19|   14.33| 14.56| 14.08|     14.26|11354400|
|null|2013-02-20|   14.17| 14.26| 13.15|     13.33|14725200|
|null|2013-02-21|   13.62| 13.95|  12.9|     13.37|11922100|
|null|2013-02-22|   13.57|  13.6| 13.21|     13.57| 6071400|
|null|2013-02-25|    13.6| 13.76|  13.0|     13.02| 7186400|
|null|2013-02-26|   13.14| 13.42|  12.7|     13.26| 9419000|
|null|2013-02-27|   13.28| 13.62| 13.18|     13.41| 7390500|
|null|2013-02-28|   13.4

Quantos registros há na planilha?

Para respondermos essa questão podemos utilizar os comandos **.describe()** ou **.count()**.

In [84]:
df.describe().show()


+-------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|         abertura|           maxima|           minima|       fechamento|           volume|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|           619029|           619032|           619032|           619040|           619040|
|   mean| 83.0233343160683|83.77831069095873| 82.2560964206955|83.04376277067759|4321823.395568945|
| stddev|97.37876902994715| 98.2075188299977|96.50742113042669|97.38974800596789|8693609.511967659|
|    min|             1.62|             1.69|              1.5|             1.59|                0|
|    max|           2044.0|          2067.99|          2035.11|           2049.0|        618237630|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+



In [85]:
df.count()

619040

Quantos registros há na planilha para a ação da Apple (AAPL)?
<br> 


Para selecionarmos dados específicos de uma coluna específica utilizaremos as funções em conjunto: **select, where, count**. <br>
Então selecionaremos a coluna **nome**, onde exista nomes **igual a AAPL** *(ação da APPLE)* e **contaremos** quantas são.

In [86]:
df.select('nome').where(df.nome=="AAPL").count()

0

Quantas empresas distintas têm registros nessa planilha?

Vamos importar a biblioteca countDistinct do pyspark.sql.functions.


In [67]:
#importando biblioteca para contagem de valores distintos 
from pyspark.sql.functions import countDistinct

Criaremos uma variável para inserir os valores distintos da coluna **nome** e visualizaremos com o comando **.show()**




In [68]:
empresasdistintas = df.select(countDistinct("nome"))
empresasdistintas.show()

+--------------------+
|count(DISTINCT nome)|
+--------------------+
|                   2|
+--------------------+



Com qual frequência o preço de uma ação no fechamento é maior do que o preço na abertura?

Criaremos uma coluna (**fech_abert**) para verificarmos em quais observações o preço de fechamento é maior do que o preço da abertura.


In [19]:
df = df.withColumn("fech_abert", col('fechamento') >col('abertura'))
df.limit(5).toPandas()

Unnamed: 0,nome,data,abertura,maxima,minima,fechamento,volume,fech_abert
0,,2013-02-08,15.07,15.12,14.63,14.75,8407500,False
1,,2013-02-11,14.89,15.01,14.26,14.46,8882000,False
2,,2013-02-12,14.45,14.51,14.1,14.27,8126000,False
3,,2013-02-13,14.3,14.94,14.25,14.66,10259500,True
4,,2013-02-14,14.94,14.96,13.16,13.99,31879900,False


Vamos usar a função count para contarmos quantas vezes o fechamento foi maior que a abertura.

In [20]:
df.select('fech_abert').where(df.fech_abert=="True").count()


318970

Então, dividiremos o resultado pelo total dos eventos e arredondaremos o resultado.

In [21]:
porcentagem =  (318940/619040)*100
round(porcentagem, 2)

51.52

Qual o maior valor das ações da Apple (AAPL) na história? <br>
Vamos criar uma variável com o nome **apple** e inseriremos dentro dela apenas a valores encontrados em **AAPL**. Faremos um agrupamento para localizarmos apenas a cotação máxima da ação desta empresa.

In [22]:
#Filtrando pelo valor que desejamos encontrar.
apple = df[df['nome']=='AAPL']

In [23]:
#Agrupando por nome e procurando a maxima do valor que desejamos encontrar.
apple.groupby('nome').agg({'maxima': 'max'}).show()

+----+-----------+
|nome|max(maxima)|
+----+-----------+
+----+-----------+



Qual ação tem a maior volatilidade? Uma forma é medir o desvio-padrão do preço de fechamento de cada ação e considerar a ação de maior desvio-padrão.

In [24]:
#importando bibliotecas necessárias. 
from pyspark.sql.functions import stddev, sum, col, desc, max, min


Utilizaremos um conjunto de funções combinadas para encontrarmos a ação com maior desvio padrão.<br>
**groupBy** para agrupar pelo nome da empresa. <br>
**agg(stddev().alias()** para salvar o resultado do desvio padrão da coluna fechamento na própria coluna fechamento. <br>
**sort(desc)** para ordenar por ordem decrescente. <br>
**show()** para visualizarmos os resultados.


In [25]:
df.groupBy("nome") \
  .agg(stddev("fechamento").alias("fechamento")) \
  .sort(desc("fechamento")) \
  .show()

+-----+------------------+
| nome|        fechamento|
+-----+------------------+
| null|  97.5154988184303|
| true|  2.79386052180569|
|false|1.9572693543127926|
+-----+------------------+



Qual o dia com maior volume de negociação da bolsa?

In [26]:
df.groupBy("data") \
  .agg(sum("volume").alias("volume")) \
  .sort(desc("volume")) \
  .show()

+----------+----------+
|      data|    volume|
+----------+----------+
|2015-08-24|4607945196|
|2016-06-24|4367393052|
|2015-12-18|4124454411|
|2016-01-20|4087629753|
|2018-02-06|4072080890|
|2016-11-10|4060601612|
|2014-10-15|3993171524|
|2013-06-21|3983923288|
|2015-09-18|3962050449|
|2016-11-09|3915089371|
|2016-01-15|3787884056|
|2017-12-15|3786992731|
|2016-02-08|3759709109|
|2016-03-18|3755746256|
|2016-01-29|3740170664|
|2014-03-21|3611991561|
|2018-02-05|3598437288|
|2013-03-15|3569550899|
|2016-12-16|3564871264|
|2016-02-11|3547898496|
+----------+----------+
only showing top 20 rows



Qual a ação mais negociada da bolsa, em volume de transações? 


In [27]:
df.groupBy("nome") \
  .agg(sum("volume").alias("volume")) \
  .sort(desc("volume")) \
  .show()

+-----+-------------+
| nome|       volume|
+-----+-------------+
| null|2600142826728|
|false|  43388129992|
| true|  31850598073|
+-----+-------------+



Quantas ações começam com a letra “A”?


In [28]:
from pyspark.sql.functions import col, lit
from functools import reduce

In [32]:
df.filter(df.nome.startswith("A")).count()

0

In [30]:
df.filter(df.nome.startswith("A")).show()

+----+----+--------+------+------+----------+------+----------+
|nome|data|abertura|maxima|minima|fechamento|volume|fech_abert|
+----+----+--------+------+------+----------+------+----------+
+----+----+--------+------+------+----------+------+----------+



In [31]:
a3 = df.filter("nome").startsWith('A') \


AttributeError: ignored

In [None]:
df.select('volume').describe().show()

In [None]:
df.groupBy("nome", "data") \
  .agg(max("volume").alias("volume")) \
  .sort(desc("volume")) \
  .show()

In [None]:
apple.groupby('nome').agg({'maxima': 'max'}).show()

Com qual frequência o preço mais alto do dia da ação também é o preço de fechamento?


Em qual dia a ação da Apple mais subiu até o fechamento, de forma absoluta?


Em média, qual o volume diário de transações das ações da AAPL?


Quantas ações tem 1, 2, 3, 4 e 5 caracteres em seu nome, respectivamente?


Qual a ação menos negociada da bolsa, em volume de transações?


Com qual frequência o preço de fechamento é também o mais alto do dia?


In [None]:
import pyspark.sql.functions