In [34]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os

In [2]:
spark = SparkSession.builder \
        .appName("Stock Prediction") \
        .master("local[*]") \
        .config("spark.ui.port", "8080") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/02 00:18:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
schema = T.StructType([
  T.StructField("datetime", T.DateType()),
  T.StructField("open", T.DoubleType()),
  T.StructField("high", T.DoubleType()),
  T.StructField("low", T.DoubleType()),
  T.StructField("close", T.DoubleType()),
  T.StructField("volume", T.DoubleType()),
  ])

In [24]:
df = spark.read \
    .format("csv") \
    .option("header", True) \
    .option("mode", "FAILFAST") \
    .schema(schema) \
    .load("../data/raw/archive/D1/*.csv")

                                                                                

In [25]:
@F.udf(returnType=T.StringType())
def get_basename(path):
  filename = os.path.basename(path)
  filename_without_ext = os.path.splitext(filename)[0]
  return filename_without_ext.split('.')[0]

df = df.withColumn("ticket_name", get_basename(F.input_file_name()))

In [None]:
window_spec = Window.partitionBy(F.col("ticket_name")).orderBy(F.col("ticket_name"))

In [28]:
df = df.withColumn("price_change", F.col("close") - F.lag("close").over(window_spec)) \
  .withColumn("daily_return", F.col("price_change") / F.lag("close").over(window_spec)) \
  .withColumn("log_return", F.log(F.col("close") / F.lag("close").over(window_spec)))

In [None]:
df = df.withColumn("VMA", F.avg("volumne").over(window_spec.rowsBetween(-2, 0))) \
  .withColumn("vol_change", (F.col("volume") - F.lag("volume").over(window_spec)) / F.lag("volume").over(window_spec))

@F.udf(returnType=T.DoubleType())
def true_range(columns):
  return max(columns)
df = df.withColumn("TR", 
              true_range(
                F.col("high") - F.col("low"), 
                F.abs(F.col('high') - F.lag('close').over(window_spec)), 
                F.abs(F.col('low') - F.lag('close').over(window_spec))
                ))

DataFrame[datetime: date, open: double, high: double, low: double, close: double, volume: double, ticket_name: string, price_change: double, daily_return: double, log_return: double, TR: double]

In [None]:
df = df.withColumn("SMA", F.avg(F.col("close")).over(window_spec.rowsBetween(-2, 0))) # Last 3 days

df = df.withColumn("delta", F.col("close") - F.lag(col="close").over(window_spec)) \
  .withColumn("gain", F.when(F.col("delta") > 0, F.col("delta")).otherwise(0)) \
  .withColumn("loss", F.when(F.col("delta") < 0, -F.col("delta")).otherwise(0)) \
  .withColumn("average_gain", F.avg("gain").over(window_spec.rowsBetween(-3, 0))) \
  .withColumn("average_loss", F.avg("loss").over(window_spec.rowsBetween(-3, 0))) \
  .withColumn("RS", F.col("average_gain") / F.col("average_loss")) \
  .withColumn("RSI", 100 - 100/(F.col("RS") + 1)) \
  .drop("delta", "gain", "loss", "average_gain", "average_loss", "RS")  #RSI over 3 days

In [39]:
df = df.withColumn("formatted_timestamp", F.to_timestamp("datetime", "MM/dd/yyyy hh:mm:ss")) \
  .withColumn("is_weekday", (F.dayofweek("datetime") >=2) & (F.dayofweek("datetime") <= 6)) \
  .drop("datetime")

In [44]:
df.printSchema()

root
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- ticket_name: string (nullable = true)
 |-- price_change: double (nullable = true)
 |-- daily_return: double (nullable = true)
 |-- log_return: double (nullable = true)
 |-- formatted_timestamp: timestamp (nullable = true)
 |-- is_weekday: boolean (nullable = true)

