# NASDAQ Stocks data analysis

In [38]:
from pyspark import pandas as ps
import pandas as pd
from pyspark.sql import SparkSession
import plotly.express as px

In [3]:
# Setup spark session
spark = SparkSession.builder.appName("pyspark-exp").getOrCreate()

23/03/06 11:17:00 WARN Utils: Your hostname, seba-G7-7700 resolves to a loopback address: 127.0.1.1; using 192.168.31.22 instead (on interface wlp0s20f3)
23/03/06 11:17:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/06 11:17:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
spark.version

'3.3.2'

In [6]:
# NASDAQ Analysis

## Read dataset using spark
nasdaq_df = spark.read.csv("data/nasdaq_data.csv", inferSchema=True, header=True)

                                                                                

In [7]:
# Show dataframe 
nasdaq_df.show(n=10)

+------+-------------------+--------+
|symbol|               date|adjusted|
+------+-------------------+--------+
|  AACG|2011-01-03 00:00:00|0.297095|
|  AACG|2011-01-04 00:00:00|0.300307|
|  AACG|2011-01-05 00:00:00|0.297095|
|  AACG|2011-01-06 00:00:00|0.308336|
|  AACG|2011-01-07 00:00:00|0.309139|
|  AACG|2011-01-10 00:00:00|0.307533|
|  AACG|2011-01-11 00:00:00|0.305125|
|  AACG|2011-01-12 00:00:00|0.309942|
|  AACG|2011-01-13 00:00:00| 0.30111|
|  AACG|2011-01-14 00:00:00|0.319578|
+------+-------------------+--------+
only showing top 10 rows



In [8]:
# Check schema
nasdaq_df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- adjusted: string (nullable = true)



In [9]:
# Dataframe columns
nasdaq_df.columns

['symbol', 'date', 'adjusted']

In [10]:
# Select columns
nasdaq_df.select("symbol").show(n=5)

+------+
|symbol|
+------+
|  AACG|
|  AACG|
|  AACG|
|  AACG|
|  AACG|
+------+
only showing top 5 rows



In [11]:
nasdaq_df.select(["symbol", "adjusted"]).show(n=5)

+------+--------+
|symbol|adjusted|
+------+--------+
|  AACG|0.297095|
|  AACG|0.300307|
|  AACG|0.297095|
|  AACG|0.308336|
|  AACG|0.309139|
+------+--------+
only showing top 5 rows



In [12]:
nasdaq_df.dtypes

[('symbol', 'string'), ('date', 'timestamp'), ('adjusted', 'string')]

In [13]:
nasdaq_df = nasdaq_df.withColumn("adjusted", nasdaq_df["adjusted"].cast("float"))
nasdaq_df.show(n=5)

+------+-------------------+--------+
|symbol|               date|adjusted|
+------+-------------------+--------+
|  AACG|2011-01-03 00:00:00|0.297095|
|  AACG|2011-01-04 00:00:00|0.300307|
|  AACG|2011-01-05 00:00:00|0.297095|
|  AACG|2011-01-06 00:00:00|0.308336|
|  AACG|2011-01-07 00:00:00|0.309139|
+------+-------------------+--------+
only showing top 5 rows



In [14]:
nasdaq_df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- adjusted: float (nullable = true)



In [15]:
# Rename column
nasdaq_df.withColumnRenamed("date", "timestamp").show(n=5)

+------+-------------------+--------+
|symbol|          timestamp|adjusted|
+------+-------------------+--------+
|  AACG|2011-01-03 00:00:00|0.297095|
|  AACG|2011-01-04 00:00:00|0.300307|
|  AACG|2011-01-05 00:00:00|0.297095|
|  AACG|2011-01-06 00:00:00|0.308336|
|  AACG|2011-01-07 00:00:00|0.309139|
+------+-------------------+--------+
only showing top 5 rows



In [16]:
# Drop column
nasdaq_df.drop("symbol").show(n=5)

+-------------------+--------+
|               date|adjusted|
+-------------------+--------+
|2011-01-03 00:00:00|0.297095|
|2011-01-04 00:00:00|0.300307|
|2011-01-05 00:00:00|0.297095|
|2011-01-06 00:00:00|0.308336|
|2011-01-07 00:00:00|0.309139|
+-------------------+--------+
only showing top 5 rows



