In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.1.2-bin-hadoop2.7'

In [3]:
import pyspark
from pyspark import SparkContext
sc = SparkContext()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, DateType, StringType, IntegerType, DoubleType

In [5]:
schema = StructType() \
    .add('date', DateType(), True) \
    .add('open', DoubleType(), True) \
    .add('high', DoubleType(), True) \
    .add('low', DoubleType(), True) \
    .add('close', DoubleType(), True) \
    .add('volume', IntegerType(), True) \
    .add('Name', StringType(), True)

In [6]:
spark = SparkSession\
        .builder\
        .appName("All_Stocks")\
        .getOrCreate()



all_df = spark.read.format('csv').option('header', True).schema(schema).load('all_stocks_5yr.csv')
all_df.printSchema()
all_df.show()

root
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- Name: string (nullable = true)

+----------+-----+-----+-----+-----+--------+----+
|      date| open| high|  low|close|  volume|Name|
+----------+-----+-----+-----+-----+--------+----+
|2013-02-08|15.07|15.12|14.63|14.75| 8407500| AAL|
|2013-02-11|14.89|15.01|14.26|14.46| 8882000| AAL|
|2013-02-12|14.45|14.51| 14.1|14.27| 8126000| AAL|
|2013-02-13| 14.3|14.94|14.25|14.66|10259500| AAL|
|2013-02-14|14.94|14.96|13.16|13.99|31879900| AAL|
|2013-02-15|13.93|14.61|13.93| 14.5|15628000| AAL|
|2013-02-19|14.33|14.56|14.08|14.26|11354400| AAL|
|2013-02-20|14.17|14.26|13.15|13.33|14725200| AAL|
|2013-02-21|13.62|13.95| 12.9|13.37|11922100| AAL|
|2013-02-22|13.57| 13.6|13.21|13.57| 6071400| AAL|
|2013-02-25| 13.6|13.76| 13.0|13.02| 7186400| AAL|
|2013-02-26|13.14|13.42| 12

In [59]:
all_df.show(10)

+----------+-----+-----+-----+-----+--------+----+
|      date| open| high|  low|close|  volume|Name|
+----------+-----+-----+-----+-----+--------+----+
|2013-02-08|15.07|15.12|14.63|14.75| 8407500| AAL|
|2013-02-11|14.89|15.01|14.26|14.46| 8882000| AAL|
|2013-02-12|14.45|14.51| 14.1|14.27| 8126000| AAL|
|2013-02-13| 14.3|14.94|14.25|14.66|10259500| AAL|
|2013-02-14|14.94|14.96|13.16|13.99|31879900| AAL|
|2013-02-15|13.93|14.61|13.93| 14.5|15628000| AAL|
|2013-02-19|14.33|14.56|14.08|14.26|11354400| AAL|
|2013-02-20|14.17|14.26|13.15|13.33|14725200| AAL|
|2013-02-21|13.62|13.95| 12.9|13.37|11922100| AAL|
|2013-02-22|13.57| 13.6|13.21|13.57| 6071400| AAL|
+----------+-----+-----+-----+-----+--------+----+
only showing top 10 rows



## 1. Quantos registros há na planilha?

In [10]:
# Usando a função count para contar os registros
all_df.count()

619041

## 2.Quantos registros há na planilha para a ação da Apple (AAPL)?

In [68]:
# Função filter para filtrar as ações
all_df.filter(all_df.Name == 'AAPL').count()

1259

## 3.Quantas empresas distintas têm registros nessa planilha?

In [19]:
# Usando a função select e distinct
all_df.select('Name').distinct().count()

505

## 4.Com qual frequência o preço de uma ação no fechamento é maior do que o preço na abertura?

In [75]:
# Importando algumas funções
from pyspark.sql.functions import mean, min, max

# Usando o filter e count
all_df.filter(all_df.close > all_df.open).count() / all_df.count()


0.5152655724993538

## 5.Qual o maior valor das ações da Apple (AAPL) na história?

