In [None]:
import os
import optuna
from optuna.trial import TrialState
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
from torchvision import datasets
from torchvision import transforms
from pathlib import Path
from optuna_dashboard import run_server

In [None]:
# Location of the SQLite file backing the Optuna study. The default is the
# original local path; set the OPTUNA_DB_PATH environment variable to use a
# different file without editing the notebook.
DB_PATH = Path(
    os.environ.get(
        "OPTUNA_DB_PATH",
        "/Users/maryamhomayoon/PycharmProjects/optuna/optuna-examples/db.sqlite3",
    )
)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
STORAGE = f"sqlite:///{DB_PATH.as_posix()}"

SEED = 42

DEVICE = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {DEVICE}")
# Always seed the default generator: the models in this notebook are built
# before being moved to DEVICE and the training loader shuffles, so seeding
# only the MPS generator would leave those sources of randomness unseeded.
torch.manual_seed(SEED)
if DEVICE.type == "mps":
    # Kept explicit so the MPS generator is seeded as well.
    torch.mps.manual_seed(SEED)

BATCHSIZE = 128  # samples per batch for both data loaders
CLASSES = 10  # number of output classes of the classifier head
DIR = os.getcwd()  # dataset root directory passed to FashionMNIST
EPOCHS = 10  # training epochs per trial
# N_TRAIN_EXAMPLES = BATCHSIZE * 30
# N_VALID_EXAMPLES = BATCHSIZE * 10

In [None]:
def get_mnist():
    """Build the FashionMNIST training and test data loaders.

    Both datasets are rooted at ``DIR`` and converted with ``ToTensor``. The
    training split is requested with ``download=True`` and is shuffled; the
    test split is iterated in order. Both loaders use ``BATCHSIZE``.

    Returns:
        tuple: ``(train_loader, test_loader)``.
    """
    train_set = datasets.FashionMNIST(
        DIR, train=True, download=True, transform=transforms.ToTensor()
    )
    train_batches = torch.utils.data.DataLoader(
        train_set, batch_size=BATCHSIZE, shuffle=True
    )

    test_set = datasets.FashionMNIST(
        DIR, train=False, transform=transforms.ToTensor()
    )
    test_batches = torch.utils.data.DataLoader(
        test_set, batch_size=BATCHSIZE, shuffle=False
    )

    return train_batches, test_batches

In [None]:
# run for simple model with only linear layer
def define_model(trial):
    """Sample a fully connected classifier for flattened 28x28 inputs.

    The trial chooses the number of hidden layers (1-10) and the width of each
    hidden layer (4-128). Every hidden ``Linear`` is followed by ``ReLU``; the
    last ``Linear`` maps to ``CLASSES`` outputs and is followed by
    ``LogSoftmax``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        tuple: ``(model, in_out_features)`` where ``model`` is an
        ``nn.Sequential`` and ``in_out_features`` lists the
        ``(in_features, out_features)`` pair of every ``Linear`` layer in order.
    """
    depth = trial.suggest_int("n_layers", 1, 10)

    # Layer widths from the flattened input through to the class scores.
    widths = [28 * 28]
    for idx in range(depth):
        widths.append(trial.suggest_int(f"n_units_l{idx}", 4, 128))
    widths.append(CLASSES)

    # Consecutive width pairs are the (in, out) shapes of the Linear layers.
    shapes = list(zip(widths[:-1], widths[1:]))

    modules = []
    for fan_in, fan_out in shapes[:-1]:
        modules += [nn.Linear(fan_in, fan_out), nn.ReLU()]

    head_in, head_out = shapes[-1]
    modules += [nn.Linear(head_in, head_out), nn.LogSoftmax(dim=1)]

    return nn.Sequential(*modules), shapes

