In [None]:
!pip install qkeras
!pip install tensorflow==2.12.0 # 

In [None]:
import numpy as np

In [None]:
# Number of segmentation classes; also used as the model's output channel count.
n_classes = 20

# Kaggle input directory that holds the competition arrays.
DATA_DIR = "/kaggle/input/fpga-challenge-final-round"

# np.load on an .npz returns a lazy NpzFile that keeps the archive open.
# Reading both arrays inside the `with` block materialises them in memory and
# then closes the file handle (train_data is closed after this block).
with np.load(f"{DATA_DIR}/cityscapes_train_size(64 64).npz") as train_data:
    x_train = train_data['x_train']
    y_train = train_data['y_train']

x_test = np.load(f"{DATA_DIR}/test_images_size(64 64).npy")

print(f"x_train shape: {x_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape}")

In [None]:
import matplotlib.pyplot as plt

# Visual sanity check: show one training image next to its label mask.
sample_idx = 0
sample_image = x_train[sample_idx]
sample_mask = y_train[sample_idx]

fig, axes = plt.subplots(1, 2, figsize=(10, 5))
image_ax, mask_ax = axes

# Left panel: the input image, titled with its shape and value range.
image_ax.imshow(sample_image)
image_ax.set_title(f"Extracted Image\nShape: {sample_image.shape}\nRange: [{sample_image.min():.3f}, {sample_image.max():.3f}]")
image_ax.axis('off')

# Right panel: the mask, with the colour scale fixed to the class-id range
# 0 .. n_classes-1 so colours are comparable between plots.
mask_ax.imshow(sample_mask, cmap='tab20', vmin=0, vmax=n_classes-1)
mask_ax.set_title(f"Extracted Mask\nShape: {sample_mask.shape}\nRange: [{sample_mask.min()}, {sample_mask.max()}]")
mask_ax.axis('off')

plt.tight_layout()
plt.show()

# Textual summary; the "should be" hints are expectations only, nothing is asserted.
print(f"Sample validation:")
print(f"  Image shape: {sample_image.shape} (should be H, W, 3)")
print(f"  Mask shape: {sample_mask.shape} (should be H, W)")
print(f"  Image dtype: {sample_image.dtype} (should be float32)")
print(f"  Mask dtype: {sample_mask.dtype} (should be int32)")
print(f"  Unique classes in sample: {np.unique(sample_mask)}")

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the training pairs as a validation set (fixed seed).
#
# Stratification key: the class id of the first pixel of each flattened mask.
# NOTE(review): this is a coarse proxy for image content, and train_test_split
# rejects stratification classes with fewer than two members - confirm intended.
first_pixel_labels = y_train.reshape(len(y_train), -1)[:, 0]

# NOTE: x_train / y_train are rebound to the training subset, so running this
# cell a second time splits the already-reduced arrays again.
x_train, x_val, y_train, y_val = train_test_split(
    x_train,
    y_train,
    test_size=0.2,
    random_state=42,
    stratify=first_pixel_labels,
)

print(f"Train set: {x_train.shape}, {y_train.shape}")
print(f"Validation set: {x_val.shape}, {y_val.shape}")

# build qkeras Model

In [None]:
from qkeras import *
from tensorflow.keras.layers import Input, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras.models import Model

