In [1]:
from pyspark import SparkContext, SQLContext

import pandas as pd
import matplotlib.pyplot as plt 
import pandas_market_calendars as mcal
import numpy as np
import datetime

# Reuse the active SparkContext when this cell is re-executed; calling a bare
# SparkContext() while one is already running in the kernel raises an error.
sc = SparkContext.getOrCreate()

# IPython.display is the public home of display/HTML (the IPython.core.display
# location is deprecated). Widen the notebook container to the full page.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# When finished with the notebook, call sc.stop() to release Spark resources.

In [2]:
import os

# Location of the 1-minute YM bar file. Set the YM_DATA_PATH environment
# variable to point elsewhere instead of editing this machine-specific default.
YM_DATA_PATH = os.environ.get("YM_DATA_PATH", "C:/Users/Rabe's/Desktop/FUTURES_DATA/1_MIN_DATA/YM.txt")

# The file has no header row, so the columns are named explicitly.
in_data = pd.read_csv(YM_DATA_PATH, sep=",", header=None,
                      names=['date', 'time', 'open', 'high', 'low', 'close', 'volume'])

In [3]:
def create_timestamp_intraday(df, fmt='%m/%d/%Y %H:%M'):
    """
    Return a copy of ``df`` with a parsed ``timestamp`` column.

    The ``date`` and ``time`` string columns are joined with a single space
    and parsed with ``fmt``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain string columns ``date`` and ``time``.
    fmt : str, optional
        strptime-style format of the combined ``"<date> <time>"`` string.
        Defaults to ``'%m/%d/%Y %H:%M'``.

    Returns
    -------
    pandas.DataFrame
        A new frame with ``timestamp`` added (or overwritten). The input
        frame is left unmodified, so re-running the calling cell is safe.
    """
    combined = df['date'] + ' ' + df['time']
    # assign() builds a new frame instead of mutating the caller's object.
    return df.assign(timestamp=pd.to_datetime(combined, format=fmt))

In [4]:
# Attach the parsed timestamp column to the raw bars.
data = in_data.pipe(create_timestamp_intraday)

## Load Data into Spark 

In [5]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import TimestampType

In [None]:
# Wrap the SparkContext for DataFrame creation, convert the pandas bars into a
# Spark DataFrame and tag every row with the instrument symbol.
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of SparkSession.
sqlCtx = SQLContext(sc)
sdf = (
    sqlCtx.createDataFrame(data)
    .withColumn('symbol', F.lit('YM'))
)

In [None]:
# Show the Spark column names.
list(sdf.columns)

In [None]:
def add_vwap(df, order_col=None):
    """
    Add a cumulative intraday Volume Weighted Average Price column.

    Within every (symbol, date) group, bars are ordered by ``order_col``.
    ``vwap`` is the running sum of typical_price * volume divided by the
    running sum of volume, where typical_price = (high + low + close) / 3.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``symbol``, ``date``, ``high``, ``low``, ``close``,
        ``volume`` and the ordering column.
    order_col : str, optional
        Column that orders bars inside a day. Defaults to ``'timestamp'`` when
        that column exists, otherwise ``'time'``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with a ``vwap`` column added; the intermediate helper columns
        are dropped.
    """
    if order_col is None:
        # A string 'time' column sorts chronologically only when hours are
        # zero-padded ('10:00' < '9:31' as strings). A parsed timestamp always
        # sorts correctly, so prefer it when available.
        order_col = 'timestamp' if 'timestamp' in df.columns else 'time'

    df = df.withColumn('average_price', (F.col('high') + F.col('low') + F.col('close'))/3)
    df = df.withColumn('dollars_traded', F.col('average_price')*F.col('volume'))

    # Frame from the first bar of the day up to the current bar. With a RANGE
    # frame, bars sharing the same ordering value are summed together.
    wind = (Window.partitionBy(['symbol', 'date'])
            .orderBy(order_col)
            .rangeBetween(Window.unboundedPreceding, Window.currentRow))

    df = df.withColumn('cumulative_dollars_traded', F.sum('dollars_traded').over(wind))
    df = df.withColumn('cumulative_volume', F.sum('volume').over(wind))
    df = df.withColumn('vwap', F.col('cumulative_dollars_traded')/F.col('cumulative_volume'))
    df = df.drop('average_price', 'dollars_traded', 'cumulative_dollars_traded', 'cumulative_volume')

    return df




In [None]:
# Add the cumulative intraday VWAP column to the 1-minute bars.
sdf = add_vwap(df=sdf)

In [None]:
from pyspark.sql.types import TimestampType, DateType

def resample_data(df, time_bin):
    """
    Aggregate bars into tumbling time buckets of width ``time_bin`` per symbol.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Bars with ``symbol``, ``timestamp``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``vwap`` columns.
    time_bin : str
        Interval string passed to ``F.window`` (e.g. ``"4 hour"``).

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (symbol, bucket) with ``open``, ``high``, ``low``, ``close``,
        ``volume``, ``vwap``, ``start_timestamp``, ``end_timestamp`` and
        ``time_index``, sorted by ``start_timestamp``.
    """
    # F.first/F.last in a groupBy depend on row order, which is not guaranteed
    # after a shuffle. Taking the min/max of a (timestamp, value) struct picks
    # the value from the earliest/latest bar deterministically.
    first_open = F.min(F.struct('timestamp', 'open')).getField('open')
    last_close = F.max(F.struct('timestamp', 'close')).getField('close')

    df_gr = df.groupBy(['symbol', F.window("timestamp", time_bin).alias('time_window')]).agg(
        first_open.alias('open'),
        F.max('high').alias('high'),
        F.min('low').alias('low'),
        last_close.alias('close'),
        F.sum('volume').alias('volume'),
        # NOTE(review): this is the mean of the running session VWAP inside the
        # bucket, not the bucket's own VWAP. Confirm this is the intended measure.
        F.avg('vwap').alias('vwap'))

    # monotonically_increasing_id is unique and increasing in sort order but NOT
    # consecutive (it jumps between partitions). Do not treat it as a row number.
    df_gr = df_gr.withColumn('start_timestamp', df_gr.time_window.start)\
                 .withColumn('end_timestamp', df_gr.time_window.end).drop('time_window').orderBy('start_timestamp')\
                 .withColumn('time_index', F.monotonically_increasing_id())

    return df_gr

def add_standard_columns_intraday(df):
    """
    Append descriptive per-bar columns derived from OHLC and ``start_timestamp``.

    Columns are added (or replaced) in this order:
      range   -- high - low
      body    -- open - close
      weekday -- F.dayofweek(start_timestamp) (per the Spark docs, 1 = Sunday .. 7 = Saturday)
      hour    -- hour of start_timestamp
      date    -- start_timestamp formatted 'yyyy-MM-dd' and cast to DateType
      time    -- start_timestamp formatted 'H:mm:ss' (hour not zero-padded)
    """
    start = F.col('start_timestamp')
    derived_columns = [
        ('range', F.col('high') - F.col('low')),
        ('body', F.col('open') - F.col('close')),
        ('weekday', F.dayofweek(start)),
        ('hour', F.hour(start)),
        ('date', F.date_format('start_timestamp', 'yyyy-MM-dd').cast(DateType())),
        ('time', F.date_format('start_timestamp', 'H:mm:ss')),
    ]

    # withColumn one at a time keeps the original column order and replaces
    # any pre-existing column of the same name in place.
    for column_name, expression in derived_columns:
        df = df.withColumn(column_name, expression)

    return df


In [None]:
# Resample the 1-minute bars into 4-hour buckets.
df_resample = resample_data(sdf, time_bin="4 hour")