In [7]:
all_df.filter(all_df.Name == 'AAPL').agg({'high': 'max'}).first()['max(high)']

180.1

## 6.Qual ação tem a maior volatilidade? Uma forma é medir o desvio-padrão do preço de fechamento de cada ação e considerar a ação de maior desvio-padrão.

In [9]:
# Agrupamento pelo nome e fechamento com o desvião padrão
all_df_std = all_df.groupBy('Name').agg({'close': 'stddev'})
all_df_std.sort(all_df_std['stddev(close)'].desc()).first()

Row(Name='PCLN', stddev(close)=320.533473018748)

## 7. Qual o dia com maior volume de negociação da bolsa?

In [15]:
# Primeiro objeto com a some dos volumes e o segundo buscando a soma de ordem decrescente
all_df_max_volume = all_df.groupBy('date').sum('volume')

all_df_max_volume.sort(all_df_max_volume['sum(volume)'].desc()).first()

Row(date=datetime.date(2015, 8, 24), sum(volume)=4607945196)

## 8. Qual a ação mais negociada da bolsa, em volume de transações?

In [91]:
# Primeiro objeto com a soma dos volumes e segundo buscando a soma com o nome da ação
all_df_max_volume = all_df.groupBy('Name').sum('volume')

all_df_max_volume.sort(all_df_max_volume['sum(volume)'].desc()).first()

Row(Name='BAC', sum(volume)=117884953591)

## 9.Quantas ações começam com a letra “A”?

In [18]:
# Função select para selecionar a coluna de ações, distinct para retornar os nomes distintos, 
# filter para filtrar o nome das colunas, startswith para a letra e count para contar.

all_df.select('Name').distinct().filter(all_df.Name.startswith('A')).count()

59

## 10.Com qual frequência o preço mais alto do dia da ação também é o preço de fechamento?

In [30]:
# Filter para filtrar as colunas com as condições de igualdade. Dividindo pelo número de registros e multiplicando por 100 
# obter a porcentagem.
all_df.filter(all_df.high == all_df.close).count() / all_df.count() * 100

1.1986301369863013

## 11. Em qual dia a ação da Apple mais subiu até o fechamento, de forma absoluta?

In [36]:
all_df_aapl = all_df.filter(all_df.Name == 'AAPL').withColumn('grow', all_df_aapl.close - all_df_aapl.open)

all_df_aapl.sort(all_df_aapl.grow.desc()).first()


Row(date=datetime.date(2015, 8, 24), open=94.87, high=108.8, low=92.0, close=103.12, volume=162206292, Name='AAPL', grow=8.25)

## 12. Em média, qual o volume diário de transações das ações da AAPL?

In [46]:
# Filter para filtrar a ação, agregando pela média e tirando a média.
all_df.filter(all_df.Name == 'AAPL').agg({'volume': 'mean'}).first()['avg(volume)']

54047899.73550437

## 13. Quantas ações tem 1, 2, 3, 4 e 5 caracteres em seu nome, respectivamente?

In [47]:
from pyspark.sql.functions import col, length, array_contains

In [53]:
dataset_name_length = all_df.select('Name').distinct().withColumn('name_length', length(col('Name'))).groupBy('name_length').agg({'Name': 'count'})
dataset_five_lengths = dataset_name_length.sort(dataset_name_length.name_length.asc()).select('count(Name)').rdd.map(lambda row : row['count(Name)']).count()

print(f'There are {", ".join([str(value) for value in dataset_five_lengths])} stocks which name has 1, 2, 3, 4, 5 character length, respectivelly')

## 14. Qual a ação menos negociada da bolsa, em volume de transações?

In [56]:
all_df_min_volume = all_df.groupBy('Name').sum('volume')

all_df_min_volume.sort(all_df_min_volume['sum(volume)'].asc()).first()

Row(Name='APTV', sum(volume)=92947779)

## 15. Com qual frequência o preço de fechamento é também o mais alto do dia?

In [57]:
all_df.filter(all_df.close == all_df.high).count() / all_df.count() * 100

1.1986301369863013