In [None]:
# preprocess.py #
# imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# path to dataset
# (each folder = one flower class)
base_dir = "/home/sat3812/projects/smallProject4/flowers"

# list the class names (folder names)
# os.listdir returns entries in arbitrary order, so sort them to keep the
# class -> integer label mapping identical across runs and machines, and
# skip stray non-directory entries that would break the per-class listing
classes = sorted(
    entry for entry in os.listdir(base_dir)
    if os.path.isdir(os.path.join(base_dir, entry))
)

# collect (path, class name, integer label) rows from the folder layout
data = []

for label, class_name in enumerate(classes):
    class_folder = os.path.join(base_dir, class_name)

    # loop through all files in each class (sorted for a reproducible row order)
    for img_file in sorted(os.listdir(class_folder)):
        full_path = os.path.join(class_folder, img_file)

        # nested directories are not samples; keep regular files only
        if os.path.isfile(full_path):
            data.append((full_path, class_name, label))

# createDataFrame cannot infer a schema from an empty list, so fail early
# with a message that names the directory that was scanned
if not data:
    raise ValueError(f"No image files found under {base_dir}")

# create spark session
spark = SparkSession.builder.appName("FlowerPreprocessing").getOrCreate()

try:
    # create Spark DataFrame
    df = spark.createDataFrame(data, ["path", "label_str", "label_int"])

    # shuffle data randomly
    # cache the shuffled rows so the writes and the counts below all work
    # from the same materialized data instead of recomputing the shuffle
    df = df.orderBy(F.rand(seed = 42)).cache()

    # 80/20 split
    train_df, test_df = df.randomSplit([0.8, 0.2], seed = 42)

    # save to CSV for the training script
    # (Spark writes each output as a directory of part files)
    # NOTE(review): the training cell reads ".../smallProject4/train.csv" and
    # ".../smallProject4/test.csv" - confirm these outputs end up at those paths
    train_df.write.mode("overwrite").csv("train_data.csv", header = True)
    test_df.write.mode("overwrite").csv("test_data.csv", header = True)

    print("Preprocessing complete!")
    print("Training samples:", train_df.count())
    print("Test samples:", test_df.count())
finally:
    # release the session even if a step above fails
    spark.stop()

