2.5.1


4.0.0


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, trim, coalesce, col

# Initialize Spark session
spark = SparkSession.builder.appName("StockPred").getOrCreate()

df_clean = spark.read.option("header", True).csv("D:\\stock\\Market.csv")


# Remove extra spaces from Date column
df_clean = df_clean.withColumn("Date", trim(col("Date")))


df_clean = df_clean.withColumn(
    "Date",
    coalesce(
        to_date(col("Date"), "M/d/yyyy"),
        to_date(col("Date"), "MM/dd/yyyy")
    )
)

df_clean.show()



+-----+----------+----------+----------+----------+----------+----------+------+
|Index|      Date|      Open|      High|       Low|     Close| Adj Close|Volume|
+-----+----------+----------+----------+----------+----------+----------+------+
|  NYA|1965-12-31|528.690002|528.690002|528.690002|528.690002|528.690002|     0|
|  NYA|1966-01-03|527.210022|527.210022|527.210022|527.210022|527.210022|     0|
|  NYA|1966-01-04|527.840027|527.840027|527.840027|527.840027|527.840027|     0|
|  NYA|1966-01-05|531.119995|531.119995|531.119995|531.119995|531.119995|     0|
|  NYA|1966-01-06|532.070007|532.070007|532.070007|532.070007|532.070007|     0|
|  NYA|1966-01-07|532.599976|532.599976|532.599976|532.599976|532.599976|     0|
|  NYA|1966-01-10|533.869995|533.869995|533.869995|533.869995|533.869995|     0|
|  NYA|1966-01-11|534.289978|534.289978|534.289978|534.289978|534.289978|     0|
|  NYA|1966-01-12|533.340027|533.340027|533.340027|533.340027|533.340027|     0|
|  NYA|1966-01-13|534.400024

In [6]:
df = spark.read.csv(r"D:\stock\Market.csv", header=True)

print("Number of rows:", df.count())
print("Number of columns:", len(df.columns))

df.show(5)

Number of rows: 112457
Number of columns: 8
+-----+----------+----------+----------+----------+----------+----------+------+
|Index|      Date|      Open|      High|       Low|     Close| Adj Close|Volume|
+-----+----------+----------+----------+----------+----------+----------+------+
|  NYA|12/31/1965|528.690002|528.690002|528.690002|528.690002|528.690002|     0|
|  NYA|  1/3/1966|527.210022|527.210022|527.210022|527.210022|527.210022|     0|
|  NYA|  1/4/1966|527.840027|527.840027|527.840027|527.840027|527.840027|     0|
|  NYA|  1/5/1966|531.119995|531.119995|531.119995|531.119995|531.119995|     0|
|  NYA|  1/6/1966|532.070007|532.070007|532.070007|532.070007|532.070007|     0|
+-----+----------+----------+----------+----------+----------+----------+------+
only showing top 5 rows


In [7]:
sample_cols = ["Open", "High", "Low", "Close", "Volume"]

df_small = df.select(sample_cols)
df_small.show(5)



+----------+----------+----------+----------+------+
|      Open|      High|       Low|     Close|Volume|
+----------+----------+----------+----------+------+
|528.690002|528.690002|528.690002|528.690002|     0|
|527.210022|527.210022|527.210022|527.210022|     0|
|527.840027|527.840027|527.840027|527.840027|     0|
|531.119995|531.119995|531.119995|531.119995|     0|
|532.070007|532.070007|532.070007|532.070007|     0|
+----------+----------+----------+----------+------+
only showing top 5 rows


In [8]:
from pyspark.sql.functions import col

df_small = df_small.select(
    [col(c).cast("double").alias(c) for c in sample_cols]
)
df_small.show(5)

+----------+----------+----------+----------+------+
|      Open|      High|       Low|     Close|Volume|
+----------+----------+----------+----------+------+
|528.690002|528.690002|528.690002|528.690002|   0.0|
|527.210022|527.210022|527.210022|527.210022|   0.0|
|527.840027|527.840027|527.840027|527.840027|   0.0|
|531.119995|531.119995|531.119995|531.119995|   0.0|
|532.070007|532.070007|532.070007|532.070007|   0.0|
+----------+----------+----------+----------+------+
only showing top 5 rows


