# SafeNet: Data Pipeline

In [None]:
import os
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
import  numpy as np
import csv

In [None]:
%pip install -U albumentations

In [None]:
import albumentations as A

In [None]:
def load_single_image(input_folder, mask_folder, input_filename, mask_filename):
    """Load one image and its mask from disk as NumPy arrays.

    Args:
        input_folder: Directory that contains the input image.
        mask_folder: Directory that contains the mask image.
        input_filename: File name of the image; joined onto ``input_folder``.
        mask_filename: File name of the mask; joined onto ``mask_folder``.

    Returns:
        Tuple ``(input_array, mask_array)`` obtained by calling ``np.array``
        on the two decoded images.
    """
    input_path = os.path.join(input_folder, input_filename)
    mask_path = os.path.join(mask_folder, mask_filename)

    # Image.open keeps the underlying file open until the image is closed.
    # np.array() forces the pixel data to be decoded and copied, so the file
    # can be released as soon as the array exists.
    with Image.open(input_path) as input_image:
        input_array = np.array(input_image)
    with Image.open(mask_path) as mask_image:
        mask_array = np.array(mask_image)

    return input_array, mask_array

In [None]:
def load_data(input_folder, mask_folder, input_filenames, mask_filenames):
    """Load several image/mask pairs and return them as two arrays.

    File names are paired positionally (``zip``), so the shorter of the two
    name sequences determines how many pairs are loaded.

    Args:
        input_folder: Directory that contains the input images.
        mask_folder: Directory that contains the masks.
        input_filenames: Image file names, relative to ``input_folder``.
        mask_filenames: Mask file names, relative to ``mask_folder``.

    Returns:
        Tuple ``(images, masks)``; each is ``np.array`` applied to the list of
        per-file arrays returned by ``load_single_image``.
    """
    loaded_pairs = [
        load_single_image(input_folder, mask_folder, image_name, mask_name)
        for image_name, mask_name in zip(input_filenames, mask_filenames)
    ]
    images = [pair[0] for pair in loaded_pairs]
    masks = [pair[1] for pair in loaded_pairs]
    return np.array(images), np.array(masks)

In [None]:
# Augmentation pipeline used by save_augmented_data for every augmented copy.
# It is called as transform(image=..., mask=...) and the result is read back
# through the 'image' and 'mask' keys.
# NOTE(review): Resize(600, 800) is presumably (height=600, width=800), as in
# albumentations' Resize(height, width) signature -- confirm against the
# installed version.
transform = A.Compose([
    A.Resize(600, 800),  # same target size as the `resize` pipeline
    A.RandomBrightnessContrast(p=1.),  # p=1. -> applied on every call
    A.HorizontalFlip(p=0.5),  # applied with probability 0.5
    A.VerticalFlip(p=0.5),  # applied with probability 0.5
    A.Rotate(limit=40)  # no explicit p: the library's default probability applies
])

In [None]:
# Resize-only pipeline used by save_augmented_data for the non-augmented
# ("original") copy of each image/mask pair; same target size as `transform`.
resize = A.Compose([
    A.Resize(600, 800),
])

In [None]:
def anchor_label_1D(img, mask, threshold, step, anchor_boxes):
    """Build a flat 0/1 label vector over a grid of anchor positions.

    The image is divided into a grid of ``height // step`` rows by
    ``width // step`` columns. For every grid cell and every anchor box, a
    window of the box's size is taken from ``mask`` with its top-left corner
    at the cell's top-left pixel. The cell is labelled 1 when the number of
    target-valued mask pixels in that window is at least
    ``threshold * box_height * box_width`` for any of the anchor boxes.

    Windows that run past the mask border are truncated by slicing, but are
    still compared against the full box area.

    Args:
        img: Array whose first two dimensions give (height, width); only its
            shape is used.
        mask: NumPy array of per-pixel values, indexed as ``mask[row, col]``.
        threshold: Fraction of the anchor-box area that must be covered by
            target pixels.
        step: Stride, in pixels, between consecutive grid cells.
        anchor_boxes: Sequence of boxes, each indexed as ``(height, width)``.

    Returns:
        1-D integer array of length ``(height // step) * (width // step)``,
        in row-major order.
    """
    # Mask values that count as "target"; note that 4 is deliberately absent.
    TARGET_PIXEL_VALUES = np.array([0, 1, 2, 3, 5, 6, 7, 8])

    height = img.shape[0]
    width = img.shape[1]
    grid_rows = height // step
    grid_cols = width // step

    # The membership test does not depend on the window, so do it once for the
    # whole mask instead of once per window.
    is_target = np.isin(mask, TARGET_PIXEL_VALUES)

    labels = np.zeros((grid_rows, grid_cols), dtype=int)
    for box in anchor_boxes:
        b_h = box[0]
        b_w = box[1]
        min_pixels = threshold * b_h * b_w
        # Iterate over grid cells rather than raw pixel offsets: when height or
        # width is not a multiple of step, range(0, height, step) would yield a
        # partial trailing cell that lies outside `labels` (IndexError).
        for row in range(grid_rows):
            h = row * step
            for col in range(grid_cols):
                w = col * step
                if is_target[h: h + b_h, w: w + b_w].sum() >= min_pixels:
                    labels[row, col] = 1

    return labels.ravel()


