In [9]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Stock_Price_Analysis") \
    .getOrCreate()

# Load historical stock price data
stock_price_data = spark.read.csv("stocks.csv", header=True, inferSchema=True)

# Show the first few rows of the DataFrame
stock_price_data.show()

# Visualize stock price trends over time
stock_price_data.createOrReplaceTempView("stock_prices")
spark.sql("SELECT Date, Close FROM stock_prices").show()

+------+----------+------------------+------------------+------------------+------------------+------------------+--------+
|Ticker|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|
+------+----------+------------------+------------------+------------------+------------------+------------------+--------+
|  AAPL|2023-02-07|150.63999938964844|155.22999572753906|150.63999938964844|154.64999389648438| 154.4142303466797|83322600|
|  AAPL|2023-02-08| 153.8800048828125| 154.5800018310547| 151.1699981689453| 151.9199981689453| 151.6884002685547|64120100|
|  AAPL|2023-02-09|153.77999877929688| 154.3300018310547| 150.4199981689453| 150.8699951171875|150.63999938964844|56007100|
|  AAPL|2023-02-10| 149.4600067138672|151.33999633789062|149.22000122070312|151.00999450683594|151.00999450683594|57450700|
|  AAPL|2023-02-13| 150.9499969482422|154.25999450683594| 150.9199981689453|153.85000610351562|153.85000610351562|62199000|
|  AAPL|

In [10]:
spark.sql("SELECT Date, Volume FROM stock_prices").show()

+----------+--------+
|      Date|  Volume|
+----------+--------+
|2023-02-07|83322600|
|2023-02-08|64120100|
|2023-02-09|56007100|
|2023-02-10|57450700|
|2023-02-13|62199000|
|2023-02-14|61707600|
|2023-02-15|65573800|
|2023-02-16|68167900|
|2023-02-17|59144100|
|2023-02-21|58867200|
|2023-02-22|51011300|
|2023-02-23|48394200|
|2023-02-24|55469600|
|2023-02-27|44998500|
|2023-02-28|50547000|
|2023-03-01|55479000|
|2023-03-02|52238100|
|2023-03-03|70732300|
|2023-03-06|87558000|
|2023-03-07|56182000|
+----------+--------+
only showing top 20 rows



In [19]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Feature engineering
vectorAssembler = VectorA+ssembler(inputCols=["Open", "High", "Low", "Adj Close", "Volume"], outputCol="features")
feature_df = vectorAssembler.transform(stock_price_data)

# Split data into train and test sets
train_data, test_data = feature_df.randomSplit([0.8, 0.2], seed=42)

# Train a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="Close")
lr_model = lr.fit(train_data)

# Make predictions
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) for Tradding stock price Prediction:", rmse)


Root Mean Squared Error (RMSE) for Tradding stock price Prediction: 0.11759753623393689


In [20]:
# Similar to previous steps but with 'Volume' as the label column
lr = LinearRegression(featuresCol="features", labelCol="Volume")
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) for Trading Volume Prediction:", rmse)

Root Mean Squared Error (RMSE) for Trading Volume Prediction: 42456847.62089314


In [40]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Stock_Price_Analysis") \
    .getOrCreate()

# Load stock price dataset
stock_price_data = spark.read.csv("stocks.csv", header=True, inferSchema=True)

# Feature engineering
feature_cols = ["Open", "High", "Low", "Adj Close"]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_df = vectorAssembler.transform(stock_price_data.select(feature_cols))

# Calculate correlation matrix
corr_matrix = Correlation.corr(feature_df, "features").head()[0]

# Display correlation matrix
print("Correlation Matrix for Stock Price Prediction:")
print("\t", end="")
for col in feature_cols:
    print(f"{col}\t", end="")
print()
for i, row in enumerate(corr_matrix.toArray()):
    print(f"{feature_cols[i]}\t", end="")
    for val in row:
        print(f"{val:.4f}\t", end="")
    print()


Correlation Matrix for Stock Price Prediction:
	Open	High	Low	Adj Close	
Open	1.0000	0.9996	0.9996	0.9992	
High	0.9996	1.0000	0.9997	0.9996	
Low	0.9996	0.9997	1.0000	0.9997	
Adj Close	0.9992	0.9996	0.9997	1.0000	


In [39]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Stock_Price_Analysis") \
    .getOrCreate()

# Load stock price dataset
stock_price_data = spark.read.csv("stocks.csv", header=True, inferSchema=True)

# Feature engineering
feature_cols = ["Open", "High", "Low", "Adj Close", "Volume"]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_df = vectorAssembler.transform(stock_price_data.select(feature_cols))

# Calculate correlation matrix
corr_matrix = Correlation.corr(feature_df, "features").head()[0]

# Display correlation values for the "Volume" feature
volume_index = feature_cols.index("Volume")
print("Correlation Matrix for Trading Volume Prediction:")
print(f"Correlation with Volume:")
for i, col in enumerate(feature_cols):
    print(f"{col}: {corr_matrix[volume_index, i]:.4f}")


Correlation Matrix for Trading Volume Prediction:
Correlation with Volume:
Open: -0.5477
High: -0.5462
Low: -0.5446
Adj Close: -0.5444
Volume: 1.0000


In [43]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")

# Train and evaluate different models
models = [LinearRegression(), DecisionTreeRegressor(), RandomForestRegressor()]
for model in models:
    model_name = model.__class__.__name__
    # Specify the label column correctly here
    label_col = "Close"
    model = model.setLabelCol(label_col).fit(train_data)
    predictions = model.transform(test_data)
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) for {model_name}: {rmse}")


Root Mean Squared Error (RMSE) for LinearRegression: 0.11759753623393689
Root Mean Squared Error (RMSE) for DecisionTreeRegressor: 3.959521025741843
Root Mean Squared Error (RMSE) for RandomForestRegressor: 3.0740932681527173


In [47]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Stock_Price_Analysis") \
    .getOrCreate()

# Load stock price dataset
stock_price_data = spark.read.csv("stocks.csv", header=True, inferSchema=True)

# Define feature columns
feature_cols = ["Open", "High", "Low", "Adj Close", "Volume"]

# Ensure the label column exists in the dataset
if "Volume" not in stock_price_data.columns:
    raise ValueError("Volume column is missing in the dataset")

# Feature engineering
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_df = vectorAssembler.transform(stock_price_data)

# Split the data into train and test sets
train_data, test_data = feature_df.randomSplit([0.8, 0.2], seed=42)

# Define the regression models
models = [
    LinearRegression(labelCol="Volume"),
    DecisionTreeRegressor(labelCol="Volume"),
    RandomForestRegressor(labelCol="Volume")
]

# Train and evaluate different models
evaluator = RegressionEvaluator(labelCol="Volume", predictionCol="prediction", metricName="rmse")
for model in models:
    model_name = model.__class__.__name__
    fitted_model = model.fit(train_data)
    predictions = fitted_model.transform(test_data)
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) for {model_name} in Trading Volume Prediction: {rmse}")


Root Mean Squared Error (RMSE) for LinearRegression in Trading Volume Prediction: 1.4536125918779715e-08
Root Mean Squared Error (RMSE) for DecisionTreeRegressor in Trading Volume Prediction: 1591740.152249611
Root Mean Squared Error (RMSE) for RandomForestRegressor in Trading Volume Prediction: 2244980.467444841
