In [1]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

In [2]:
# Build (or reuse) the local Spark session.
# Executor memory is a launch-time property, so it is supplied through the builder;
# calling spark.conf.set() for it after the session exists is either ignored or
# rejected, depending on the Spark version.
# NOTE(review): with master 'local[*]' the driver memory is probably the setting
# that actually limits this job - confirm and set it at launch if needed.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('HTF_Vol')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

# Load the trade data; without a schema every CSV column is read as a string.
df = spark.read.csv('new2MM.csv', header=True)

# Re-render TIME as an 'HH:mm:ss' string and cast the numeric columns to double.
df = df.withColumn('TIME', f.date_format(f.col('TIME'), 'HH:mm:ss'))
df = df.withColumn('PRICE', f.col('PRICE').cast(DoubleType()))
df = df.withColumn('SIZE', f.col('SIZE').cast(DoubleType()))

In [3]:
# Window over each symbol's trades within one day, ordered by time of day;
# used to compute the u-sequence (log returns).
w = Window().partitionBy('SYMBOL', 'DATE').orderBy('TIME')

# u-sequence: log of the next row's price over the current row's price
next_price = f.lead('PRICE').over(w)
df = df.withColumn('PRICE_1', next_price)
df = df.withColumn('LOG_RETURN', f.log(f.col('PRICE_1') / f.col('PRICE')))


# Moving standard deviation of the log returns. The frame covers the current row
# and the following window_period rows of the same symbol and day.
window_period = 100
w2 = w.rowsBetween(0, window_period)
df = df.withColumn('VOLATILITY', f.stddev('LOG_RETURN').over(w2))

In [4]:
#df.show()

In [5]:
# Width of one time bucket in seconds (300 s = 5 minutes) and the pattern used
# both to parse TIME and to render the bucket label.
seconds = 300
timeFormat = 'HH:mm:ss'

# Column expression that floors TIME to the start of its bucket: parse TIME to
# seconds, subtract the remainder modulo the bucket width, and format the result
# back with the same pattern. The parsed value is built once and reused, and the
# pattern comes from timeFormat so that changing it takes effect everywhere.
time_in_seconds = f.unix_timestamp('TIME', format=timeFormat)
seconds_window = f.from_unixtime(time_in_seconds - time_in_seconds % seconds, format=timeFormat)

In [6]:
# Label every row with the start time of its bucket (the seconds_window expression).
df = df.withColumn('5_min_window', seconds_window)

#df_AA = df.where(df['SYMBOL'] == 'A').select('*')

In [7]:
# Collapse the tick data to one row per symbol, day and 5-minute bucket,
# averaging volatility, price and trade size within each bucket.
bucket_keys = ['SYMBOL', 'DATE', '5_min_window']
bucket_averages = [
    f.avg('VOLATILITY').alias('avgVol'),
    f.avg('PRICE').alias('avgPrice'),
    f.avg('SIZE').alias('avgSize'),
]
new_df = df.groupBy(*bucket_keys).agg(*bucket_averages)

In [8]:
#new_df.show()

In [9]:
# Window specs over the 5-minute bars. Each has an ordering and no explicit frame,
# so aggregates over them accumulate from the first bar up to the current one.
# w3 restarts every day; it is not used by the live code in this cell.
# w4 runs across days for each symbol.
w3 = Window().partitionBy('SYMBOL', 'DATE').orderBy('5_min_window')
w4 = Window().partitionBy('SYMBOL').orderBy('DATE', '5_min_window')

# u-sequence: log return from this bar's average price to the next bar's
new_df = new_df.withColumn('PRICE_1', f.lead('avgPrice').over(w4))
new_df = new_df.withColumn('U_SEQ', f.log(new_df.PRICE_1/new_df.avgPrice))

# Running bar count per symbol (kept as an output column)
new_df = new_df.withColumn('Num', f.lit(1))
new_df = new_df.withColumn('totalSum', f.sum('Num').over(w4))

# Running sample standard deviation of U_SEQ:
#     s^2 = sum(u^2) / (n - 1)  -  (sum(u))^2 / (n * (n - 1))
# n has to be the number of non-null returns, not the number of bars: U_SEQ is null
# on the last bar of each symbol (there is no next bar) and sum() skips nulls, so
# using totalSum there would plug in an n that is one too large.
num_returns = f.count(new_df.U_SEQ).over(w4)
# With fewer than two returns the sample variance is undefined; leave it null
# explicitly instead of relying on how division by zero is handled.
has_variance = num_returns > f.lit(1)