In [18]:
# Filter
nasdaq_df.filter("adjusted > 0.3").show(n=5)

+------+-------------------+--------+
|symbol|               date|adjusted|
+------+-------------------+--------+
|  AACG|2011-01-04 00:00:00|0.300307|
|  AACG|2011-01-06 00:00:00|0.308336|
|  AACG|2011-01-07 00:00:00|0.309139|
|  AACG|2011-01-10 00:00:00|0.307533|
|  AACG|2011-01-11 00:00:00|0.305125|
+------+-------------------+--------+
only showing top 5 rows



In [23]:
# Groupby
nasdaq_df.groupBy("symbol").sum().show(n=5)



+------+------------------+
|symbol|     sum(adjusted)|
+------+------------------+
|  ABMD|371409.49010181427|
|   APM|  6241.03800201416|
|  ARRW| 908.8679885864258|
|  ARAY|14744.059997081757|
|  AMTX| 5515.863501340151|
+------+------------------+
only showing top 5 rows



                                                                                

In [22]:
nasdaq_df.groupBy("symbol").mean().show(n=5)

[Stage 15:====>                                                   (1 + 11) / 12]

+------+------------------+
|symbol|     avg(adjusted)|
+------+------------------+
|  ABMD|137.15269206123128|
|   APM| 9.111004382502424|
|  ARRW| 9.668808389217295|
|  ARAY| 5.485141367961963|
|  AMTX|2.0368772161521975|
+------+------------------+
only showing top 5 rows



                                                                                

## From pyspark to pandas

In [4]:
# new spark session

spark = SparkSession.builder \
    .master("local[12]") \
    .appName("pandas_pyspark") \
    .config("spark.driver.bindAddress","localhost") \
    .config("spark.ui.port", "4050") \
    .config("spark.driver.memory", "16g") \
    .config("spark.memory.fraction", "0.9") \
    .getOrCreate()

nasdaq_df2 = ps.read_csv("data/nasdaq_data.csv")
nasdaq_df2.head()

  series = series.astype(t, copy=False)


Unnamed: 0,symbol,date,adjusted
0,AACG,2011-01-03,0.297095
1,AACG,2011-01-04,0.300307
2,AACG,2011-01-05,0.297095
3,AACG,2011-01-06,0.308336
4,AACG,2011-01-07,0.309139


In [5]:
nasdaq_df2.shape

(5795746, 3)

In [7]:
nasdaq_df2['symbol'].value_counts().shape

                                                                                

(3980,)

