In [130]:
!apt-get update -y
!apt-get install openjdk-11-jdk -y

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://cli.github.com/packages stable InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,818 kB]
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,404 kB]
Fetched 12.2 MB in 4s (2,873 kB/s)
Reading package lists... Don

In [131]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, LongType, IntegerType, StringType

In [132]:
# Read the file line by line
with open("gold_data_for_2024_2025.txt", "r") as file:
    raw_data = file.read().splitlines()  # or use: list(file)

data = []
for s in raw_data:
    # Skip empty lines
    if not s.strip():
        continue

    # Extract key=value pairs
    items = re.findall(r'(\w+)=([-\w.]+|None)', s)

    # Convert to dict
    d = {}
    for k, v in items:
        if v == "None":
            d[k] = None
        else:
            try:
                d[k] = float(v)
            except ValueError:
                d[k] = v
    data.append(d)

# Create PySpark DataFrame
spark = SparkSession.builder.appName("GoldData").getOrCreate()
schema = StructType([
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", DoubleType(), True),        # was LongType()
    StructField("vwap", DoubleType(), True),
    StructField("timestamp", DoubleType(), True),     # was LongType()
    StructField("transactions", DoubleType(), True), # was IntegerType()
    StructField("otc", StringType(), True)
])

# Then create the DataFrame with the schema
df = spark.createDataFrame(data, schema=schema)

df.show()

+------+------+------+------+------+--------+-------------+------------+----+
|  open|  high|   low| close|volume|    vwap|    timestamp|transactions| otc|
+------+------+------+------+------+--------+-------------+------------+----+
|191.61| 192.0|191.61| 192.0| 616.0|191.6739|  1.704186E12|         4.0|NULL|
| 192.2| 192.2| 192.2| 192.2| 515.0|   192.2|1.70418624E12|         6.0|NULL|
|192.25|192.25|192.25|192.25| 500.0|  192.25| 1.7041866E12|         1.0|NULL|
| 192.3| 192.3| 192.3| 192.3| 100.0|   192.3|1.70418666E12|         1.0|NULL|
| 192.3| 192.3| 192.3| 192.3| 250.0|   192.3|1.70418702E12|         3.0|NULL|
|192.32|192.32|192.26|192.26|3010.0|192.2997|1.70418768E12|        12.0|NULL|
| 192.6| 192.6| 192.6| 192.6| 300.0|   192.6|1.70418882E12|         1.0|NULL|
|192.26|192.26|192.26|192.26| 201.0|192.2702|1.70419038E12|         4.0|NULL|
|192.23|192.23|192.23|192.23| 598.0|  192.23|1.70419068E12|         1.0|NULL|
| 192.3| 192.3| 192.3| 192.3| 101.0|192.3002|1.70419146E12|     

In [133]:
df_5m = df.drop("otc",'transactions')

In [134]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col,when,lead

In [135]:
windowSpec = Window.orderBy("timestamp")

In [136]:
for i in range(10):
  df_5m = df_5m.withColumn(
    f"open{i}",
    lead("open", i).over(windowSpec)
)

  df_5m = df_5m.withColumn(
    f"high{i}",
    lead("high", i).over(windowSpec)
)
  df_5m = df_5m.withColumn(
    f"low{i}",
    lead("low", i).over(windowSpec)
)
  df_5m = df_5m.withColumn(
    f"close{i}",
    lead("close", i).over(windowSpec)
)
  df_5m = df_5m.withColumn(
    f"vwap{i}",
    lead("vwap", i).over(windowSpec)
)

df_5m = df_5m.withColumn(
    "BUY/SELL",
    when(lead("close", 15).over(windowSpec)>col('open'),1).otherwise(0)
)


In [137]:
from pyspark.sql.functions import row_number, desc

In [138]:
w = Window.orderBy(desc("timestamp"))   # replace with your ordering column
df_numbered = df_5m.withColumn("row_num", row_number().over(w))

# Keep everything except last 5
df_5m = df_numbered.filter("row_num > 10").drop("row_num")


In [139]:
for row in df_5m.tail(5):
    print(row)