def build_qkeras_unet_vgg(input_shape=(32, 64, 3), n_classes=20):
    """Assemble a small quantized U-Net with VGG-style double-conv stages.

    Layout: three encoder stages (16, 32, 64 filters) separated by two 2x2
    max-poolings, then two decoder stages (32, 16 filters) that each upsample
    by 2 and concatenate the matching encoder output, followed by a 1x1
    quantized convolution producing ``n_classes`` channels without activation.

    Args:
        input_shape: shape of a single input sample passed to ``Input``.
        n_classes: number of output channels of the final 1x1 convolution.

    Returns:
        An uncompiled Keras ``Model`` mapping the input to per-pixel class scores.
    """

    def vgg_qconv_block(x, filters, n_convs=2, activation='quantized_relu(6)',
                        kernel_quantizer='quantized_bits(6,0,alpha=1)',
                        bias_quantizer='quantized_bits(6,0,alpha=1)'):
        """Apply ``n_convs`` rounds of 3x3 QConv2DBatchnorm + QActivation to ``x``."""
        conv_kwargs = dict(
            filters=filters,
            kernel_size=3,
            padding='same',
            kernel_quantizer=kernel_quantizer,
            bias_quantizer=bias_quantizer,
            use_bias=True,
        )
        features = x
        for _ in range(n_convs):
            features = QConv2DBatchnorm(**conv_kwargs)(features)
            features = QActivation(activation)(features)
        return features

    inputs = Input(shape=input_shape)

    # --- Encoder: resolution halves after each of the first two stages ---
    enc1 = vgg_qconv_block(inputs, 16, n_convs=2)
    pool1 = MaxPooling2D(pool_size=(2, 2))(enc1)

    enc2 = vgg_qconv_block(pool1, 32, n_convs=2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(enc2)

    bottleneck = vgg_qconv_block(pool2, 64, n_convs=2)

    # --- Decoder: upsample, then merge with the encoder stage of equal depth ---
    dec2 = UpSampling2D(size=(2, 2))(bottleneck)
    dec2 = Concatenate()([dec2, enc2])
    dec2 = vgg_qconv_block(dec2, 32, n_convs=2)

    dec1 = UpSampling2D(size=(2, 2))(dec2)
    dec1 = Concatenate()([dec1, enc1])
    dec1 = vgg_qconv_block(dec1, 16, n_convs=2)

    # --- Head: 1x1 quantized conv, no activation (raw per-class scores) ---
    head = QConv2D(
        filters=n_classes,
        kernel_size=1,
        padding='same',
        activation=None,
        kernel_quantizer='quantized_bits(6,0,alpha=1)',
        bias_quantizer='quantized_bits(6,0,alpha=1)',
        use_bias=True,
    )
    outputs = head(dec1)

    return Model(inputs=inputs, outputs=outputs)

In [None]:
# Take the per-sample input shape from the training data instead of repeating
# the image size as a literal, so the model always matches the loaded arrays.
model = build_qkeras_unet_vgg(input_shape=tuple(x_train.shape[1:]), n_classes=n_classes)
model.summary()

# training

In [None]:
import numpy as np
from sklearn.metrics import jaccard_score
from tensorflow.keras.callbacks import Callback
import tensorflow as tf

class IoUCallback(Callback):
    """Keras callback that reports IoU on a fixed validation set during training.

    Every ``frequency`` epochs the model predicts ``x_val`` in chunks of
    ``batch_size``, the argmax over the last axis is taken as the predicted
    class, and the result is compared with ``y_val``. Two values are stored in
    ``logs`` and printed:

    * ``val_mean_iou``: unweighted mean of the per-class IoU over all
      ``num_classes`` classes.
    * ``val_weighted_iou``: per-class IoU averaged with weights proportional
      to each class's ground-truth pixel count.

    A class without any ground-truth pixels scores 1.0 when it is also never
    predicted and 0.0 otherwise, so absent classes still enter the plain mean.

    Args:
        x_val: validation inputs, sliceable along the first axis.
        y_val: integer class labels with as many elements as there are
            predicted pixels for ``x_val``.
        num_classes: number of class ids (0 .. num_classes-1) to evaluate.
        batch_size: number of samples per ``predict`` call.
        frequency: evaluate every ``frequency``-th epoch.
    """

    def __init__(self, x_val, y_val, num_classes, batch_size=16, frequency=1):
        # Initialise the Keras base class before adding our own attributes.
        super().__init__()
        self.x_val = x_val
        self.y_val = y_val
        self.num_classes = num_classes
        self.batch_size = batch_size
        self.frequency = frequency  # Calculate IoU every N epochs

    def _predict_classes(self):
        """Return the predicted class ids for all of ``x_val``.

        Prediction runs chunk by chunk and only the argmax is kept, so the
        full score tensor for the whole validation set is never held at once.
        """
        chunks = []
        for start in range(0, len(self.x_val), self.batch_size):
            scores = self.model.predict(self.x_val[start:start + self.batch_size], verbose=0)
            chunks.append(np.argmax(scores, axis=-1))
        return np.concatenate(chunks, axis=0)

    def _per_class_iou(self, y_true, y_pred):
        """Return ``(ious, class_counts)`` for flat label vectors of equal length."""
        ious = []
        class_counts = []
        for cls in range(self.num_classes):
            pred_mask = y_pred == cls
            true_mask = y_true == cls

            true_pixels = int(np.count_nonzero(true_mask))
            class_counts.append(true_pixels)

            if true_pixels == 0:
                # Absent class: perfect if never predicted, otherwise only false positives.
                ious.append(0.0 if pred_mask.any() else 1.0)
                continue

            # true_pixels > 0 guarantees a non-empty union, so no division by zero.
            intersection = np.count_nonzero(pred_mask & true_mask)
            union = np.count_nonzero(pred_mask | true_mask)
            ious.append(intersection / union)
        return ious, class_counts

    def on_epoch_end(self, epoch, logs=None):
        """Compute, log and print validation IoU on every ``frequency``-th epoch."""
        # Only calculate IoU every N epochs to save time
        if (epoch + 1) % self.frequency != 0:
            return

        # Flatten both sides so masks of different rank (e.g. a trailing
        # singleton channel on the labels) are compared element by element.
        y_pred = self._predict_classes().ravel()
        y_true = np.asarray(self.y_val).ravel()

        ious, class_counts = self._per_class_iou(y_true, y_pred)

        mean_iou = float(np.mean(ious))

        # Weighted IoU: each class counts in proportion to its ground-truth pixels.
        total_pixels = sum(class_counts)
        if total_pixels > 0:
            weights = np.array(class_counts) / total_pixels
            weighted_iou = float(np.average(ious, weights=weights))
        else:
            weighted_iou = mean_iou

        if logs is None:
            logs = {}

        logs['val_mean_iou'] = mean_iou
        logs['val_weighted_iou'] = weighted_iou

        print(f" — val_mean_IoU: {mean_iou:.4f} | val_weighted_IoU: {weighted_iou:.4f}")

        # Per-class breakdown on every 5th evaluated epoch, first 10 classes only.
        if (epoch + 1) % (self.frequency * 5) == 0:
            print("Per-class IoU:")
            for cls in range(min(10, self.num_classes)):
                print(f"  Class {cls}: {ious[cls]:.3f} ({class_counts[cls]} pixels)")

# Validation IoU monitor: evaluated after every epoch, predicting 32 samples at a time.
iou_callback = IoUCallback(x_val, y_val, n_classes, batch_size=32, frequency=1)

In [None]:
# === LOSS FUNCTIONS FOR SEGMENTATION ===
import tensorflow as tf
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import SparseCategoricalAccuracy

def dice_loss(y_true, y_pred, smooth=1e-6):
    """Soft Dice loss for sparse integer labels.

    Args:
        y_true: integer class labels, one per pixel (cast to int32 here).
        y_pred: raw logits with the class scores on the last axis; softmax is
            always applied inside this function.
        smooth: constant added to numerator and denominator of each class's
            Dice ratio.

    Returns:
        Scalar tensor: 1 minus the mean Dice coefficient over the classes.
    """
    num_classes = tf.shape(y_pred)[-1]

    # One row per pixel, one column per class, for predictions and targets alike.
    probs = tf.reshape(tf.nn.softmax(y_pred, axis=-1), [-1, num_classes])
    targets = tf.reshape(
        tf.one_hot(tf.cast(y_true, tf.int32), depth=num_classes),
        [-1, num_classes],
    )

    # Sums run over all pixels of the batch, giving one value per class.
    overlap = tf.reduce_sum(targets * probs, axis=0)
    mass = tf.reduce_sum(targets, axis=0) + tf.reduce_sum(probs, axis=0)

    per_class_dice = (2.0 * overlap + smooth) / (mass + smooth)
    return 1.0 - tf.reduce_mean(per_class_dice)

def combined_loss(y_true, y_pred, dice_weight=0.7, ce_weight=0.3):
    """Weighted sum of Dice loss and sparse categorical cross-entropy.

    Both terms are computed from logits: ``dice_loss`` applies softmax itself
    and the cross-entropy is created with ``from_logits=True``.

    Args:
        y_true: integer class labels.
        y_pred: raw logits with the class scores on the last axis.
        dice_weight: multiplier for the Dice term.
        ce_weight: multiplier for the cross-entropy term.

    Returns:
        Scalar tensor ``dice_weight * dice + ce_weight * cross_entropy``.
    """
    cross_entropy_fn = SparseCategoricalCrossentropy(from_logits=True)
    dice_term = dice_weight * dice_loss(y_true, y_pred)
    ce_term = ce_weight * cross_entropy_fn(y_true, y_pred)
    return dice_term + ce_term

In [None]:
# Train with Adam on the combined Dice + cross-entropy loss; per-pixel accuracy
# is tracked under the name 'accuracy'.
pixel_accuracy = SparseCategoricalAccuracy(name='accuracy')
model.compile(
    loss=combined_loss,
    optimizer=Adam(learning_rate=1e-3),
    metrics=[pixel_accuracy],
)

In [None]:
# The IoU callback is the only extra hook used during training.
callbacks = [iou_callback]

# 20 epochs, mini-batches of 32; Keras also evaluates loss/accuracy on the
# validation split at the end of each epoch.
history = model.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_val, y_val),
    epochs=20,
    batch_size=32,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Visualize predictions
