<a href="https://colab.research.google.com/github/mmistroni/TensorFlowPlayground/blob/master/PySpark_Colab_Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h3> Installing dependencies</h3>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

<p> ToDO test few runs here, then  store a dataframe of all possible shares for last 2 years and run a correlation between all cols </b>

In [0]:
# Use this link to calculate correlation https://stackoverflow.com/questions/45112976/how-to-use-correlation-in-spark-with-dataframes

In [0]:
!pip install pandas-datareader



<h3>Getting IEXApi Token </h3>

In [0]:
def get_iexapi_keys():
  from google.colab import drive
  drive.mount('/content/gdrive')
  with open('gdrive/My Drive/passwords/iexapi.keys') as f:
    return f.readlines()[0]


In [5]:
import requests
import pandas_datareader.data as dr
import pandas as pd
from datetime import date

iexapi_token = get_iexapi_keys()


def get_historical_value(symbol):
  try: 
    data = dr.get_data_yahoo(symbol, date(2018,1,1), date(2019,9,19))[['Adj Close']]
    return data.rename(columns={'Adj Close' : symbol})
    
  except Exception as e :
    print('Exception for {}={}'.format(symbol, str(e)))
    return pd.DataFrame(columns=[symbol])
  
def get_all_symbols():
  all_symbols_data = requests.get('https://cloud.iexapis.com/stable/ref-data/iex/symbols?token={}'.format(iexapi_token)).json()
  good_ones = [d['symbol'] for d in all_symbols_data if d['isEnabled']]
  return filter(lambda data: bool(data), good_ones)

def get_all_etfs():
  nyse_symbols = requests.get('https://cloud.iexapis.com/stable/ref-data/exchange/nys/symbols?token={}'.format(iexapi_token)).json()
  nas_symbols = requests.get('https://cloud.iexapis.com/stable/ref-data/exchange/nas/symbols?token={}'.format(iexapi_token)).json()
  return [d['symbol'] for d in nyse_symbols + nas_symbols if d['type'].lower() == 'et']

def get_all_stocks_data():
  all_symbols_data =  get_all_etfs()
  return map(lambda symbol: get_historical_value(symbol), all_symbols_data)


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


<h3> Comparing shares correlation with VIX via Pandas and Spark </h3>

In [0]:
vix = get_historical_value('^VIX')


In [6]:

all_stocks  = get_all_stocks_data()

res= [vals for vals in all_stocks if vals.shape[0] == vix.shape[0]]
res.append(vix)



Exception for BSML='Date'
Exception for BSMM='Date'
Exception for BSMN='Date'
Exception for BSMO='Date'
Exception for BSMP='Date'
Exception for BSMQ='Date'
Exception for BSMR='Date'
Exception for BSMS='Date'
Exception for BSMT='Date'
Exception for BUG='Date'
Exception for GXTG='Date'
Exception for HERO='Date'
Exception for POTX='Date'


In [0]:
# building pandas df
all_data = pd.concat(res, axis=1)
spark_df = spark.createDataFrame(all_data)



In [0]:
def calculate_correlation(df):
  from pyspark.ml.stat import Correlation
  from pyspark.ml.feature import VectorAssembler

  # convert to vector column first
  vector_col = "corr_features"
  assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
  df_vector = assembler.transform(df).select(vector_col)

  # get correlation matrix
  matrix = Correlation.corr(df_vector, vector_col)
  return matrix


In [0]:
columns = spark_df.columns
matrix = calculate_correlation(spark_df)
matrix.toPandas()


Unnamed: 0,pearson(corr_features)
0,"DenseMatrix([[ 1. , 0.43882321, 0.462..."


In [0]:
# convert to numpy
np_array = matrix.collect()[0]["pearson({})".format("corr_features")].toArray()
np_array.shape

vix_row = np_array[-1:].tolist()[0]


In [0]:
zipped = zip(columns, vix_row)

sorted_zip = sorted(zipped, key=lambda tpl:tpl[1], reverse=True)
from pprint import pprint
pprint(list(sorted_zip)[0:10])


[('^VIX', 1.0),
 ('VIIX', 0.6989994873731028),
 ('BIS', 0.6202776283699959),
 ('TVIX', 0.5295657060891832),
 ('ZBIO', 0.5289476059509456),
 ('SQQQ', 0.4079530575140182),
 ('FBZ', 0.12024673536202132),
 ('EWZS', 0.10604274352221688),
 ('ISHG', 0.06555273327793446),
 ('DSLV', 0.06500897256669277)]


In [0]:
df = spark.createDataFrame(data, ["features"])

r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.99963975, 0.99955352, ..., 0.8438303 , 0.84639914,
              0.84762431],
             [0.99963975, 1.        , 0.99974304, ..., 0.85282955, 0.8553397 ,
              0.85659553],
             [0.99955352, 0.99974304, 1.        , ..., 0.85123163, 0.85352329,
              0.85473836],
             ...,
             [0.8438303 , 0.85282955, 0.85123163, ..., 1.        , 0.9998867 ,
              0.99979739],
             [0.84639914, 0.8553397 , 0.85352329, ..., 0.9998867 , 1.        ,
              0.99993591],
             [0.84762431, 0.85659553, 0.85473836, ..., 0.99979739, 0.99993591,
              1.        ]])


<h3> Alternative approach via RDD </h3>

In [0]:
def get_historical_value_df(symbol):
  try: 
    data = dr.get_data_yahoo(symbol, date(2018,1,1), date(2019,9,19))[['Adj Close']]
    pandas_df = data.rename(columns={'Adj Close' : 'adj_close'})['adj_close']
    return spark.createDataFrame(pandas_df, FloatType())
    
  except Exception as e :
    print('Excepiton for {}:{}'.format(symbol, str(e)))
    return []

def calculate_correlation(df1, df2, field1, field2)
  firstRDD = df1.select("adj_close").map(lambda r: row.getDouble(0))
  val secondRDD: RDD[Double] = yourDF.select("field2").map(row => row.getDouble(0))
  val corr = Statistics.corr(firstRDD, secondRDD, "spearman")  
  

In [0]:
vix = get_historical_value_df('^VIX')

vals = [x for x in map(lambda r: r.asDict(), vix)]
from pprint import pprint
vals






TypeError: ignored