# Stock Exchange Analysis using Spark

## Environment

In [1]:
! pip install spark > /dev/null

In [2]:
! pip install pyspark > /dev/null

### Spark connection

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

### Dataset preparation

In [5]:
from pyspark.sql.types import StructType, DateType, StringType, IntegerType, DoubleType

In [6]:
schema = StructType() \
    .add('date', DateType(), True) \
    .add('open', DoubleType(), True) \
    .add('high', DoubleType(), True) \
    .add('low', DoubleType(), True) \
    .add('close', DoubleType(), True) \
    .add('volume', IntegerType(), True) \
    .add('Name', StringType(), True)

In [9]:
df_stocks = spark.read.format('csv').option('header', True).schema(schema).load('all_stocks_5yr.csv')
df_stocks.printSchema()
df_stocks.show()

root
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- Name: string (nullable = true)

+----------+-----+-----+-----+-----+--------+----+
|      date| open| high|  low|close|  volume|Name|
+----------+-----+-----+-----+-----+--------+----+
|2013-02-08|15.07|15.12|14.63|14.75| 8407500| AAL|
|2013-02-11|14.89|15.01|14.26|14.46| 8882000| AAL|
|2013-02-12|14.45|14.51| 14.1|14.27| 8126000| AAL|
|2013-02-13| 14.3|14.94|14.25|14.66|10259500| AAL|
|2013-02-14|14.94|14.96|13.16|13.99|31879900| AAL|
|2013-02-15|13.93|14.61|13.93| 14.5|15628000| AAL|
|2013-02-19|14.33|14.56|14.08|14.26|11354400| AAL|
|2013-02-20|14.17|14.26|13.15|13.33|14725200| AAL|
|2013-02-21|13.62|13.95| 12.9|13.37|11922100| AAL|
|2013-02-22|13.57| 13.6|13.21|13.57| 6071400| AAL|
|2013-02-25| 13.6|13.76| 13.0|13.02| 7186400| AAL|
|2013-02-26|13.14|13.42| 12

## Quiz

### 1. How many rows there are in dataset?

In [10]:
dataset_rows = df_stocks.count()
print(f'There are {dataset_rows} rows in dataset')

There are 619040 rows in dataset


### 2. How many rows there are in dataset with AAPL stocks?

In [11]:
dataset_aapl_rows = df_stocks.filter(df_stocks.Name == 'AAPL').count()
print(f'There are {dataset_aapl_rows} rows in dataset with AAPL stocks')

There are 1259 rows in dataset with AAPL stocks


### 3. How many different companies there are in dataset?

In [12]:
dataset_companies_count = df_stocks.select('Name').distinct().count()
print(f'There are {dataset_companies_count} different companies in dataset')

There are 505 different companies in dataset


### 4. How often closing stock price is greater than opening?

In [13]:
dataset_close_greather_open = df_stocks.filter(df_stocks.close > df_stocks.open).count() / df_stocks.count()
print(f'Closing stock price is greather than opening in {round(dataset_close_greather_open * 100, 2)}% of times')

Closing stock price is greather than opening in 51.53% of times


### 5. What is the biggest AAPL's stock in history?

In [14]:
dataset_biggest_appl_stock = df_stocks.filter(df_stocks.Name == 'AAPL').agg({'high': 'max'}).first()['max(high)']
print(f'APPL\'s biggest stock in history is {dataset_biggest_appl_stock}')

APPL's biggest stock in history is 180.1


### 6. Considering closing price standard deviation, which stock has the bigger volatility?

In [15]:
dataset_stock_stddev = df_stocks.groupBy('Name').agg({'close': 'stddev'})
dataset_biggest_volatility = dataset_stock_stddev.sort(dataset_stock_stddev['stddev(close)'].desc()).first()
print(f'Stock with biggest volatility is {dataset_biggest_volatility["Name"]} with closing price standard deviation of {round(dataset_biggest_volatility["stddev(close)"], 2)}')

Stock with biggest volatility is PCLN with closing price standard deviation of 320.53


### 7. Which day has the biggest stock exchange's negotiation volume?