In [None]:
def _save_image_mask_pair(image, mask, image_path, mask_path):
    """Write an image array as RGB and a mask array as 8-bit grayscale ('L')."""
    Image.fromarray(np.array(image)).convert('RGB').save(image_path)
    Image.fromarray(np.array(mask)).convert('L').save(mask_path)


def save_augmented_data(image_folder, mask_folder, num_augmentations, output_image_folder, output_mask_folder):
    """Write resized originals and augmented copies of every image/mask pair.

    Images and masks are paired by position after sorting the directory
    listings. For each pair, one resized copy is written via the module-level
    ``resize`` pipeline, followed by ``num_augmentations`` copies produced by
    the module-level ``transform`` pipeline.

    Output names are ``image_{i}_original.png`` / ``mask_{i}_original.png`` and
    ``image_{i}_aug_{k}.png`` / ``mask_{i}_aug_{k}.png``, where ``i`` is the
    pair's position in the sorted listing.

    Args:
        image_folder: Directory containing the source images.
        mask_folder: Directory containing the source masks.
        num_augmentations: Number of augmented copies to write per pair.
        output_image_folder: Directory for output images (created if missing).
        output_mask_folder: Directory for output masks (created if missing).

    Raises:
        ValueError: If the two source folders hold a different number of
            files, since positional pairing would then be unreliable.
    """
    batch_size = 256  # only controls how often progress is printed
    image_paths = sorted(os.listdir(image_folder))
    mask_paths = sorted(os.listdir(mask_folder))
    print(f'Total Images: {len(image_paths)}')
    print(f'Total Masks: {len(mask_paths)}')

    # zip() would silently drop the surplus files and may pair the rest with
    # the wrong partner, so fail loudly instead.
    if len(image_paths) != len(mask_paths):
        raise ValueError(
            f'Image/mask count mismatch: {len(image_paths)} images vs '
            f'{len(mask_paths)} masks'
        )

    # Saving into a folder that does not exist fails; '' means "current
    # directory" and must not be passed to makedirs.
    for folder in (output_image_folder, output_mask_folder):
        if folder:
            os.makedirs(folder, exist_ok=True)

    total_images = 0
    for start_idx in range(0, len(image_paths), batch_size):
        end_idx = min(start_idx + batch_size, len(image_paths))
        print(f'Processing batch {start_idx} to {end_idx}...')

        # Load one pair at a time: stacking a whole batch into a single array
        # breaks when the source images differ in size and keeps up to
        # batch_size full-resolution images in memory at once.
        for global_idx in range(start_idx, end_idx):
            image, mask = load_single_image(
                image_folder, mask_folder,
                image_paths[global_idx], mask_paths[global_idx],
            )

            resized = resize(image=image, mask=mask)
            _save_image_mask_pair(
                resized['image'],
                resized['mask'],
                os.path.join(output_image_folder, f"image_{global_idx}_original.png"),
                os.path.join(output_mask_folder, f"mask_{global_idx}_original.png"),
            )
            total_images += 1

            for aug_idx in range(num_augmentations):
                # Augment from the unresized source; `transform` resizes itself.
                transformed = transform(image=image, mask=mask)
                _save_image_mask_pair(
                    transformed['image'],
                    transformed['mask'],
                    os.path.join(output_image_folder, f"image_{global_idx}_aug_{aug_idx}.png"),
                    os.path.join(output_mask_folder, f"mask_{global_idx}_aug_{aug_idx}.png"),
                )
                total_images += 1

    print(f'Final Number of Images: {total_images}')