def visualize_predictions(model, x_val, y_val, num_samples=5, num_classes=None):
    """Plot random validation images beside their true and predicted masks.

    Args:
        model: object whose ``predict`` returns per-pixel class scores with the
            classes on the last axis.
        x_val: validation images, indexable by integer.
        y_val: ground-truth masks aligned with ``x_val``.
        num_samples: number of rows to draw; capped at ``len(x_val)``.
        num_classes: number of classes for the shared colour scale. Defaults to
            the size of the model output's last axis.
    """
    # Asking for more rows than there are samples would make the
    # without-replacement draw below fail, so cap the request.
    num_samples = min(num_samples, len(x_val))
    if num_samples <= 0:
        return

    indices = np.random.choice(len(x_val), num_samples, replace=False)

    # One batched forward pass instead of one predict() call per image.
    images = np.stack([x_val[idx] for idx in indices])
    pred_logits = model.predict(images, verbose=0)
    pred_masks = np.argmax(pred_logits, axis=-1)

    if num_classes is None:
        num_classes = pred_logits.shape[-1]

    # squeeze=False keeps `axes` two-dimensional even for a single row, so
    # axes[row, col] indexing works for every num_samples >= 1.
    fig, axes = plt.subplots(num_samples, 3, figsize=(12, num_samples * 4), squeeze=False)

    for row, idx in enumerate(indices):
        panels = (
            ("Original Image", x_val[idx], {}),
            ("True Mask", y_val[idx], dict(cmap='tab20', vmin=0, vmax=num_classes - 1)),
            ("Predicted Mask", pred_masks[row], dict(cmap='tab20', vmin=0, vmax=num_classes - 1)),
        )
        for col, (title, picture, imshow_kwargs) in enumerate(panels):
            axes[row, col].imshow(picture, **imshow_kwargs)
            axes[row, col].set_title(title)
            axes[row, col].axis('off')

    plt.tight_layout()
    plt.show()