In [9]:
nasdaq_df2.head().info()

  fields = [
  for column, series in pdf.iteritems():
                                                                                

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   symbol    5 non-null      object        
 1   date      5 non-null      datetime64[ns]
 2   adjusted  5 non-null      object        
dtypes: datetime64[ns](1), object(2)

In [11]:
# Change data types
nasdaq_df2['symbol'] = nasdaq_df2['symbol'].astype("str")
nasdaq_df2['date'] = nasdaq_df2['date'].astype("datetime64")
nasdaq_df2['adjusted'] = nasdaq_df2['adjusted'].astype("float64")

In [13]:
nasdaq_df2.head().info()

  fields = [
  for column, series in pdf.iteritems():
                                                                                

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype     
---  ------    --------------  -----     
 0   symbol    5 non-null      <U0       
 1   date      5 non-null      datetime64
 2   adjusted  5 non-null      float64   
dtypes: datetime64(1), float64(1), str(1)

In [14]:
# What is spark doing?
nasdaq_df2.spark.explain()

== Physical Plan ==
*(1) Project [__index_level_0__#117L, CASE WHEN isnull(symbol#101) THEN None ELSE symbol#101 END AS symbol#319, date#102, cast(adjusted#103 as double) AS adjusted#325]
+- AttachDistributedSequence[__index_level_0__#117L, symbol#101, date#102, adjusted#103] Index: __index_level_0__#117L
   +- FileScan csv [symbol#101,date#102,adjusted#103] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/seba/Documentos/pyspark-nasdaq-exp/data/nasdaq_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<symbol:string,date:timestamp,adjusted:string>




In [15]:
# CHECKPOINTING ----
# - Can help to speed up spark by Caching operations
nasdaq_df2 = nasdaq_df2.spark.local_checkpoint()

                                                                                

In [20]:
# LAG OPERATION ----

nasdaq_shifted_df = nasdaq_df2 \
    .drop('date', axis = 1) \
    .groupby("symbol") \
    .shift(1) \
    .rename(columns={'adjusted':'lag1'}) \
    .sort_index()

In [22]:
# Create another local checkpoint to speed up 
nasdaq_shifted_df = nasdaq_shifted_df.spark.local_checkpoint()

                                                                                

In [23]:
# COMBINE LAG WITH ADJUSTED PRICES ----
# - IF YOU GET ERROR: NEED TO SET OPTION TO COMPUTE ON MULTIPLE DATA FRAMES

ps.set_option('compute.ops_on_diff_frames', True)

nasdaq_lag_df = nasdaq_df2.copy()
nasdaq_lag_df['lag1'] = nasdaq_shifted_df[['lag1']]

nasdaq_lag_df.head()

  series = series.astype(t, copy=False)


Unnamed: 0,symbol,date,adjusted,lag1
0,AACG,2011-01-03,0.297095,
1990193,FORD,2021-09-29,2.35,2.52
2985461,LFVN,2013-04-30,15.96,15.89
2985462,LFVN,2013-05-01,15.96,15.96
3981791,PCTI,2015-08-19,4.915337,4.947516


In [24]:
nasdaq_lag_df = nasdaq_lag_df.spark.local_checkpoint()

                                                                                

In [25]:
# SUMMARIZE RETURNS

nasdaq_agg_df = nasdaq_lag_df \
    .assign(returns = lambda x: (x['adjusted'] / x['lag1']) - 1) \
    .groupby('symbol') \
    .aggregate(
        {
            'returns': ['mean', 'std', 'count'],
            'date': ['max', 'min']
        }
    ) \
    .reset_index()

In [26]:
nasdaq_agg_df.head()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0_level_0,symbol,returns,returns,returns,date,date
Unnamed: 0_level_1,Unnamed: 1_level_1,mean,std,count,max,min
0,BAND,0.002173,0.029852,959,2021-10-05,2017-11-09
1,BMBL,-8.5e-05,0.04195,143,2021-10-05,2021-02-11
2,BPTS,-0.002923,0.052964,142,2021-10-05,2021-02-10
3,BROG,0.000341,0.027809,791,2021-10-05,2018-07-13
4,CBIO,-0.000535,0.064393,2685,2021-10-05,2011-01-03


In [27]:
nasdaq_agg_df.columns = ["_".join(a) for a in nasdaq_agg_df.columns.to_flat_index()]

In [28]:
nasdaq_agg_df.head()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,symbol_,returns_mean,returns_std,returns_count,date_max,date_min
0,BAND,0.002173,0.029852,959,2021-10-05,2017-11-09
1,BMBL,-8.5e-05,0.04195,143,2021-10-05,2021-02-11
2,BPTS,-0.002923,0.052964,142,2021-10-05,2021-02-10
3,BROG,0.000341,0.027809,791,2021-10-05,2018-07-13
4,CBIO,-0.000535,0.064393,2685,2021-10-05,2011-01-03


In [29]:
nasdaq_agg_df = nasdaq_agg_df.spark.local_checkpoint()

In [30]:
# JOIN NASDAQ INDEX META DATA ----

nasdaq_index_df = ps.read_csv('data/nasdaq_index.csv')

nasdaq_index_df.head()



Unnamed: 0,symbol,company,last.sale.price,market.cap,country,ipo.year,industry
0,AACG,ATA Creativity Global American Depositary Shares,2.2578,71574942,China,,Service to the Health Industry
1,AACIU,Armada Acquisition Corp. I Unit,9.965,0,United States,2021.0,Business Services
2,AADI,Aadi Bioscience Inc. Common Stock,27.59,575785118,United States,,Biotechnology: Pharmaceutical Preparations
3,AAL,American Airlines Group Inc. Common Stock,20.5676,13316653685,United States,,Air Freight/Delivery Services
4,AAME,Atlantic American Corporation Common Stock,4.44,90623788,United States,,Life Insurance


In [31]:
nasdaq_index_df = nasdaq_index_df[['symbol', 'company', 'market.cap']]

nasdaq_index_df.head().info()

  fields = [
  for column, series in pdf.iteritems():


<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   symbol      5 non-null      object
 1   company     5 non-null      object
 2   market.cap  5 non-null      object
dtypes: object(3)

In [32]:
nasdaq_index_df['market.cap'] = nasdaq_index_df['market.cap'].astype('float64')

nasdaq_index_df['symbol'] = nasdaq_index_df['symbol'].astype('str')

nasdaq_index_df['company'] = nasdaq_index_df['company'].astype('str')

nasdaq_index_df.head().info()

  fields = [
  for column, series in pdf.iteritems():


<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   symbol      5 non-null      <U0    
 1   company     5 non-null      <U0    
 2   market.cap  5 non-null      float64
dtypes: float64(1), str(2)

In [33]:
nasdaq_agg_join_df = nasdaq_agg_df \
    .rename({'symbol_':'symbol'}, axis = 1) \
    .set_index('symbol') \
    .join(nasdaq_index_df.set_index('symbol'), how = "left", lsuffix = "_l", rsuffix = "_r") \
    .reset_index()

nasdaq_agg_join_df.head()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,symbol,returns_mean,returns_std,returns_count,date_max,date_min,company,market.cap
0,BAND,0.002173,0.029852,959,2021-10-05,2017-11-09,Bandwidth Inc. Class A Common Stock,2055042000.0
1,BMBL,-8.5e-05,0.04195,143,2021-10-05,2021-02-11,Bumble Inc. Class A Common Stock,6354404000.0
2,BPTS,-0.002923,0.052964,142,2021-10-05,2021-02-10,Biophytis SA American Depositary Share,105207800.0
3,BROG,0.000341,0.027809,791,2021-10-05,2018-07-13,Brooge Energy Limited Ordinary Shares,913546300.0
4,CBIO,-0.000535,0.064393,2685,2021-10-05,2011-01-03,Catalyst Biosciences Inc. Common Stock,114509400.0


In [34]:
type(nasdaq_agg_join_df)

pyspark.pandas.frame.DataFrame

In [35]:
nasdaq_screened_df = nasdaq_agg_join_df \
    .rename({"market.cap":"market_cap"}, axis=1) \
    .query("returns_std < 0.10") \
    .query("returns_count > 3*365") \
    .query("market_cap > 1e9") \
    .assign(reward_metric = lambda x: x['returns_mean'] / x['returns_std'] * 2500) \
    .to_pandas() 

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [36]:
nasdaq_screened_df

Unnamed: 0,symbol,returns_mean,returns_std,returns_count,date_max,date_min,company,market_cap,reward_metric
18,ABMD,0.001734,0.029123,2707,2021-10-05,2011-01-03,ABIOMED Inc. Common Stock,1.518377e+10,148.830258
29,IOSP,0.000851,0.021225,2685,2021-10-05,2011-01-03,Innospec Inc. Common Stock,2.074109e+09,100.193368
34,OPRX,0.002726,0.055620,2707,2021-10-05,2011-01-03,OptimizeRx Corporation Common Stock,1.437326e+09,122.536623
37,RGLD,0.000617,0.024319,2685,2021-10-05,2011-01-03,Royal Gold Inc. Common Stock,6.167069e+09,63.442816
38,SABR,0.000432,0.033235,1860,2021-10-05,2014-04-17,Sabre Corporation Common Stock,3.755731e+09,32.475457
...,...,...,...,...,...,...,...,...,...
3925,PRGS,0.000410,0.019933,2685,2021-10-05,2011-01-03,Progress Software Corporation Common Stock (DE),2.147921e+09,51.386023
3927,SAIA,0.001486,0.025431,2687,2021-10-05,2011-01-03,Saia Inc. Common Stock,6.476327e+09,146.092258
3931,ZBRA,0.001272,0.022351,2685,2021-10-05,2011-01-03,Zebra Technologies Corporation Class A Common ...,2.700615e+10,142.307717
3948,SBNY,0.000868,0.021434,2685,2021-10-05,2011-01-03,Signature Bank Common Stock,1.739683e+10,101.210999


In [37]:
type(nasdaq_screened_df)

pandas.core.frame.DataFrame

In [39]:
nasdaq_screened_df \
    .pipe(
        func           = px.scatter,
        x              = 'returns_std',
        y              = 'returns_mean',
        color          = 'reward_metric',
        hover_data     = ['company', 'symbol', 'market_cap'],
        render_mode    = 'svg',
        template       = 'plotly_dark'
    )