In [None]:
def labels_to_csv(img_folder, mask_folder, csv_file, threshold, step, anchor_boxes):
    """Compute anchor labels for every image/mask pair and write them to a CSV.

    Each CSV row is the image file name followed by the flat label vector
    returned by ``anchor_label_1D`` for that image and its mask.

    Args:
        img_folder: Directory containing the images.
        mask_folder: Directory containing the masks.
        csv_file: Path of the CSV file to (over)write.
        threshold: Forwarded to ``anchor_label_1D``.
        step: Forwarded to ``anchor_label_1D``.
        anchor_boxes: Forwarded to ``anchor_label_1D``.
    """
    # os.listdir returns entries in arbitrary order; sort both listings so
    # that images and masks are paired consistently and the row order is
    # reproducible. Sorting is lexicographic (e.g. "image_10" < "image_2").
    image_names = sorted(os.listdir(img_folder))
    mask_names = sorted(os.listdir(mask_folder))

    with open(csv_file, mode='w', newline='') as file:
        writer = csv.writer(file)

        for image, mask in zip(image_names, mask_names):
            # Pass bare file names: load_single_image joins them onto the
            # folders itself, so pre-joined paths would duplicate the folder
            # component whenever the folders are relative paths.
            img_array, mask_array = load_single_image(img_folder, mask_folder, image, mask)

            anchor_label = anchor_label_1D(img_array, mask_array, threshold, step, anchor_boxes)

            writer.writerow([image] + anchor_label.tolist())

    print(f'Labels have been saved to {csv_file}')

In [None]:
def get_labels(csv_file):
    """Read the label vectors back from a CSV written by ``labels_to_csv``.

    The first column of every row (the image file name) is dropped; the
    remaining columns are parsed as integers.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        ``np.array`` built from one integer array per CSV row.
    """
    with open(csv_file, mode='r') as handle:
        per_row = [np.array(fields[1:], dtype=int) for fields in csv.reader(handle)]

    return np.array(per_row)


## Data Pipeline

In [None]:
# Colab-only: mount Google Drive at /content/drive.
# NOTE(review): presumably the dataset archives used below live on Drive -- confirm.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%%capture
# Extract the dataset archives into per-split folders under /content.
# NOTE: every archive path below is an empty placeholder ('') and must be
# filled in before this cell is run.
#Training data
!unzip '' -d '/content/train_imgs'
!unzip '' -d '/content/train_masks'

#Val data
!unzip '' -d '/content/val_imgs'
!unzip '' -d '/content/val_masks'

#Test data
!unzip '' -d '/content/test_imgs'
!unzip '' -d '/content/test_masks'


In [None]:
# Training split: resize/augment the raw pairs, derive anchor labels from the
# written outputs, then read the labels back as a sanity check.
# NOTE: all path strings below are empty placeholders and must be set before
# running this cell.
train_imgs = ''
train_masks = ''

output_imgs = ''
output_masks = ''

label_file = ''

# Augmented copies written per pair, in addition to the resized original.
augmentations = 4
save_augmented_data(train_imgs,train_masks,augmentations,output_imgs,output_masks)

step = 10  # stride in pixels between anchor positions
threshold = 0.2  # fraction of the anchor-box area that must be target pixels
# One anchor box; anchor_label_1D reads each entry as [height, width].
ANCHOR_BOXES = np.array([
    [50,50],
])
# Labels are computed from the output folders, so augmented copies are included.
labels_to_csv(output_imgs,output_masks, label_file, threshold, step, ANCHOR_BOXES)

labels = get_labels(label_file)
# One row per labelled image, one column per grid cell.
print(labels.shape)


In [None]:
# Validation split: same steps as the training cell, but without augmentation.
# NOTE: all path strings below are empty placeholders and must be set before
# running this cell.
val_imgs = ''
val_masks = ''

output_imgs = ''
output_masks = ''

label_file = ''

# 0 -> only the resized originals are written.
augmentations = 0
save_augmented_data(val_imgs,val_masks,augmentations,output_imgs,output_masks)

step = 10  # stride in pixels between anchor positions
threshold = 0.2  # fraction of the anchor-box area that must be target pixels
# One anchor box; anchor_label_1D reads each entry as [height, width].
ANCHOR_BOXES = np.array([
    [50,50],
])
labels_to_csv(output_imgs,output_masks, label_file, threshold, step, ANCHOR_BOXES)

labels = get_labels(label_file)
# One row per labelled image, one column per grid cell.
print(labels.shape)


In [None]:
# Test split: same steps as the training cell, but without augmentation.
# NOTE: all path strings below are empty placeholders and must be set before
# running this cell.
test_imgs = ''
test_masks = ''

output_imgs = ''
output_masks = ''

label_file = ''

# 0 -> only the resized originals are written.
augmentations = 0
save_augmented_data(test_imgs,test_masks,augmentations,output_imgs,output_masks)

step = 10  # stride in pixels between anchor positions
threshold = 0.2  # fraction of the anchor-box area that must be target pixels
# One anchor box; anchor_label_1D reads each entry as [height, width].
ANCHOR_BOXES = np.array([
    [50,50],
])
labels_to_csv(output_imgs,output_masks, label_file, threshold, step, ANCHOR_BOXES)

labels = get_labels(label_file)
# One row per labelled image, one column per grid cell.
print(labels.shape)