In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ebc0edcdb6a3b1ac0be7871ba5b86a1e43dbabc13411a5216da1fbca4b3e2a91
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

# Initiating Spark session
spark = SparkSession.builder \
    .appName("StockAnalysis") \
    .getOrCreate()

df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/stock_analysis_one.csv/Stock_analysis_modified.csv', header=True, inferSchema=True)

In [None]:
df.show()

+------+----------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+
|Ticker|      Date|              Open|              High|               Low|             Close|         Adj Close| Volume|Month|Day|Year|         YearlHigh|         YearlyLow|Week|          WeekHigh|           WeekLow|
+------+----------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+
|     A|2024-05-01|136.72000122070312|138.52999877929688| 136.1300048828125| 137.9199981689453| 137.9199981689453| 324723|    5|  1|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|
|     A|2024-04-30|138.60000610351562|139.64999389648438|136.97000122070312| 137.0399932861328| 137.0399932861328|1087300|  

In [None]:
df.dtypes

[('Ticker', 'string'),
 ('Date', 'date'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Adj Close', 'double'),
 ('Volume', 'bigint'),
 ('Month', 'int'),
 ('Day', 'int'),
 ('Year', 'int'),
 ('YearlHigh', 'double'),
 ('YearlyLow', 'double'),
 ('Week', 'int'),
 ('WeekHigh', 'double'),
 ('WeekLow', 'double')]

##**Data Pre-Processing**

In [None]:
from pyspark.sql.functions import col, year, month, dayofmonth

# Convert Ticker to unique identifier code
df = df.withColumn("TickerId", col("Ticker").cast("string"))

# Extract year, month, and day from Date and convert to int
df = df.withColumn("Year", year("Date")) \
       .withColumn("Month", month("Date")) \
       .withColumn("Day", dayofmonth("Date")) \
       .drop("Date")

# Display the updated DataFrame
df.show()


+------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+
|Ticker|              Open|              High|               Low|             Close|         Adj Close| Volume|Month|Day|Year|         YearlHigh|         YearlyLow|Week|          WeekHigh|           WeekLow|TickerId|
+------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+
|     A|136.72000122070312|138.52999877929688| 136.1300048828125| 137.9199981689453| 137.9199981689453| 324723|    5|  1|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|       A|
|     A|138.60000610351562|139.64999389648438|136.97000122070312| 137.0399932861328| 137.0399932861328|1087300|    4| 30|2024|149.63

In [None]:
# Drop the 'Close' and 'Adj Close' features
df = df.drop("Close", "Adj Close")

# Display the updated DataFrame
df.show()


+------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|Ticker|              Open|              High|               Low| Volume|Month|Day|Year|         YearlHigh|         YearlyLow|Week|          WeekHigh|           WeekLow|TickerId|           Balance|In_or_de|
+------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|     A|136.72000122070312|138.52999877929688| 136.1300048828125| 324723|    5|  1|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|       A|1.1999969482421875|       1|
|     A|138.60000610351562|139.64999389648438|136.97000122070312|1087300|    4| 30|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|   

##**Feature Engineering**

In [None]:
from pyspark.sql.functions import when, abs

# Calculate the balance
df = df.withColumn("Balance", col("Adj Close") - col("Open"))

# Mark positive balance as 1 and negative balance as 0
df = df.withColumn("In_or_de", when(col("Balance") >= 0, 1).otherwise(0))

# Take the absolute value of balance
df = df.withColumn("Balance", abs(col("Balance")))

# Display the updated DataFrame
df.show()


+------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|Ticker|              Open|              High|               Low|             Close|         Adj Close| Volume|Month|Day|Year|         YearlHigh|         YearlyLow|Week|          WeekHigh|           WeekLow|TickerId|           Balance|In_or_de|
+------+------------------+------------------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|     A|136.72000122070312|138.52999877929688| 136.1300048828125| 137.9199981689453| 137.9199981689453| 324723|    5|  1|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|       A|1.1999969482421875|       1|
|     A|138.60000610

In [None]:
df.show()

+------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|Ticker|              Open|              High|               Low| Volume|Month|Day|Year|         YearlHigh|         YearlyLow|Week|          WeekHigh|           WeekLow|TickerId|           Balance|In_or_de|
+------+------------------+------------------+------------------+-------+-----+---+----+------------------+------------------+----+------------------+------------------+--------+------------------+--------+
|     A|136.72000122070312|138.52999877929688| 136.1300048828125| 324723|    5|  1|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|       A|1.1999969482421875|       1|
|     A|138.60000610351562|139.64999389648438|136.97000122070312|1087300|    4| 30|2024|149.63999938964844|128.02000427246094|  18|138.60000610351562|136.72000122070312|   

In [None]:
# Count occurrences of each value in the 'In_or_de' column
count_df = df.groupBy("In_or_de").count()

# Display the count DataFrame
count_df.show()


+--------+-------+
|In_or_de|  count|
+--------+-------+
|       1|1744031|
|       0|5650101|
+--------+-------+



#**Working on a Sub Set**

##**Train-Test Split on a Sub sample**

In [None]:
from pyspark.sql import functions as F

# Take the first 30k rows
df_subset = df.limit(30000)
count_df_subset = df_subset.groupBy("In_or_de").count()
count_df_subset.show()


+--------+-----+
|In_or_de|count|
+--------+-----+
|       1| 4042|
|       0|25958|
+--------+-----+



In [None]:
# Split the data into train and test sets (80% train, 20% test)
train_df, test_df = df_subset.randomSplit([0.8, 0.2], seed=42)

# Display the number of rows in each set
print("Train set count:", train_df.count())
print("Test set count:", test_df.count())


Train set count: 24032
Test set count: 5968


##**Modeling**

##**Random Forest**

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import PCA

# Define numerical and categorical columns
numerical_columns = ["Open", "High", "Low", "Volume", "YearlHigh", "YearlyLow", "WeekHigh", "WeekLow", "Day", "Month", "Year", "Balance"]
categorical_columns = ["TickerId"]

# Define feature assembler for numerical columns
numerical_assembler = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")

# Define SimpleImputer for handling null values in numerical columns
numerical_imputer = Imputer(strategy="median", inputCols=numerical_columns, outputCols=[column + "_imputed" for column in numerical_columns])

# Define StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep")
            for column in categorical_columns]