In [9]:
from pyspark.sql.functions import col

# Select the real column names and rename them
df_clean = df.select(
    col("Open").cast("double").alias("open"),
    col("High").cast("double").alias("high"),
    col("Low").cast("double").alias("low"),
    col("Close").cast("double").alias("close"),
    col("Volume").cast("double").alias("volume")
)

df_clean.show(5)


+----------+----------+----------+----------+------+
|      open|      high|       low|     close|volume|
+----------+----------+----------+----------+------+
|528.690002|528.690002|528.690002|528.690002|   0.0|
|527.210022|527.210022|527.210022|527.210022|   0.0|
|527.840027|527.840027|527.840027|527.840027|   0.0|
|531.119995|531.119995|531.119995|531.119995|   0.0|
|532.070007|532.070007|532.070007|532.070007|   0.0|
+----------+----------+----------+----------+------+
only showing top 5 rows


In [10]:
from pyspark.sql.functions import mean, col

# Drop rows where all are null
df_clean = spark.read.option("header", True) \
    .option("nullValue", "null") \
    .option("inferSchema", True) \
    .csv("D:/stock/Market.csv")

df_clean = df_clean.na.drop(how="all")

# Identify numeric columns
numeric_cols = [f.name for f in df_clean.schema.fields if f.dataType.simpleString() in ("int", "bigint", "double", "float")]

# Build impute dictionary only for numeric columns
impute_dict = {}
for c in numeric_cols:
    mean_val = df_clean.select(mean(col(c))).collect()[0][0]
    impute_dict[c] = mean_val if mean_val is not None else 0

# Fill missing values
df_clean = df_clean.na.fill(impute_dict)

df_clean.show(5)


+-----+----------+----------+----------+----------+----------+----------+------+
|Index|      Date|      Open|      High|       Low|     Close| Adj Close|Volume|
+-----+----------+----------+----------+----------+----------+----------+------+
|  NYA|12/31/1965|528.690002|528.690002|528.690002|528.690002|528.690002|     0|
|  NYA|  1/3/1966|527.210022|527.210022|527.210022|527.210022|527.210022|     0|
|  NYA|  1/4/1966|527.840027|527.840027|527.840027|527.840027|527.840027|     0|
|  NYA|  1/5/1966|531.119995|531.119995|531.119995|531.119995|531.119995|     0|
|  NYA|  1/6/1966|532.070007|532.070007|532.070007|532.070007|532.070007|     0|
+-----+----------+----------+----------+----------+----------+----------+------+
only showing top 5 rows


In [11]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["open", "high", "low", "close", "volume"],
    outputCol="features"
)

df_features = assembler.transform(df_clean)

In [12]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_features)
df_scaled = scaler_model.transform(df_features)
df_scaled.show(5)