In [None]:
# run for simple linear model
def objective(trial):
    """Train one sampled fully connected network and score it.

    Expects ``define_model`` to be the linear variant, i.e. to return the
    model together with a list of ``(in_features, out_features)`` tuples.
    The compute cost of every ``Linear`` layer is estimated with
    ``estimate_linear_cost`` and stored on the trial under the same user
    attribute names the conv objective uses, so the reporting cell works for
    both.

    Args:
        trial: Optuna trial used to sample the architecture.

    Returns:
        tuple: ``(accuracy, latency)`` - validation accuracy after the last
        epoch (``0.0`` if ``EPOCHS`` is 0) and the total estimated cycles.
    """
    # Generate the model.
    model, layer_shapes = define_model(trial)
    model = model.to(DEVICE)

    # Fixed optimizer; the optimizer/lr search is intentionally disabled.
    lr = 0.001
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Estimate per-layer and total cost of the Linear layers.
    macs_total, flops_total, cycles_total = 0.0, 0.0, 0.0
    macs_per_layer, flops_per_layer, cycles_per_layer = [], [], []
    for in_f, out_f in layer_shapes:
        macs, flops, cycles = estimate_linear_cost(in_f, out_f, num_cores=1)

        macs_total += macs
        flops_total += flops
        cycles_total += cycles

        macs_per_layer.append(macs)
        flops_per_layer.append(flops)
        cycles_per_layer.append(cycles)

    # Store for the Optuna dashboard and the reporting cell.
    trial.set_user_attr("macs_total", macs_total)
    trial.set_user_attr("flops_total", flops_total)
    trial.set_user_attr("cycles_total", cycles_total)
    trial.set_user_attr("macs_per_layer", macs_per_layer)
    trial.set_user_attr("flops_per_layer", flops_per_layer)
    trial.set_user_attr("cycles_per_layer", cycles_per_layer)

    # 2nd objective
    latency = cycles_total

    # Get the FashionMNIST dataset.
    train_loader, valid_loader = get_mnist()

    # Defined up front so the return value exists even when EPOCHS is 0.
    accuracy = 0.0

    # Training of the model.
    for epoch in range(EPOCHS):
        model.train()
        for data, target in train_loader:
            # Only linear layers are used, so flatten each image first.
            data, target = data.view(data.size(0), -1).to(DEVICE), target.to(DEVICE)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

        # Validation of the model.
        model.eval()
        correct = 0
        with torch.no_grad():
            for data, target in valid_loader:
                data, target = data.view(data.size(0), -1).to(DEVICE), target.to(DEVICE)
                output = model(data)
                # Get the index of the max log-probability.
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()

        accuracy = correct / len(valid_loader.dataset)

    return accuracy, latency

In [None]:
def estimate_linear_cost(in_features, out_features, num_cores=1, cycles_per_mac=1):
    """Estimate the compute cost of one fully connected layer.

    Args:
        in_features: Number of input features.
        out_features: Number of output features.
        num_cores: Divisor applied to the cycle count.
        cycles_per_mac: Cycles charged per multiply-accumulate.

    Returns:
        tuple: ``(macs, flops, cycles)``. FLOPs count two operations per MAC
        plus one bias add per output; cycles only account for the MACs.
    """
    mac_count = in_features * out_features
    # Two FLOPs per MAC, plus one bias addition for every output feature.
    flop_count = 2 * mac_count + out_features
    cycle_count = (mac_count * cycles_per_mac) / num_cores
    return mac_count, flop_count, cycle_count

In [None]:
def estimate_conv_cost(c_in, c_out, h_in, w_in, kernel_size, stride=1, num_cores=1, cycles_per_mac=1):
    """Estimate the compute cost of one square-kernel 2D convolution.

    The output height and width are computed as ``(size + stride - 1) // stride``.

    Args:
        c_in: Input channels.
        c_out: Output channels.
        h_in: Input height.
        w_in: Input width.
        kernel_size: Side length of the square kernel.
        stride: Convolution stride.
        num_cores: Divisor applied to the cycle count.
        cycles_per_mac: Cycles charged per multiply-accumulate.

    Returns:
        tuple: ``(macs, flops, cycles)``. FLOPs count two operations per MAC
        plus one bias add per output value; cycles only account for the MACs.
    """
    def _strided_size(size):
        return (size + stride - 1) // stride

    # Number of values in the output feature map.
    output_values = c_out * _strided_size(h_in) * _strided_size(w_in)

    # Every output value is a dot product over c_in * K * K inputs.
    macs_per_output = c_in * (kernel_size * kernel_size)
    mac_count = output_values * macs_per_output

    # Two FLOPs per MAC (mul + add) plus one bias add per output value.
    flop_count = 2 * mac_count + output_values

    cycle_count = (mac_count * cycles_per_mac) / num_cores
    return mac_count, flop_count, cycle_count

