In [36]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg, when
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [37]:
spark = SparkSession.builder.appName("TimeSeriesPrediction").getOrCreate()

In [38]:
data = spark.read.csv("/content/HDEF.csv", header=True, inferSchema=True)
data = data.withColumn("Date", col("Date").cast("timestamp"))


In [39]:
print(data.columns)


['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']


In [40]:
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window

windowSpec = Window.orderBy("Date")

data = data.withColumn("Prev_Close", lag("Close", 1).over(windowSpec))
data = data.withColumn("MA_7", avg("Close").over(windowSpec.rowsBetween(-6, 0)))

data = data.dropna()  # Drop rows with null values


In [41]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ["Open", "High", "Low", "Prev_Close", "MA_7", "Volume"]
existing_cols = [col_name for col_name in feature_cols if col_name in data.columns]

assembler = VectorAssembler(inputCols=existing_cols, outputCol="features")
data1 = assembler.transform(data).select("features", "Close")


In [42]:
print(data.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Prev_Close', 'MA_7']


In [30]:
train, test = data1.randomSplit([0.8, 0.2], seed=42)


In [43]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="Close")
lr_model = lr.fit(train)
reg_predictions = lr_model.transform(test)


In [32]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Close", metricName="rmse")
rmse = evaluator.evaluate(reg_predictions)
print(f"Regression RMSE: {rmse}")


Regression RMSE: 0.05823250961918156


In [44]:
from pyspark.sql.functions import when

data = data.withColumn("Price_Change", when(col("Close") > col("Prev_Close"), "Up").otherwise("Down"))


In [45]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Price_Change", outputCol="label")
data2 = indexer.fit(data).transform(data)


In [59]:
feature_cols = ["Open", "High", "Low", "Prev_Close", "MA_7", "Volume"]
existing_cols = [col_name for col_name in feature_cols if col_name in data2.columns]
assembler = VectorAssembler(inputCols=existing_cols, outputCol="features")
data3 = assembler.transform(data2)

In [60]:
print(data3.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Prev_Close', 'MA_7', 'Price_Change', 'label', 'features']


In [61]:
train, test = data3.randomSplit([0.8, 0.2], seed=42)

In [62]:
train

DataFrame[Date: timestamp, Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: int, Prev_Close: double, MA_7: double, Price_Change: string, label: double, features: vector]

In [65]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .build()

# Cross-validation
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Fit cross-validated model
cvModel = crossval.fit(train)

# Evaluate on test set
cv_predictions = cvModel.transform(test)
rmse = evaluator.evaluate(cv_predictions)
print(f"Optimized Regression RMSE: {rmse}")


Optimized Regression RMSE: 0.0


In [63]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100)
rf_model = rf.fit(train)
class_predictions = rf_model.transform(test)

In [66]:
rf_model.save("rf_model")
lr_model.save("lr_model")


In [67]:
from pyspark.ml.classification import RandomForestClassificationModel
loaded_model = RandomForestClassificationModel.load("rf_model")


In [68]:
spark = SparkSession.builder \
    .appName("BigDataML") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()


In [64]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(class_predictions)
print(f"Classification Accuracy: {accuracy}")

Classification Accuracy: 0.8292682926829268