new_df = new_df.withColumn(
    'sumSqaures',
    f.when(has_variance,
           f.sum(f.pow(new_df.U_SEQ, 2)).over(w4)/(num_returns - f.lit(1))))
new_df = new_df.withColumn(
    'suqareSums',
    f.when(has_variance,
           f.pow(f.sum(new_df.U_SEQ).over(w4), 2)/((num_returns - f.lit(1)) * num_returns)))

# Rounding can push the difference slightly below zero; clamp it so sqrt() gives
# 0 instead of NaN. A null variance stays null.
variance = new_df.sumSqaures - new_df.suqareSums
new_df = new_df.withColumn(
    'sectionVol',
    f.sqrt(f.when(variance < f.lit(0), f.lit(0.0)).otherwise(variance)))




#section_volatility = f.sqrt(f.sum(f.pow(new_df.U_SEQ, 2)).over(w3)/(new_df.totalSum - f.lit(1)) 
#                            - f.pow(f.sum(new_df.U_SEQ).over(w3), 2)/((new_df.totalSum - f.lit(1) * new_df.totalSum)))


#new_df = new_df.withColumn('sectionVol', section_volatility)
#df = df.withColumn('cumSum', f.sum('PRICE').over(w))

#df = df.withColumn('count', f.lit(1))
#df = df.withColumn('totalNum', f.sum('count').over(w))

In [10]:
# Columns to bring back to the driver; the helper columns used to build
# sectionVol are left behind in Spark.
export_columns = [
    'SYMBOL', 'DATE', '5_min_window',
    'avgVol', 'avgPrice', 'avgSize',
    'PRICE_1', 'U_SEQ', 'totalSum',
    'sectionVol',
]
data_pd = new_df.select(*export_columns).toPandas()

In [11]:
# Preview the first rows of the collected pandas frame.
data_pd.head()

Unnamed: 0,SYMBOL,DATE,5_min_window,avgVol,avgPrice,avgSize,PRICE_1,U_SEQ,totalSum,sectionVol
0,ABK,20021202,09:30:00,0.000426,63.15125,1150.0,63.147692,-5.6e-05,1,
1,ABK,20021202,09:35:00,0.000365,63.147692,584.615385,62.923636,-0.003554,2,0.002474
2,ABK,20021202,09:40:00,0.000339,62.923636,1150.0,62.854615,-0.001098,3,0.001796
3,ABK,20021202,09:45:00,0.000349,62.854615,369.230769,62.668462,-0.002966,4,0.001624
4,ABK,20021202,09:50:00,0.000343,62.668462,430.769231,62.6076,-0.000972,5,0.001469


In [17]:
# One DataFrame per symbol, keyed by symbol, each with a fresh 0..n-1 index
# (reset_index keeps the old index as an 'index' column).
# groupby() yields the symbols in sorted order, so the dict's iteration order is
# the same on every run; iterating a set of strings is not, which made the
# "first few symbols" picked by later cells change between runs. It also splits
# the frame in a single pass instead of rescanning it once per symbol.
# Rows whose SYMBOL is missing are skipped by groupby.
sectionVolatility = {
    symbol: symbol_rows.reset_index()
    for symbol, symbol_rows in data_pd.groupby('SYMBOL', sort=True)
}

In [21]:
%matplotlib
from matplotlib import pyplot as plt
count = 1
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(15,10))


for symbol in sectionVolatility:
    plt.subplot(2, 1, 1)
    plt.grid()
    plt.plot(sectionVolatility[symbol].loc[:,  'sectionVol'], label=symbol)
    plt.legend(loc='upper right')
    plt.ylabel('Section Volatility')
    plt.subplot(2, 1, 2)
    plt.plot(sectionVolatility[symbol].loc[:,  'avgPrice'], label=symbol)
    plt.grid()
    plt.ylabel('Stock Price')
    #sectionVolatility[symbol].to_csv(symbol + '_SectionVol_0614.csv')
    count +=1
    if count > 5:
        break

plt.suptitle('Section Volatility')
#plt.ylabel('Section Volatility')
plt.savefig('Section_Vol0614.png')

Using matplotlib backend: MacOSX


In [22]:
# Save the collected results to CSV. With default arguments to_csv also writes
# the pandas index as the first, unnamed column.
data_pd.to_csv('section_volatility.csv')