In [None]:
def estimate_maxpool_cost(channels, h_in, w_in, kernel_size=2, stride=2, num_cores=1, cycles_per_comp=1):
    """Estimate the operation count of an unpadded 2D max-pool.

    Args:
        channels: Number of channels.
        h_in: Input height.
        w_in: Input width.
        kernel_size: Side length of the square pooling window.
        stride: Pooling stride.
        num_cores: Divisor applied to the cycle count.
        cycles_per_comp: Cycles charged per comparison.

    Returns:
        tuple: ``(macs, flops, cycles)`` with ``macs`` always 0; ``flops`` is
        a comparison count (``K*K - 1`` per output value), not true FLOPs.
    """
    # Output spatial size without padding.
    pooled_h = (h_in - kernel_size) // stride + 1
    pooled_w = (w_in - kernel_size) // stride + 1

    # Finding the max of a K*K window takes K*K - 1 comparisons.
    comparisons = (channels * pooled_h * pooled_w) * (kernel_size * kernel_size - 1)

    cycle_count = (comparisons * cycles_per_comp) / num_cores
    return 0, comparisons, cycle_count

In [None]:
def estimate_global_avg_pool_cost(channels, h_in, w_in, num_cores=1, cycles_per_add=1, cycles_per_div=2):
    """Estimate the operation count of a global average pool.

    Args:
        channels: Number of channels.
        h_in: Input height.
        w_in: Input width.
        num_cores: Divisor applied to the cycle count.
        cycles_per_add: Cycles charged per addition.
        cycles_per_div: Cycles charged per division.

    Returns:
        tuple: ``(macs, flops, cycles)`` with ``macs`` always 0; ``flops`` is
        the number of additions plus divisions.
    """
    # Summing h*w values takes h*w - 1 additions per channel,
    # followed by a single division per channel.
    add_count = channels * (h_in * w_in - 1)
    div_count = channels

    op_count = add_count + div_count
    cycle_count = (add_count * cycles_per_add + div_count * cycles_per_div) / num_cores
    return 0, op_count, cycle_count