+-----+----------+----------+----------+----------+----------+----------+------+--------------------+--------------------+
|Index|      Date|      Open|      High|       Low|     Close| Adj Close|Volume|            features|     scaled_features|
+-----+----------+----------+----------+----------+----------+----------+------+--------------------+--------------------+
|  NYA|12/31/1965|528.690002|528.690002|528.690002|528.690002|528.690002|     0|[528.690002,528.6...|[-0.7990700677731...|
|  NYA|  1/3/1966|527.210022|527.210022|527.210022|527.210022|527.210022|     0|[527.210022,527.2...|[-0.7992359343876...|
|  NYA|  1/4/1966|527.840027|527.840027|527.840027|527.840027|527.840027|     0|[527.840027,527.8...|[-0.7991653274899...|
|  NYA|  1/5/1966|531.119995|531.119995|531.119995|531.119995|531.119995|     0|[531.119995,531.1...|[-0.7987977298279...|
|  NYA|  1/6/1966|532.070007|532.070007|532.070007|532.070007|532.070007|     0|[532.070007,532.0...|[-0.7986912586092...|
+-----+---------

In [13]:
%pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [14]:
pdf = df_scaled.select("scaled_features", "close").toPandas()
pdf = pdf.rename(columns={"close": "TARGET"})

In [15]:
import numpy as np
import torch
from torch.utils.data import Dataset
import numpy as np


class SparkDataset(Dataset):
    def __init__(self, spark_df):
        data = spark_df.collect()

        # Convert list of numpy arrays into one big NumPy array
        features = np.array([row.scaled_features.toArray() for row in data], dtype=np.float32)
        targets = np.array([row.close for row in data], dtype=np.float32).reshape(-1, 1)

        self.X = torch.from_numpy(features)
        self.y = torch.from_numpy(targets)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [16]:
%pip install scikit-learn

Note: you may need to restart the kernel to use updated packages.


In [17]:
from sklearn.model_selection import train_test_split

In [18]:
import numpy as np

# Collect from Spark DataFrame
data = df_scaled.select("scaled_features", "close").collect()

X = np.array([row.scaled_features.toArray() for row in data], dtype=np.float32)
y = np.array([row.close for row in data], dtype=np.float32)

In [19]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [20]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Convert only if they are NumPy arrays
if isinstance(X_train, np.ndarray):
    X_train = torch.from_numpy(X_train).float()
if isinstance(y_train, np.ndarray):
    y_train = torch.from_numpy(y_train).float().view(-1, 1)
if isinstance(X_test, np.ndarray):
    X_test = torch.from_numpy(X_test).float()
if isinstance(y_test, np.ndarray):
    y_test = torch.from_numpy(y_test).float().view(-1, 1)

# Wrap into TensorDataset
train_ds = TensorDataset(X_train, y_train)
test_ds = TensorDataset(X_test, y_test)

# DataLoaders
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=32)


In [21]:
import torch

class MyAdam:
    def __init__(self, params, lr=0.001, betas=(0.9, 0.999), eps=1e-8):
        self.params = list(params)          # model parameters
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.m = [torch.zeros_like(p) for p in self.params]  # first moment
        self.v = [torch.zeros_like(p) for p in self.params]  # second moment
        self.t = 0  # time step

    def step(self): #track of opt steps
        self.t += 1
        for i, p in enumerate(self.params):
            if p.grad is None:
                continue

            g = p.grad.data
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (g * g)

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Update parameter
            p.data -= self.lr * m_hat / (torch.sqrt(v_hat) + self.eps)

    def zero_grad(self):
        for p in self.params:
            if p.grad is not None:
                p.grad.detach_()
                p.grad.zero_()


In [22]:
import torch.nn as nn
import torch.optim as optim

class StockNN(nn.Module):
    def __init__(self, input_dim):
        super(StockNN, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)  # regression (TARGET = price)
        )

    def forward(self, x):
        return self.layers(x)

# Initialize model
model = StockNN(input_dim=X_train.shape[1])
criterion = nn.MSELoss()

# Use custom Adam instead of torch.optim.Adam
optimizer = MyAdam(model.parameters(), lr=0.001)


In [23]:
sum(p.numel() for p in model.parameters() if p.requires_grad)

2497

In [30]:
model.eval()
with torch.no_grad():
    preds = model(X_test).numpy()

print("Sample predictions:", preds[:5].flatten())

Sample predictions: [ 2861.205     692.44086  2341.0344   7636.1357  12862.609  ]


In [25]:
import torch
import torch.nn as nn
import numpy as np

# assume: model, criterion, optimizer, train_dl, test_dl are already defined

for epoch in range(10):  # number of epochs
    model.train()
    train_losses = []

    # --- Training Loop ---
    for xb, yb in train_dl:  # batches from DataLoader
        preds = model(xb)                 # forward pass
        loss = criterion(preds, yb)       # compute loss
        train_losses.append(loss.item())

        optimizer.zero_grad()             # reset gradients
        loss.backward()                   # backward pass
        optimizer.step()                  # update params

    # --- Compute train loss & RMSE ---
    train_loss = np.mean(train_losses)
    train_rmse = np.sqrt(train_loss)

    # --- Evaluation on test data ---
    model.eval()
    test_losses = []
    with torch.no_grad():
        for xb, yb in test_dl:
            preds = model(xb)
            loss = criterion(preds, yb)
            test_losses.append(loss.item())

    test_loss = np.mean(test_losses)
    test_rmse = np.sqrt(test_loss)

    # --- Print results ---
    print(
        f"Epoch {epoch+1}, "
        f"Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, "
        f"Train RMSE: {train_rmse:.4f}, Test RMSE: {test_rmse:.4f}"
    )



Epoch 1, Train Loss: 768339.3986, Test Loss: 16385.5905, Train RMSE: 876.5497, Test RMSE: 128.0062
Epoch 2, Train Loss: 652077.2239, Test Loss: 20664.3792, Train RMSE: 807.5130, Test RMSE: 143.7511
Epoch 3, Train Loss: 648604.1198, Test Loss: 5513.8133, Train RMSE: 805.3596, Test RMSE: 74.2551
Epoch 4, Train Loss: 625072.3534, Test Loss: 7867.3009, Train RMSE: 790.6152, Test RMSE: 88.6978
Epoch 5, Train Loss: 616930.9409, Test Loss: 34534.8973, Train RMSE: 785.4495, Test RMSE: 185.8357
Epoch 6, Train Loss: 618155.3060, Test Loss: 8093.1736, Train RMSE: 786.2285, Test RMSE: 89.9621
Epoch 7, Train Loss: 604505.5840, Test Loss: 10917.7047, Train RMSE: 777.4996, Test RMSE: 104.4878
Epoch 8, Train Loss: 616733.4181, Test Loss: 4881.8570, Train RMSE: 785.3238, Test RMSE: 69.8703
Epoch 9, Train Loss: 606415.4309, Test Loss: 9303.1683, Train RMSE: 778.7268, Test RMSE: 96.4529
Epoch 10, Train Loss: 593864.0790, Test Loss: 42571.6130, Train RMSE: 770.6258, Test RMSE: 206.3289


In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, coalesce, to_date

# 1. Create Spark session
spark = SparkSession.builder.appName("MovieRecSys").getOrCreate()

# 2. Load CSV into df  (⚠️ update path)
df = spark.read.option("header", True).csv(r"D:\stock\Market.csv")

# 3. Trim spaces in Date column
df_clean = df.withColumn("Date", trim(col("Date")))

# 4. Try parsing both formats: "M/d/yyyy" and "MM/dd/yyyy"
df_clean = df_clean.withColumn(
    "Date",
    coalesce(
        to_date(col("Date"), "M/d/yyyy"),
        to_date(col("Date"), "MM/dd/yyyy")
    )
)

# 5. Drop rows where Date could not be parsed
df_clean = df_clean.na.drop(subset=["Date"])

print("After Cleaning:")
df_clean.show(5, truncate=False)


After Cleaning:
+-----+----------+----------+----------+----------+----------+----------+------+
|Index|Date      |Open      |High      |Low       |Close     |Adj Close |Volume|
+-----+----------+----------+----------+----------+----------+----------+------+
|NYA  |1965-12-31|528.690002|528.690002|528.690002|528.690002|528.690002|0     |
|NYA  |1966-01-03|527.210022|527.210022|527.210022|527.210022|527.210022|0     |
|NYA  |1966-01-04|527.840027|527.840027|527.840027|527.840027|527.840027|0     |
|NYA  |1966-01-05|531.119995|531.119995|531.119995|531.119995|531.119995|0     |
|NYA  |1966-01-06|532.070007|532.070007|532.070007|532.070007|532.070007|0     |
+-----+----------+----------+----------+----------+----------+----------+------+
only showing top 5 rows
