# In [None]:
import warnings
import os
import resource
warnings.simplefilter(action='ignore', category=FutureWarning)

import tensorflow as tf
import numpy as np
from PIL import Image
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.patches as patches

from tqdm.notebook import tqdm

from tensorflow import keras
from tensorflow.keras import optimizers
import keras_cv

from keras_cv import bounding_box, visualizations


from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, FloatType

from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm.transform import TransformSpec


# Constants shared by the preprocessing and model code in this file.
MAX_BOXES = 20             # labels are padded/truncated to MAX_BOXES * 5 values
IMG_SHAPE = (256, 256)     # target size; read as (height, width) by transform_row
FOLDER_PATH = "./images/"  # prefix joined onto every image file name

# TF1-compat session setup: build a session config with GPU memory growth
# enabled and open an InteractiveSession with it.
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
config = ConfigProto()
# allow_growth: start with a small GPU allocation and grow it on demand
# instead of reserving the whole device up front.
config.gpu_options.allow_growth = True
# NOTE(review): `session` is not referenced again in the visible code; it is
# presumably kept only for its side effect of installing this config.
session = InteractiveSession(config=config)



# Spark Session (local master; getOrCreate reuses an already-active session)
spark = SparkSession.builder.master("local").getOrCreate()
# Parent cache directory URL used by Petastorm's Spark dataset converter
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///src/object_detection/peta/")

# Load the data, keeping only the image-name and bounding-box columns
df = spark.read.parquet("./annotations.parquet")["image_name","bounding_boxes"]

# Enable eager evaluation of DataFrames in the REPL / notebook
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)



# Define a Pandas UDF to prepend the image path
@pandas_udf("string")
def prepend_path(img_name):
    """Return the image names with FOLDER_PATH joined in front of each one."""
    def _with_folder(file_name):
        return os.path.join(FOLDER_PATH, file_name)

    return img_name.apply(_with_folder)

# Replace each bare image name with its FOLDER_PATH-prefixed path
df = df.withColumn('image_name', prepend_path(df['image_name']))

# Keep only 10 rows and spread them over 4 partitions before splitting
# NOTE(review): limit(10) looks like a smoke-test setting — confirm before a
# real training run; with so few rows the 10% validation split may be empty.
df = df.limit(10).repartition(4)  # Use 4 partitions as an example, adjust as needed

# Split the data into train and validation datasets (weights 0.9 / 0.1, fixed seed)
df_train, df_val = df.randomSplit([0.9, 0.1], seed=12345)

# Convert to Petastorm datasets (one converter per split)
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)

# The converters above are read back through make_tf_dataset further down in this file.




# Batch-level preprocessing that Petastorm applies through a TransformSpec.


def _scale_and_pad_boxes(raw_boxes, width_scale, height_scale, max_boxes):
    """Rescale a flat box list and force it to exactly ``max_boxes * 5`` values.

    ``raw_boxes`` is read as consecutive groups of five values
    ``x1, y1, x2, y2, <fifth value>``; the fifth value of each group is left
    untouched. Shorter inputs are padded with -1, longer ones are truncated.
    ``None`` is treated as "no boxes".
    """
    # Force float32 up front: assigning scaled values into an integer array
    # would silently truncate them, and float32 is the dtype declared for the
    # "labels" field in the TransformSpec. np.array copies, so the caller's
    # data is never modified.
    boxes = np.array([] if raw_boxes is None else raw_boxes, dtype=np.float32)

    boxes[0::5] *= width_scale   # x1-coordinates
    boxes[2::5] *= width_scale   # x2-coordinates
    boxes[1::5] *= height_scale  # y1-coordinates
    boxes[3::5] *= height_scale  # y2-coordinates

    target_len = max_boxes * 5
    if len(boxes) < target_len:
        return np.pad(boxes, (0, target_len - len(boxes)), constant_values=-1)
    return boxes[:target_len]


def transform_row(pd_batch):
    """Turn a batch of (image path, flat boxes) rows into model-ready arrays.

    Args:
        pd_batch: pandas DataFrame with an ``image_name`` column (paths that
            PIL can open) and a ``bounding_boxes`` column (flat sequences read
            in groups of five values, the first four being x1, y1, x2, y2).

    Returns:
        A DataFrame with two columns:
        ``image``  - float32 RGB array resized to IMG_SHAPE, scaled to [0, 1];
        ``labels`` - float32 array of length MAX_BOXES * 5 with the box
        coordinates rescaled to the resized image and -1 used as padding.
        Rows whose image cannot be opened or decoded are reported with
        ``print`` and skipped, so the result may have fewer rows than the
        input.
    """
    imgs = []
    labels = []

    for img_path, raw_boxes in zip(pd_batch["image_name"], pd_batch["bounding_boxes"]):
        # PIL decodes lazily: Image.open only reads the header and the pixel
        # data is loaded by resize/convert. Keeping both steps inside the try
        # makes a truncated file skip the row just like a missing file, and
        # the with-block closes the file handle in every case.
        try:
            with Image.open(img_path) as src:
                orig_width, orig_height = src.size
                # PIL expects (width, height); IMG_SHAPE is (height, width).
                resized = src.resize(IMG_SHAPE[::-1]).convert('RGB')
        except Exception as e:
            print(f"Error opening image {img_path}: {e}")
            continue  # Skip this image

        # Normalize to [0, 1] in float32, the dtype declared for "image".
        pixels = np.asarray(resized, dtype=np.float32) / 255.0

        # Scale factors from the original size to the resized image.
        width_scale = IMG_SHAPE[1] / orig_width
        height_scale = IMG_SHAPE[0] / orig_height

        imgs.append(pixels)
        labels.append(
            _scale_and_pad_boxes(raw_boxes, width_scale, height_scale, MAX_BOXES)
        )

    return pd.DataFrame({"image": imgs, "labels": labels})