In [None]:
# model with conv and linear while taking care of padding and while trying different strides
def define_model(trial):
    """Sample a FashionMNIST classifier built from conv, pooling and linear layers.

    The first layer is always a convolution on the 1x28x28 input. While the
    activations are still spatial, each further layer is sampled from
    conv / max-pool / global-average-pool / linear. Once the activations have
    been flattened (by a linear layer or a global pool) only linear layers are
    added. A final ``Linear`` maps to ``CLASSES`` outputs (skipped only when a
    globally pooled network already has exactly ``CLASSES`` features), and
    ``LogSoftmax`` is applied last.

    Args:
        trial: Optuna trial used to sample ``n_layers`` and per-layer parameters.

    Returns:
        tuple: ``(model, layer_descriptions)``. ``model`` is an
        ``nn.Sequential``. ``layer_descriptions`` is an ordered list of dicts
        with a ``"type"`` key (``"conv2d"``, ``"maxpool2d"``,
        ``"global_avg_pool"`` or ``"linear"``) plus the sizes needed to
        estimate that layer's cost.

    Raises:
        optuna.TrialPruned: If a 2x2 max-pool is sampled for a feature map
            smaller than 2 in either dimension. Such a network cannot run a
            forward pass, so the trial is rejected before any training.
    """
    layers = []
    layer_descriptions = []

    # FashionMNIST images are 28x28 grayscale.
    in_channels, in_height, in_width = 1, 28, 28

    current_features = None
    # True while activations are still (C, H, W); False once flattened.
    spatial_mode = True
    # Set when the flatten happened through global average pooling.
    used_global_pool = False

    n_layers = trial.suggest_int("n_layers", 1, 10)

    for layer_idx in range(n_layers):
        if layer_idx == 0:
            # The network always starts with a convolution.
            layer_type = "conv"
        elif spatial_mode:
            layer_type = trial.suggest_categorical(
                f"layer_type_{layer_idx}",
                ["conv", "pool", "global_pool", "linear"]
            )
        else:
            # Once spatial structure is gone, only Linear is allowed.
            layer_type = "linear"

        if layer_type == "conv":
            out_channels = trial.suggest_categorical(
                f"conv_out_channels_{layer_idx}", [16, 32, 64, 128, 256, 512]
            )
            kernel_size = trial.suggest_int(
                f"conv_kernel_{layer_idx}", 1, 7, step=2  # odd only
            )
            stride = trial.suggest_categorical(
                f"conv_stride_{layer_idx}", [1, 2]
            )

            # Input shape before the layer (needed for the cost estimate).
            c_in, h_in, w_in = in_channels, in_height, in_width

            # With an odd kernel this padding gives an output of ceil(size / stride).
            padding = kernel_size // 2

            layers.append(
                nn.Conv2d(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=kernel_size,
                    stride=stride,
                    padding=padding,
                )
            )
            layers.append(nn.ReLU())

            in_channels = out_channels
            in_height = (h_in + stride - 1) // stride
            in_width = (w_in + stride - 1) // stride

            layer_descriptions.append({
                "type": "conv2d",
                "out_channels": out_channels,
                "kernel_size": kernel_size,
                "stride": stride,
                "input_shape": (c_in, h_in, w_in),
            })

        elif layer_type == "pool":
            c_in, h_in, w_in = in_channels, in_height, in_width

            # A 2x2 window does not fit into a map smaller than 2x2: the
            # tracked size would drop to 0 and the forward pass would fail.
            if h_in < 2 or w_in < 2:
                raise optuna.TrialPruned(
                    f"layer {layer_idx}: 2x2 max-pool cannot be applied to a "
                    f"{h_in}x{w_in} feature map"
                )

            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))

            in_height = h_in // 2
            in_width = w_in // 2

            layer_descriptions.append({
                "type": "maxpool2d",
                "kernel_size": 2,
                "stride": 2,
                "input_shape": (c_in, h_in, w_in),
            })

        elif layer_type == "global_pool":
            c_in, h_in, w_in = in_channels, in_height, in_width

            layers.append(nn.AdaptiveAvgPool2d((1, 1)))
            layers.append(nn.Flatten())

            # One feature per channel remains after pooling to 1x1.
            current_features = c_in
            spatial_mode = False
            used_global_pool = True

            # After flatten there are no more spatial dims.
            in_channels = in_height = in_width = None

            layer_descriptions.append({
                "type": "global_avg_pool",
                "features": current_features,
                "input_shape": (c_in, h_in, w_in),
            })

        else:  # linear
            if spatial_mode:
                # First linear layer after spatial layers: flatten (C, H, W).
                layers.append(nn.Flatten())
                current_features = in_channels * in_height * in_width
                spatial_mode = False
                in_channels = in_height = in_width = None

            out_features = trial.suggest_int(
                f"linear_out_{layer_idx}", 16, 128
            )

            layers.append(nn.Linear(current_features, out_features))
            layers.append(nn.ReLU())

            layer_descriptions.append({
                "type": "linear",
                "in_features": current_features,
                "out_features": out_features,
            })

            current_features = out_features

    # Final classifier head.
    if spatial_mode:
        # The sampled layers never left the spatial domain: flatten first.
        layers.append(nn.Flatten())
        current_features = in_channels * in_height * in_width

    # The head is skipped only when global pooling already produced exactly
    # CLASSES features.
    if not (used_global_pool and current_features == CLASSES):
        layers.append(nn.Linear(current_features, CLASSES))
        layer_descriptions.append({
            "type": "linear",
            "in_features": current_features,
            "out_features": CLASSES,
        })

    layers.append(nn.LogSoftmax(dim=1))

    return nn.Sequential(*layers), layer_descriptions