Row(open=192.3, high=192.3, low=192.3, close=192.3, volume=250.0, vwap=192.3, timestamp=1704187020000.0, open0=192.3, high0=192.3, low0=192.3, close0=192.3, vwap0=192.3, open1=192.32, high1=192.32, low1=192.26, close1=192.26, vwap1=192.2997, open2=192.6, high2=192.6, low2=192.6, close2=192.6, vwap2=192.6, open3=192.26, high3=192.26, low3=192.26, close3=192.26, vwap3=192.2702, open4=192.23, high4=192.23, low4=192.23, close4=192.23, vwap4=192.23, open5=192.3, high5=192.3, low5=192.3, close5=192.3, vwap5=192.3002, open6=192.15, high6=192.15, low6=192.14, close6=192.14, vwap6=192.1522, open7=192.3, high7=192.3, low7=192.3, close7=192.3, vwap7=192.2994, open8=191.97, high8=191.97, low8=191.97, close8=191.97, vwap8=191.9767, open9=191.86, high9=191.86, low9=191.81, close9=191.81, vwap9=191.82, BUY/SELL=0)
Row(open=192.3, high=192.3, low=192.3, close=192.3, volume=100.0, vwap=192.3, timestamp=1704186660000.0, open0=192.3, high0=192.3, low0=192.3, close0=192.3, vwap0=192.3, open1=192.3, high1=

In [140]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [141]:

def evalutations(predictions):
    multi_evaluator = MulticlassClassificationEvaluator(labelCol="BUY/SELL", predictionCol="prediction")
    binary_evaluator = BinaryClassificationEvaluator(labelCol="BUY/SELL", rawPredictionCol="rawPrediction")

    accuracy = multi_evaluator.setMetricName("accuracy").evaluate(predictions)
    f1 = multi_evaluator.setMetricName("f1").evaluate(predictions)
    precision = multi_evaluator.setMetricName("weightedPrecision").evaluate(predictions)
    recall = multi_evaluator.setMetricName("weightedRecall").evaluate(predictions)
    auc = binary_evaluator.setMetricName("areaUnderROC").evaluate(predictions)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"AUC: {auc:.4f}")
    return predictions.groupBy("BUY/SELL", "prediction").count()

In [142]:
# Example columns
excluded_col = ["timestamp","transactions","BUY/SELL"]
selected_cols = [c for c in df_5m.columns if c not in excluded_col]

feature_cols = selected_cols

# Convert the target label to numeric
df_5m = df_5m.withColumn("BUY/SELL", col("BUY/SELL").cast("integer"))

In [143]:
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)
df_5m = assembler.transform(df_5m)

In [144]:
train_df, test_df = df_5m.randomSplit([0.7, 0.3], seed=42)

In [145]:
feature_cols

['open',
 'high',
 'low',
 'close',
 'volume',
 'vwap',
 'open0',
 'high0',
 'low0',
 'close0',
 'vwap0',
 'open1',
 'high1',
 'low1',
 'close1',
 'vwap1',
 'open2',
 'high2',
 'low2',
 'close2',
 'vwap2',
 'open3',
 'high3',
 'low3',
 'close3',
 'vwap3',
 'open4',
 'high4',
 'low4',
 'close4',
 'vwap4',
 'open5',
 'high5',
 'low5',
 'close5',
 'vwap5',
 'open6',
 'high6',
 'low6',
 'close6',
 'vwap6',
 'open7',
 'high7',
 'low7',
 'close7',
 'vwap7',
 'open8',
 'high8',
 'low8',
 'close8',
 'vwap8',
 'open9',
 'high9',
 'low9',
 'close9',
 'vwap9']

In [146]:
# Drop rows with any null values in the feature columns before grouping
train_df = train_df.na.drop(subset=feature_cols)
train_df.groupBy("BUY/SELL").count().show()

+--------+-----+
|BUY/SELL|count|
+--------+-----+
|       1|46737|
|       0|44463|
+--------+-----+



In [147]:
# Drop rows with any null values in the feature columns from test DataFrame before making predictions
test_df = test_df.na.drop(subset=feature_cols)

lr = LogisticRegression(featuresCol="features", labelCol="BUY/SELL")
model = lr.fit(train_df)
predictions = model.transform(test_df)

In [148]:
evalutations(predictions).show()

Accuracy: 0.7837
F1 Score: 0.7836
Precision: 0.7838
Recall: 0.7837
AUC: 0.8579
+--------+----------+-----+
|BUY/SELL|prediction|count|
+--------+----------+-----+
|       1|       1.0|15970|
|       1|       0.0| 3989|
|       0|       0.0|14779|
|       0|       1.0| 4499|
+--------+----------+-----+



In [149]:
model.save("model_lr_5m")

Py4JJavaError: An error occurred while calling o4504.save.
: java.io.IOException: Path model_lr_5m already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
   !zip -r /content/model_lr_5m.zip /content/model_lr_5m

In [None]:
predictions.select("BUY/SELL", "prediction", "probability").show(10, truncate=False)

In [None]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="BUY/SELL", numTrees=100)
model = rf.fit(train_df)
predictions = model.transform(test_df)
evalutations(predictions).show()

In [None]:
gbt = GBTClassifier(featuresCol="features", labelCol="BUY/SELL", maxIter=50)
model = gbt.fit(train_df)
predictions = model.transform(test_df)
predictions.select("BUY/SELL", "prediction", "probability").show(10, truncate=False)


In [None]:
evalutations(predictions).show()