# Define OneHotEncoder for categorical columns
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=[column + "_encoded" for column in categorical_columns])

# Define feature assembler to combine numerical and categorical features
feature_assembler = VectorAssembler(inputCols=numerical_columns + [column + "_encoded" for column in categorical_columns], outputCol="features")

# Define StandardScaler for scaling features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Define PCA for dimensionality reduction
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

# Define Random Forest classifier
rf = RandomForestClassifier(labelCol="In_or_de", featuresCol="pca_features")

# Define pipeline
pipeline = Pipeline(stages=[numerical_assembler] + indexers + [encoder, feature_assembler, scaler, pca, rf])

# Define parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 200])
             .addGrid(rf.maxDepth, [5, 10, 20])
             .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="In_or_de", metricName="f1")

# Define cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Number of folds for cross-validation

# Fit the model
cv_model = crossval.fit(train_df)

# Evaluate the model on the test set
predictions = cv_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)

# Print the best parameters
print("Best Parameters:")
print("Number of Trees:", cv_model.bestModel.stages[-1].getNumTrees)
print("Max Depth:", cv_model.bestModel.stages[-1].getOrDefault('maxDepth'))

F1 Score: 0.9105120627367687
Best Parameters:
Number of Trees: 200
Max Depth: 5


##**Logistic Regression**

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import PCA

# Define PCA for dimensionality reduction
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

# Define Logistic Regression classifier
lr = LogisticRegression(labelCol="In_or_de", featuresCol="pca_features")

# Define pipeline
pipeline = Pipeline(stages=[numerical_assembler] + indexers + [encoder, feature_assembler, scaler, pca, lr])

# Define parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="In_or_de", metricName="f1")

# Define cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Number of folds for cross-validation

# Fit the model
cv_model = crossval.fit(train_df)

# Evaluate the model on the test set
predictions = cv_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)

# Get the best model
best_model_lr = cv_model.bestModel.stages[-1]