visualize_predictions(model, x_val, y_val, num_samples=5)

In [None]:
from sklearn.metrics import jaccard_score

# Keras-side evaluation on the validation split (loss and compiled metrics).
model.evaluate(x_val, y_val, verbose=1)

# Mean Intersection over Union (mIoU) on the same split, computed with sklearn.

# Predicted class per pixel = argmax over the class axis of the model output.
y_pred_logits = model.predict(x_val, verbose=0)
y_pred_classes = y_pred_logits.argmax(axis=-1)

# sklearn expects 1-D label vectors.
y_true_flat = y_val.reshape(-1)
y_pred_flat = y_pred_classes.reshape(-1)

# Macro average over every class id in 0 .. n_classes-1; undefined per-class
# scores are counted as 0.
class_ids = np.arange(n_classes)
miou = jaccard_score(
    y_true_flat,
    y_pred_flat,
    labels=class_ids,
    average='macro',
    zero_division=0,
)
print(f"Validation mIoU (sklearn): {miou:.4f}")

# hls4ml conversion

In [None]:
!pip install hls4ml==1.1.0

In [None]:
import hls4ml

In [None]:
# Derive an hls4ml configuration from the trained Keras model, with one entry
# per layer name, targeting the Vitis backend.
config_options = dict(
    granularity='name',
    backend='Vitis',
    default_precision='fixed<32,16>',
    default_reuse_factor=16384,
)
config = hls4ml.utils.config_from_keras_model(model, **config_options)