In [16]:
dataset_volume_by_day = df_stocks.groupBy('date').sum('volume')
dataset_biggest_volume = dataset_volume_by_day.sort(dataset_volume_by_day['sum(volume)'].desc()).first()
print(f'The day with biggest stock exchange\'s negotiation volume is {dataset_biggest_volume["date"]} with volume of {dataset_biggest_volume["sum(volume)"]}')

The day with biggest stock exchange's negotiation volume is 2015-08-24 with volume of 4607945196


### 8. Considering transaction volume, which is the biggest exchanged stock?

In [17]:
dataset_volume_by_stock = df_stocks.groupBy('Name').sum('volume')
dataset_biggest_volume = dataset_volume_by_stock.sort(dataset_volume_by_stock['sum(volume)'].desc()).first()
print(f'Considering transaction volume, the biggest stock exchanged is {dataset_biggest_volume["Name"]} with volume of {dataset_biggest_volume["sum(volume)"]}')

Considering transaction volume, the biggest stock exchanged is BAC with volume of 117884953591


### 9. How many stocks starts with letter "A"?

In [18]:
dataset_startswith_a = df_stocks.select('Name').distinct().filter(df_stocks.Name.startswith('A')).count()
print(f'There are {dataset_startswith_a} stocks starting with letter "A"')

There are 59 stocks starting with letter "A"


### 10. How often closing stock price is the biggest day price?

In [19]:
dataset_close_is_biggest = df_stocks.filter(df_stocks.close == df_stocks.high).count() / df_stocks.count()
print(f'Closing stock price is the biggest day\'s price {round(dataset_close_is_biggest * 100, 2)}% of times')

Closing stock price is the biggest day's price 1.2% of times


### 11. Which day Apple stocks had the most absolute growing until closing?

In [20]:
dataset_aapl_grow = df_stocks.filter(df_stocks.Name == 'AAPL').withColumn('grow', df_stocks.close - df_stocks.open)
dataset_highest_grow = dataset_aapl_grow.sort(dataset_aapl_grow.grow.desc()).first()
print(f'AAPL most absolute growing until close was {dataset_highest_grow["high"]} at {dataset_highest_grow["date"]}')

AAPL most absolute growing until close was 108.8 at 2015-08-24


### 12. Considering mean value, which is the daily's AAPL stocks transaction volume?

In [21]:
dataset_aapl_mean_volume = df_stocks.filter(df_stocks.Name == 'AAPL').agg({'volume': 'mean'}).first()['avg(volume)']
print(f'AAPL daily\'s stock transaction mean volume is {dataset_aapl_mean_volume}')

AAPL daily's stock transaction mean volume is 54047899.73550437


### 13. How many stocks' names has 1, 2, 3, 4 and 5 character length, respectivelly?

In [22]:
from pyspark.sql.functions import col, length, array_contains

In [23]:
dataset_name_length = df_stocks.select('Name').distinct().withColumn('name_length', length(col('Name'))).groupBy('name_length').agg({'Name': 'count'})
dataset_five_lengths = dataset_name_length.sort(dataset_name_length.name_length.asc()).select('count(Name)').rdd.map(lambda row : row['count(Name)']).collect()
print(f'There are {", ".join([str(value) for value in dataset_five_lengths])} stocks which name has 1, 2, 3, 4, 5 character length, respectivelly')

There are 10, 50, 323, 117, 5 stocks which name has 1, 2, 3, 4, 5 character length, respectivelly


### 14. Considering transaction volume, which is the less exchanged stock?

In [24]:
dataset_volume_by_stock = df_stocks.groupBy('Name').sum('volume')
dataset_lowest_volume = dataset_volume_by_stock.sort(dataset_volume_by_stock['sum(volume)'].asc()).first()
print(f'Considering transaction volume, the lowest stock exchanged is {dataset_lowest_volume["Name"]} with volume of {dataset_lowest_volume["sum(volume)"]}')

Considering transaction volume, the lowest stock exchanged is APTV with volume of 92947779


### 15. How often closing price is the biggest day price?

In [25]:
dataset_close_is_biggest = df_stocks.filter(df_stocks.close == df_stocks.high).count() / df_stocks.count()
print(f'Closing stock price is the biggest day\'s price {round(dataset_close_is_biggest * 100, 2)}% of times')

Closing stock price is the biggest day's price 1.2% of times