# TransformSpec that runs transform_row on each batch: the two raw columns are
# removed and replaced by the fixed-shape float32 "image" and "labels" fields.
transform_spec_fn = TransformSpec(
    func=transform_row,
    edit_fields=[
        ("image", np.float32, (*IMG_SHAPE, 3), False),
        ("labels", np.float32, (MAX_BOXES * 5,), False),
    ],
    removed_fields=["image_name", "bounding_boxes"],
)


def make_ragged_boxes_and_classes(labels):
    """Split flat, -1-padded labels into ragged boxes and classes.

    Args:
        labels: Tensor that can be reshaped to ``(batch, MAX_BOXES, 5)``.
            Each group of five values is read as four box values followed by
            one class value.

    Returns:
        Dict with ``"boxes"`` (the first four values of every kept row) and
        ``"classes"`` (the fifth value), both taken from a RaggedTensor in
        which any trailing run of all ``-1`` rows has been dropped per sample.
    """
    # Infer the batch dimension (-1) rather than hard-coding a batch size, so
    # the function works for any batch size, including a short final batch.
    per_box = tf.reshape(labels, (-1, MAX_BOXES, 5))
    ragged = tf.RaggedTensor.from_tensor(per_box, padding=[-1, -1, -1, -1, -1])

    return {"boxes": ragged[:, :, :4], "classes": ragged[:, :, 4]}



# In [None]:
def make_ragged_boxes_and_classes(labels):
    """Split flat, -1-padded labels into ragged boxes and classes.

    Args:
        labels: Tensor that can be reshaped to ``(batch, MAX_BOXES, 5)``.
            Each group of five values is read as four box values followed by
            one class value.

    Returns:
        Dict with ``"boxes"`` (the first four values of every kept row) and
        ``"classes"`` (the fifth value), both taken from a RaggedTensor in
        which any trailing run of all ``-1`` rows has been dropped per sample.
    """
    # Infer the batch dimension (-1) rather than hard-coding a batch size, so
    # the function works for any batch size, including a short final batch.
    per_box = tf.reshape(labels, (-1, MAX_BOXES, 5))
    ragged = tf.RaggedTensor.from_tensor(per_box, padding=[-1, -1, -1, -1, -1])

    # Return a dictionary
    return {"boxes": ragged[:, :, :4], "classes": ragged[:, :, 4]}



# Pull a single preprocessed batch out of the training converter for inspection.
BATCH_SIZE = 2
data = None
# `i` collects the sampled batch; `b` is not referenced again in the visible code.
i,b=[],[]
with converter_train.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=BATCH_SIZE) as ds:
    
    # Pair each image batch with its ragged boxes/classes dict ...
    ds = ds.map(lambda x: (x.image, make_ragged_boxes_and_classes(x.labels)))
    # ... then repack as a dict keyed "image" / "bounding_boxes".
    ds = ds.map(lambda x, y: {"image": x, "bounding_boxes": y})
    # Dataset matches KERAS specifications with bounding box format XYXY

    # Take one batch; after the loop `data` still refers to that batch.
    for data in ds.take(1):
        i.append(data)


# Class labels; the list length is what the model uses as its class count.
class_mapping = ['SYH', 'SVB', 'CSV', 'CFR', 'CDY']


def visualize_dataset(inputs, value_range, rows, cols, bounding_box_format):
    """Plot a gallery of images with their ground-truth bounding boxes.

    Args:
        inputs: Mapping with an ``"image"`` entry (the images to draw) and a
            ``"bounding_boxes"`` entry (passed to the plot as ``y_true``).
        value_range: Pixel value range of the images, forwarded unchanged.
        rows: Number of gallery rows.
        cols: Number of gallery columns.
        bounding_box_format: Box format string of ``inputs["bounding_boxes"]``,
            forwarded unchanged.

    Class ids are labelled using the module-level ``class_mapping``.
    """
    # Import locally: the KerasCV plotting helpers live in
    # `keras_cv.visualization`, and no module-level name `visualization`
    # is defined in this file.
    from keras_cv import visualization

    visualization.plot_bounding_box_gallery(
        inputs["image"],
        value_range=value_range,
        rows=rows,
        cols=cols,
        y_true=inputs["bounding_boxes"],
        scale=5,
        font_scale=0.7,
        bounding_box_format=bounding_box_format,
        class_mapping=class_mapping,
    )