# Override the model-level strategy setting.
config['Model']['Strategy'] = 'Resource'

In [None]:
# Directory that receives the generated HLS project.
hls_path = "cityscape_hls_project"

# Convert the Keras model using the configuration built above.
hls_model = hls4ml.converters.convert_from_keras_model(
    model,
    hls_config=config,
    output_dir=hls_path,
    backend="Vitis",
    part='xcu55c-fsvh2892-2L-e',  # U55C FPGA
    clock_period=5,
    io_type="io_stream",  # default is "io_parallel"
)

In [None]:
# Build the converted model so that hls_model.predict() can be called in the cells below.
# NOTE(review): per the hls4ml docs this writes the project under `hls_path` and
# compiles a C++ emulation library - confirm for the pinned hls4ml version.
hls_model.compile()

# HLS prediction

In [None]:
from tqdm import tqdm

# Run the HLS model one test image at a time, keeping both the raw outputs
# (for the testbench files) and the per-pixel class ids (for the submission).
hls_pred_classes_all = []
hls_pred_all = []
for data in tqdm(x_test):
    hls_pred = hls_model.predict(np.expand_dims(data, axis=0))
    hls_pred_all.append(hls_pred)
    # Restore the (H, W, classes) layout using the image's own spatial size and
    # n_classes rather than hard-coded 64 x 64 x 20.
    hls_pred = hls_pred.reshape(data.shape[0], data.shape[1], n_classes)
    hls_pred_classes = np.argmax(hls_pred, axis=-1)
    hls_pred_classes_all.append(hls_pred_classes)

In [None]:
# Collect the per-image results into single arrays with the image index first
# (np.stack joins along a new axis 0 by default).
hls_pred_classes_all_np = np.stack(hls_pred_classes_all)
hls_pred_all_np = np.stack(hls_pred_all)

In [None]:
# Generate submission CSV
import pandas as pd

num_samples = len(x_test)

# One row per test image, one column per pixel of the predicted class map.
hls_pred_classes_all_np_flat = hls_pred_classes_all_np.reshape(num_samples, -1)

# Column count follows the flattened mask size instead of a hard-coded 64*64,
# so the frame stays consistent with whatever resolution was predicted.
pixels_per_image = hls_pred_classes_all_np_flat.shape[1]
submission_df = pd.DataFrame(
    hls_pred_classes_all_np_flat,
    columns=[f"label{i}" for i in range(pixels_per_image)]
)
submission_df.insert(0, "id", np.arange(num_samples))

submission_df.to_csv("submission.csv", index=False)
print(f"submission.csv saved with shape: {submission_df.shape}")


In [None]:
# Save testbench files for the HLS project, using only the first few test samples.
batch_num = 5
x_test_batch = x_test[:batch_num]
hls_pred_all_np_batch = hls_pred_all_np[:batch_num]

# Inputs go to tb_input_features.dat, the matching HLS model outputs to
# tb_output_predictions.dat.
INPUT_FILE = hls_path + '/tb_data/tb_input_features.dat'
OUTPUT_FILE = hls_path + '/tb_data/tb_output_predictions.dat'

# Each file holds one sample per line: values flattened, space-separated, '%f' format.
for tb_file, tb_array in ((INPUT_FILE, x_test_batch), (OUTPUT_FILE, hls_pred_all_np_batch)):
    np.savetxt(tb_file, tb_array.reshape(tb_array.shape[0], -1), delimiter=' ', fmt='%f')

# ZIP the HLS project

In [None]:
!zip -r {hls_path}.zip {hls_path}