# Print the best parameters
print("Best Parameters:")
print("Regularization Parameter:", best_model_lr.getOrDefault('regParam'))
print("Elastic Net Parameter:", best_model_lr.getOrDefault('elasticNetParam'))


F1 Score: 0.9070952293274992
Best Parameters:
Regularization Parameter: 0.01
Elastic Net Parameter: 1.0


#**Decision Tree**

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import PCA

# Define PCA for dimensionality reduction
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

# Define Decision Tree classifier
dt = DecisionTreeClassifier(labelCol="In_or_de", featuresCol="pca_features")

# Define pipeline
pipeline = Pipeline(stages=[numerical_assembler] + indexers + [encoder, feature_assembler, scaler, pca, dt])

# Define parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10, 20])
             .addGrid(dt.maxBins, [16, 32, 64])
             .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="In_or_de", metricName="f1")

# Define cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Number of folds for cross-validation

# Fit the model
cv_model = crossval.fit(train_df)

# Evaluate the model on the test set
predictions = cv_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)

# Get the best model
best_model_dt = cv_model.bestModel.stages[-1]

# Print the best parameters
print("Best Parameters:")
print("Max Depth:", best_model_dt.getMaxDepth())
print("Max Bins:", best_model_dt.getMaxBins())


F1 Score: 0.9095280186432009
Best Parameters:
Max Depth: 5
Max Bins: 16


##**Best Model(Entire Dataset)**

In [None]:
# Split the data into train and test sets (80% train, 20% test)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Display the number of rows in each set
print("Train set count:", train_df.count())
print("Test set count:", test_df.count())

Train set count: 5914624
Test set count: 1479508


##**Logistic Regression**

In [None]:
# Define PCA for dimensionality reduction
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

# Define Logistic Regression classifier with the best parameters
lr = LogisticRegression(labelCol="In_or_de", featuresCol="pca_features", regParam=0.01, elasticNetParam=1.0)

# Define pipeline
pipeline = Pipeline(stages=[numerical_assembler] + indexers + [encoder, feature_assembler, scaler, pca, lr])

# Fit the model
model = pipeline.fit(train_df)

# Make predictions on the test set
predictions = model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="In_or_de", metricName="f1")
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)


F1 Score: 0.6620702300899203


##**Random Forest**

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import PCA

# Define numerical and categorical columns
numerical_columns = ["Open", "High", "Low", "Volume", "YearlHigh", "YearlyLow", "WeekHigh", "WeekLow", "Day", "Month", "Year", "Balance"]
categorical_columns = ["TickerId"]

# Define feature assembler for numerical columns
numerical_assembler = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")

# Define SimpleImputer for handling null values in numerical columns
numerical_imputer = Imputer(strategy="median", inputCols=numerical_columns, outputCols=[column + "_imputed" for column in numerical_columns])

# Define StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep")
            for column in categorical_columns]

# Define OneHotEncoder for categorical columns
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=[column + "_encoded" for column in categorical_columns])

# Define feature assembler to combine numerical and categorical features
feature_assembler = VectorAssembler(inputCols=numerical_columns + [column + "_encoded" for column in categorical_columns], outputCol="features")

# Define StandardScaler for scaling features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Define PCA for dimensionality reduction
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

# Define Random Forest classifier
rf = RandomForestClassifier(labelCol="In_or_de", featuresCol="pca_features", numTrees=200, maxDepth=5)

# Define pipeline
pipeline = Pipeline(stages=[numerical_assembler] + indexers + [encoder, feature_assembler, scaler, pca, rf])

# Fit the model
model = pipeline.fit(train_df)

# Make predictions on the test set
predictions = model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="In_or_de", metricName="f1")
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)


F1 Score: 0.6916681275273606


##**Conclusion:**

We chose F-1 score as our evaluation metric as the data set is imbalanced. Initially we implemented 3 different models on a subset of the data (30,000 rows) with hyperparameters, cross validation, gridsearch and pca. The best parameters are taken for the best model and implemented on the entire dataset. We did this for Random forest and logistic regression. Here we can clearly see that random forest is clearly performing well on the unknown data.