# Class labels; len(class_mapping) is passed to the model as num_classes.
class_mapping = ['SYH', 'SVB', 'CSV', 'CFR', 'CDY']

# RetinaNet built from the "resnet50_imagenet" preset, with one output class
# per entry in class_mapping and boxes expressed in "xyxy" format.
model = keras_cv.models.RetinaNet.from_preset(
    "resnet50_imagenet",
    num_classes=len(class_mapping),
    # For more info on supported bounding box formats, visit
    # https://keras.io/api/keras_cv/bounding_box/
    bounding_box_format="xyxy",
)


# SGD with momentum; gradients are clipped by global norm.
base_lr = 0.005
# including a global_clipnorm is extremely important in object detection tasks
optimizer = tf.keras.optimizers.SGD(
    learning_rate=base_lr, momentum=0.9, global_clipnorm=10.0
    )

# Focal loss for classification, smooth-L1 for box regression.
model.compile(
    classification_loss="focal",
    box_loss="smoothl1",
    optimizer=optimizer,
    # We will use our custom callback to evaluate COCO metrics
    metrics=None,
)

# Settings for the training run below.
BATCH_SIZE = 2  # batch size requested from the Petastorm converters
data = None     # placeholder; reset before training

# Class labels, in the order used for the model's class count.
class_mapping = ['SYH', 'SVB', 'CSV', 'CFR', 'CDY']


class EvaluateCOCOMetricsCallback(keras.callbacks.Callback):
    """Keras callback that computes COCO box metrics at the end of each epoch.

    Args:
        data: Iterable of batches whose first element is the images and whose
            second element is the ground-truth boxes (``batch[0]``,
            ``batch[1]``). It is iterated once per epoch.
    """

    def __init__(self, data):
        super().__init__()
        self.data = data
        self.metrics = keras_cv.metrics.BoxCOCOMetrics(
            bounding_box_format="xyxy",
            # passing 1e9 ensures we never evaluate until
            # `metrics.result(force=True)` is
            # called.
            evaluate_freq=1e9,
        )

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate on ``self.data`` and merge the metric values into ``logs``.

        Args:
            epoch: Index of the epoch that just finished (unused).
            logs: Metrics dict supplied by Keras; updated in place. Defaults
                to ``None`` to match the base ``Callback`` signature, in
                which case a fresh dict is used.

        Returns:
            The ``logs`` dict including the COCO metric results.
        """
        # Keras may call this hook without logs; avoid failing on None.update.
        logs = {} if logs is None else logs

        self.metrics.reset_state()
        for batch in tqdm(self.data):
            images, y_true = batch[0], batch[1]
            y_pred = self.model.predict(images, verbose=0)
            self.metrics.update_state(y_true, y_pred)

        # force=True overrides the huge evaluate_freq set in __init__.
        metrics = self.metrics.result(force=True)
        logs.update(metrics)
        return logs
    

def dict_to_tuple(inputs):
    """Convert an ``{"images", "bounding_boxes"}`` dict into an ``(x, y)`` tuple.

    Args:
        inputs: Mapping with an ``"images"`` entry and a ``"bounding_boxes"``
            entry.

    Returns:
        ``(images, dense_boxes)`` where ``dense_boxes`` is the result of
        ``bounding_box.to_dense`` on the boxes, limited to MAX_BOXES boxes.
    """
    # Use the shared MAX_BOXES constant instead of a literal 20 so the dense
    # box count always matches the padding/reshape done earlier in the pipeline.
    dense_boxes = bounding_box.to_dense(
        inputs["bounding_boxes"], max_boxes=MAX_BOXES
    )
    return inputs["images"], dense_boxes




# Open the training and validation converters as TF datasets, reshape their
# elements into (images, dense boxes) tuples, and run a short fit.
with converter_train.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=BATCH_SIZE) as dst, \
    converter_val.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=BATCH_SIZE) as dsv :
    
    # Same three steps for both splits: ragged boxes/classes -> dict keyed
    # "images"/"bounding_boxes" -> (images, dense boxes) tuple.
    dst = dst.map(lambda x: (x.image, make_ragged_boxes_and_classes(x.labels)))
    dst = dst.map(lambda x, y: {"images": x, "bounding_boxes": y})
    dsv = dsv.map(lambda x: (x.image, make_ragged_boxes_and_classes(x.labels)))
    dsv = dsv.map(lambda x, y: {"images": x, "bounding_boxes": y})
    dsv = dsv.map(dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE)
    dst = dst.map(dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE)
    
    # Only 4 batches per split are used for training, validation and the
    # COCO-metrics callback.
    # NOTE(review): take(4) and epochs=1 look like smoke-test settings — confirm.
    model.fit(
        dst.take(4),
        validation_data=dsv.take(4),
        # Run for 10-35~ epochs to achieve good scores.
        epochs=1,
        callbacks=[EvaluateCOCOMetricsCallback(dsv.take(4))],
    )