In [None]:
# run for conv model
def objective(trial):
    """Train one sampled conv/linear network and score it for the study.

    The architecture comes from ``define_model``. Before training, the cost
    of every described layer is estimated and stored on the trial as user
    attributes (totals and per-layer lists of MACs, FLOPs and cycles).

    Args:
        trial: Optuna trial used to sample the architecture.

    Returns:
        tuple: ``(accuracy, latency)`` - accuracy on the validation loader
        after the last epoch, and the total estimated cycle count.
    """
    # Build the sampled network and move it to the selected device.
    net, layer_specs = define_model(trial)
    net = net.to(DEVICE)

    # Fixed optimiser setup; the optimiser/learning-rate search is disabled.
    learning_rate = 0.001
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)

    def _cost_of(spec):
        """Return (macs, flops, cycles) for one layer description, or None if its type is unknown."""
        kind = spec["type"]
        if kind == "linear":
            return estimate_linear_cost(spec["in_features"], spec["out_features"])
        if kind not in ("conv2d", "maxpool2d", "global_avg_pool"):
            return None

        channels, height, width = spec["input_shape"]
        if kind == "conv2d":
            return estimate_conv_cost(
                c_in=channels,
                c_out=spec["out_channels"],
                h_in=height,
                w_in=width,
                kernel_size=spec["kernel_size"],
                stride=spec["stride"],
            )
        if kind == "maxpool2d":
            return estimate_maxpool_cost(
                channels=channels,
                h_in=height,
                w_in=width,
                kernel_size=spec["kernel_size"],
                stride=spec["stride"],
            )
        return estimate_global_avg_pool_cost(channels=channels, h_in=height, w_in=width)

    # Accumulate running totals and keep the per-layer breakdown.
    macs_sum, flops_sum, cycles_sum = 0.0, 0.0, 0.0
    macs_list, flops_list, cycles_list = [], [], []
    for spec in layer_specs:
        cost = _cost_of(spec)
        if cost is None:
            continue
        layer_macs, layer_flops, layer_cycles = cost

        macs_sum += layer_macs
        flops_sum += layer_flops
        cycles_sum += layer_cycles

        macs_list.append(layer_macs)
        flops_list.append(layer_flops)
        cycles_list.append(layer_cycles)

    # Expose the estimates to the Optuna dashboard.
    for attr_name, attr_value in (
        ("macs_total", macs_sum),
        ("flops_total", flops_sum),
        ("cycles_total", cycles_sum),
        ("macs_per_layer", macs_list),
        ("flops_per_layer", flops_list),
        ("cycles_per_layer", cycles_list),
    ):
        trial.set_user_attr(attr_name, attr_value)

    # The second objective is the estimated cycle count.
    latency = cycles_sum

    # FashionMNIST loaders.
    train_loader, valid_loader = get_mnist()

    for epoch in range(EPOCHS):
        # --- training pass ---
        net.train()
        for images, labels in train_loader:
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)

            optimizer.zero_grad()
            log_probs = net(images)
            batch_loss = F.nll_loss(log_probs, labels)
            batch_loss.backward()
            optimizer.step()

        # --- validation pass ---
        net.eval()
        hits = 0
        with torch.no_grad():
            for images, labels in valid_loader:
                images = images.to(DEVICE)
                labels = labels.to(DEVICE)
                # The predicted class is the index of the largest log-probability.
                predicted = net(images).argmax(dim=1)
                hits += (predicted == labels).sum().item()

        accuracy = hits / len(valid_loader.dataset)

        # Intermediate reporting / pruning is intentionally not used here.

    return accuracy, latency

In [None]:
# Two-objective study: maximise accuracy, minimise estimated cycles.
# With load_if_exists=True an existing study of this name in STORAGE is reused.
study = optuna.create_study(
    study_name="first try for the cycles objective",
    storage=STORAGE,
    directions=["maximize", "minimize"],
    load_if_exists=True,
)
study.optimize(objective, n_trials=100, timeout=600)

# Report every trial in study.best_trials.
for t in study.best_trials:
    # Values come back in the order the objective returned them.
    acc, cycles = t.values

    print(60 * "=")
    print(f"Trial #{t.number} | accuracy={acc:.4f} | cycles={cycles:.0f}")

    macs = t.user_attrs.get("macs_per_layer")
    flops = t.user_attrs.get("flops_per_layer")
    latencies = t.user_attrs.get("cycles_per_layer")

    for heading, per_layer in (
        ("MACs per layer   : ", macs),
        ("FLOPs per layer  : ", flops),
        ("Cycles per layer : ", latencies),
    ):
        print(f"{heading}{per_layer}")

    print("Totals:")
    for heading, attr_name in (
        ("  MACs  :", "macs_total"),
        ("  FLOPs :", "flops_total"),
        ("  cycles:", "cycles_total"),
    ):
        print(heading, t.user_attrs.get(attr_name))

# pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
# complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

# print("Study statistics: ")

# print("  Number of pruned trials: ", len(pruned_trials))
# print("  Number of finished trials: ", len(study.trials))
# print("  Number of complete trials: ", len(complete_trials))

# best_trials = study.best_trials
# print(f"\nNumber of Pareto-optimal trials: {len(best_trials)}")

# for t in best_trials:
#     print(f"  Values: accuracy={t.values[0]:.4f}, latency={t.values[1]:.4f}")
#     print("  Params:")
#     for k, v in t.params.items():
#         print(f"    {k}: {v}")

In [None]:
# Start the Optuna Dashboard server for the study database at STORAGE.
# NOTE(review): no host/port is passed, so localhost:8080 is presumably
# run_server's default - confirm against the optuna-dashboard docs.
# NOTE(review): this call likely blocks the cell until the kernel is
# interrupted - confirm before placing further cells after it.
run_server(STORAGE)