In [None]:
# train_cnn.py #
# imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# dataset class to load flower images
class FlowerDataset(Dataset):
    """Map-style dataset that loads one image per DataFrame row.

    Each row must provide a "path" column (image file location) and a
    "label_int" column (value convertible to int).

    Args:
        df: table of samples; its index is replaced by 0..len-1.
        transform: optional callable applied to the loaded RGB image.
    """

    def __init__(self, df: pd.DataFrame, transform = None):
        self.transform = transform
        # positional index so that iloc lookups line up with 0..len-1
        self.df = df.reset_index(drop = True)

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return (image, label) for the row at position idx."""
        record = self.df.iloc[idx]
        location = record["path"]
        target = int(record["label_int"])

        # force three channels regardless of the mode stored in the file
        picture = Image.open(location).convert("RGB")

        # without a transform the PIL image is returned unchanged
        if not self.transform:
            return picture, target

        # resize / tensor conversion / normalization are up to the caller
        return self.transform(picture), target

# CNN model
class CNNModel(nn.Module):
    """Small convolutional classifier: four conv stages, global pooling, MLP head.

    Args:
        num_classes: number of outputs of the final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()

        # four stages of conv 3x3 -> batch norm -> ReLU -> 2x2 max pool;
        # only the channel widths change between stages, and each pool halves
        # the spatial size (128 -> 8 for the 128x128 inputs used in training)
        widths = (3, 16, 32, 64, 128)
        stages = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(c_in, c_out, kernel_size = 3, padding = 1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ])

        # average each of the 128 channels down to a single value
        stages.append(nn.AdaptiveAvgPool2d((1, 1)))
        self.features = nn.Sequential(*stages)

        # fully connected classifier on the 128 pooled features
        head = [
            nn.Flatten(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Dropout(0.5), # dropout to reduce overfitting
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the feature extractor, then the classifier, on x."""
        return self.classifier(self.features(x))

# read both CSV splits with Spark and hand them back as pandas DataFrames
def _load_split_frames(train_csv_path, test_csv_path):
    """Load the train/test CSVs via Spark, print previews, return pandas frames.

    Args:
        train_csv_path: location of the training CSV (with header row).
        test_csv_path: location of the test CSV (with header row).

    Returns:
        Tuple (train_df, test_df) of pandas DataFrames.
    """
    # create spark session
    spark = SparkSession.builder.appName("FlowerCNNTraining").getOrCreate()

    try:
        # suppress spark backend logs
        spark.sparkContext.setLogLevel("WARN")

        # read csvs with spark
        train_df_spark = spark.read.csv(train_csv_path, header = True, inferSchema = True)
        test_df_spark = spark.read.csv(test_csv_path, header = True, inferSchema = True)

        # show a preview of data
        print("Train DataFrame preview:")
        train_df_spark.show(5)
        print("Test DataFrame preview:")
        test_df_spark.show(5)

        print("Train rows:", train_df_spark.count())
        print("Test rows:", test_df_spark.count())

        # convert spark DataFrames to pandas
        return train_df_spark.toPandas(), test_df_spark.toPandas()
    finally:
        # stop spark even when reading or converting fails
        spark.stop()


# count correct predictions of a model over a data loader
def _evaluate(model, loader, device):
    """Return (correct, total) prediction counts for model over loader."""
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for imgs, labels in loader:
            imgs = imgs.to(device)
            labels = labels.to(device)

            preds = model(imgs).argmax(dim = 1)

            correct += (preds == labels).sum().item()
            total += labels.size(0)

    return correct, total


# training pipeline
def train_flower_cnn():
    """Train CNNModel on the flower CSV splits, print test accuracy, save weights.

    Reads the train/test CSVs (columns "path" and "label_int"), trains for a
    fixed number of epochs with Adam and a step learning-rate schedule,
    evaluates on the test split and writes the state dict to "flower_cnn.pth".

    Raises:
        ValueError: if the training CSV contains no rows.
    """

    # paths to csv files
    # NOTE(review): the preprocessing cell writes "train_data.csv" and
    # "test_data.csv" relative to its working directory - confirm these
    # paths point at that output
    train_csv_path = "/home/sat3812/projects/smallProject4/train.csv"
    test_csv_path = "/home/sat3812/projects/smallProject4/test.csv"

    train_df, test_df = _load_split_frames(train_csv_path, test_csv_path)

    # fail before building the model instead of dividing by zero after
    # a training loop that never ran
    if train_df.empty:
        raise ValueError(f"No training rows found in {train_csv_path}")

    # image transforms with augmentation (training only)
    img_transforms = transforms.Compose([
        transforms.Resize((128, 128)), # resize images
        transforms.RandomHorizontalFlip(), # randomly flip image
        transforms.ColorJitter(brightness = 0.1, contrast = 0.1, saturation = 0.1), # change color of images
        transforms.ToTensor(), # turn into 3xHxW tensors
        transforms.Normalize((0.5, 0.5, 0.5), # normalize RGB
                             (0.5, 0.5, 0.5))
    ])

    # same preprocessing without the random augmentation, for evaluation
    eval_transforms = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    # create PyTorch datasets
    train_dataset = FlowerDataset(train_df, transform = img_transforms)
    test_dataset = FlowerDataset(test_df, transform = eval_transforms)

    # data loaders
    train_loader = DataLoader(train_dataset, batch_size = 32, shuffle = True)
    test_loader = DataLoader(test_dataset, batch_size = 32, shuffle = False)

    # number of flower classes
    # labels are used directly as class indices by the loss, so the output
    # layer must cover the largest label in either split; counting distinct
    # training labels would be too small if a class is missing from train
    all_labels = pd.concat([train_df["label_int"], test_df["label_int"]])
    num_classes = int(all_labels.max()) + 1
    print("Number of flower classes:", num_classes)

    # device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)

    # create model
    model = CNNModel(num_classes).to(device)

    # loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr = 5e-4, weight_decay = 1e-4)

    # learning rate scheduler (halve the learning rate every 5 epochs)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = 0.5)

    # training loop
    epochs = 15
    print("Starting training loop..")
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        correct = 0
        total = 0

        for imgs, labels in train_loader:
            imgs = imgs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight it by batch size so
            # the epoch loss is a per-sample average even with a short last batch
            total_loss += loss.item() * labels.size(0)

            preds = outputs.argmax(dim = 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        scheduler.step()
        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/total:.4f}, Acc: {correct/total:.4f}")

    # evaluate on test set
    correct, total = _evaluate(model, test_loader, device)

    # an empty test split must not abort the run before the model is saved
    test_acc = correct / total if total else float("nan")
    print(f"Test Accuracy: {test_acc:.4f}")

    # save model
    torch.save(model.state_dict(), "flower_cnn.pth")
    print("Saved model as flower_cnn.pth")

# run script
# start training only when this file is executed directly, not when it is imported
if __name__ == "__main__":
    train_flower_cnn()
