#**Enhanced Pedestrian Intention Detection for ADAS and Autonomous Vehicles**
Training and evaluation of the DenseNet model through the use of the JAAD dataset.


##Step 1: Mount google drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Steps 2-5
## Step 2: Clone the JAAD repositry to the content directory on google colab:
https://github.com/ykotseruba/JAAD

##Step 3: Upload a zipped file of the JAAD clips into your google drive:
http://data.nvision2.eecs.yorku.ca/JAAD_dataset/data/JAAD_clips.zip

##Step 4: Unzip JAAD_clips.zip and copy the jaad_data.py file into the content directory on google colab.

##Step 5: Clone the YOLOP repository into the content directory.

In [2]:
!unzip "/content/drive/My Drive/JAAD/JAAD_clips.zip" -d "/content"
!git clone https://github.com/ykotseruba/JAAD.git
!cp '/content/JAAD/jaad_data.py' '/content'
!git clone https://github.com/hustvl/YOLOP

Archive:  /content/drive/My Drive/JAAD/JAAD_clips.zip
replace /content/JAAD_clips/video_0001.mp4? [y]es, [n]o, [A]ll, [N]one, [r]ename: Cloning into 'JAAD'...
remote: Enumerating objects: 6149, done.[K
remote: Counting objects: 100% (718/718), done.[K
remote: Compressing objects: 100% (66/66), done.[K
remote: Total 6149 (delta 668), reused 695 (delta 652), pack-reused 5431[K
Receiving objects: 100% (6149/6149), 42.16 MiB | 12.21 MiB/s, done.
Resolving deltas: 100% (5487/5487), done.
fatal: destination path 'YOLOP' already exists and is not an empty directory.


## Step 5: Import keras and tensorflow utilities

In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import keras
import keras.backend as K
from keras.layers import Flatten
from keras.layers import Activation
from keras.layers import AveragePooling3D
from keras.layers import BatchNormalization
from keras.layers import Conv3D
from keras.layers import Conv3DTranspose
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import GlobalAveragePooling3D
from keras.layers import GlobalMaxPooling3D
from keras.layers import Input
from keras.layers import MaxPooling3D
from keras.layers import Reshape
from keras.layers import UpSampling3D
from keras.layers import concatenate
from keras.models import Model
from keras.regularizers import l2
!sudo pip install git+https://www.github.com/keras-team/keras-contrib.git
from keras_contrib.layers import SubPixelUpscaling
import csv
import os
import pandas as pd
from keras.utils import to_categorical
import h5py
import numpy as np
import matplotlib.pyplot as plt
import cv2

import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall, AUC
from sklearn.metrics import f1_score
from tensorflow.keras.utils import get_source_inputs

from sklearn.model_selection import train_test_split

from jaad_data import JAAD

Collecting git+https://www.github.com/keras-team/keras-contrib.git
  Cloning https://www.github.com/keras-team/keras-contrib.git to /tmp/pip-req-build-3b4n_ual
  Running command git clone --filter=blob:none --quiet https://www.github.com/keras-team/keras-contrib.git /tmp/pip-req-build-3b4n_ual
  Resolved https://www.github.com/keras-team/keras-contrib.git to commit 3fc5ef709e061416f4bc8a92ca3750c824b5d2b0
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: keras-contrib
  Building wheel for keras-contrib (setup.py) ... [?25l[?25hdone
  Created wheel for keras-contrib: filename=keras_contrib-2.0.8-py3-none-any.whl size=101055 sha256=c653cd3fccc388165043e07f3d287c0b3d9913abfb00a2cffff382cfcde0b9b8
  Stored in directory: /tmp/pip-ephem-wheel-cache-6u35440g/wheels/74/d5/f7/0245af7ac33d5b0c2e095688649916e4bf9a8d6b3362a849f5
Successfully built keras-contrib
Installing collected packages: keras-contrib
Successfully installed keras-contrib-2.0.8


## Step 6: Define a function YOLOPdetect that returns the drivable area and lane segmentation for an image.

 This is a modified version of the YOLOPdetect function that is in demo.py in the YOLOP directory. All credit goes to its authors.

In [4]:
import argparse
import os, sys

BASE_DIR = os.path.dirname('/content/YOLOP')
sys.path.append('/content/YOLOP')

import shutil
import time
from pathlib import Path
import imageio

print(sys.path)
import torch
import torch.backends.cudnn as cudnn
from numpy import random
import scipy.special
import torchvision.transforms as transforms
import PIL.Image as image

!sudo pip install yacs
from lib.config import cfg
from lib.config import update_config
from lib.utils.utils import create_logger, select_device, time_synchronized
from lib.models import get_net
from lib.dataset import LoadImages, LoadStreams
from lib.core.general import non_max_suppression, scale_coords
from lib.utils import plot_one_box,show_seg_result
from lib.core.function import AverageMeter
from lib.core.postprocess import morphological_process, connect_lane
from lib.utils import letterbox_for_img, clean_str
from tqdm import tqdm
from numba import cuda

normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )

transform=transforms.Compose([
            transforms.ToTensor(),
            normalize,
        ])

class opt:
      device = '0'
      weights = '/content/YOLOP/weights/End-to-end.pth'
      source = '/content/YOLOPstream.png'
      img_size = 640
      conf_thres = 0.25
      iou_thres = 0.45
      save_dir = '/content/YOLOP/output'

opt = opt()

device = select_device(None,opt.device)
half = device.type != 'cpu'  # half precision only supported on CUDA

# Load model
yolop_model = get_net(cfg)
checkpoint = torch.load(opt.weights, map_location= device)
yolop_model.load_state_dict(checkpoint['state_dict'])
yolop_model = yolop_model.to(device)
if half:
    yolop_model.half()  # to FP16

img = torch.zeros((1, 3, opt.img_size, opt.img_size), device=device)  # init img
_ = yolop_model(img.half() if half else img) if device.type != 'cpu' else None  # run once
yolop_model.eval()

def YOLOPdetect(img0):

    h0, w0 = img0.shape[:2]
    img, ratio, pad = letterbox_for_img(img0, new_shape=opt.img_size, auto=True)
    h, w = img.shape[:2]
    shapes = (h0, w0), ((h / h0, w / w0), pad)

    dataset = [(None, img, img0, None, shapes)]
    bs = 1  # batch_size


    # Get names and colors
    names = yolop_model.module.names if hasattr(yolop_model, 'module') else yolop_model.names
    colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]


    # Run inference
    t0 = time.time()

    inf_time = AverageMeter()
    nms_time = AverageMeter()



    da_seg_mask = None

    for i, (path, img, img_det, vid_cap,shapes) in enumerate(dataset):
        with torch.no_grad():
          torch.cuda.empty_cache()
          img = transform(img).to(device)
          img = img.half() if half else img.float()  # uint8 to fp16/32
          if img.ndimension() == 3:
              img = img.unsqueeze(0)
          # Inference
          t1 = time_synchronized()
          det_out, da_seg_out,ll_seg_out= yolop_model(img)
          t2 = time_synchronized()
          inf_out, _ = det_out
          inf_time.update(t2-t1,img.size(0))

          # Apply NMS
          t3 = time_synchronized()
          det_pred = non_max_suppression(inf_out, conf_thres=opt.conf_thres, iou_thres=opt.iou_thres, classes=None, agnostic=False)
          t4 = time_synchronized()

          nms_time.update(t4-t3,img.size(0))
          det=det_pred[0]

          _, _, height, width = img.shape
          h,w,_=img_det.shape
          pad_w, pad_h = shapes[1][1]
          pad_w = int(pad_w)
          pad_h = int(pad_h)
          ratio = shapes[1][0][1]

          da_predict = da_seg_out[:, :, pad_h:(height-pad_h),pad_w:(width-pad_w)]
          da_seg_mask = torch.nn.functional.interpolate(da_predict, scale_factor=int(1/ratio), mode='bilinear')
          _, da_seg_mask = torch.max(da_seg_mask, 1)
          da_seg_mask = da_seg_mask.int().squeeze().cpu().numpy()

          ll_predict = ll_seg_out[:, :,pad_h:(height-pad_h),pad_w:(width-pad_w)]
          ll_seg_mask = torch.nn.functional.interpolate(ll_predict, scale_factor=int(1/ratio), mode='bilinear')
          _, ll_seg_mask = torch.max(ll_seg_mask, 1)
          ll_seg_mask = ll_seg_mask.int().squeeze().cpu().numpy()

          img_det = show_seg_result(img_det, (da_seg_mask, ll_seg_mask), _, _, is_demo=True)

          del ll_predict
          del da_predict
          del ll_seg_mask
          del da_seg_mask
          del inf_out
          del det_out
          del det_pred
          del img
          torch.cuda.empty_cache()

    img_det = cv2.resize(img_det, (w0,h0))
    return img_det

['/content', '/env/python', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.10/dist-packages/IPython/extensions', '/root/.ipython', '/content/YOLOP']
Collecting yacs
  Downloading yacs-0.1.8-py3-none-any.whl (14 kB)
Installing collected packages: yacs
Successfully installed yacs-0.1.8


  return F.conv2d(input, weight, bias, self.stride,


##Step 7: Setup YOLO-NAS-Pose through super-gradients

In [5]:
!pip install super-gradients
from super_gradients.common.object_names import Models
from super_gradients.training import models
import sys
from super_gradients.training.utils.visualization.pose_estimation import draw_skeleton # draws skeletal graph given pose keypoints

model_pose = models.get(Models.YOLO_NAS_POSE_S, pretrained_weights="coco_pose")

model_pose = model_pose.to("cuda")



[2024-06-13 16:52:59] INFO - crash_tips_setup.py - Crash tips is enabled. You can set your environment variable to CRASH_HANDLER=FALSE to disable it


The console stream is logged into /root/sg_logs/console.log


 It is your responsibility to determine whether you have permission to use the models for your use case.
 The model you have requested was pre-trained on the coco_pose dataset, published under the following terms: https://cocodataset.org/#termsofuse
[2024-06-13 16:53:01] INFO - checkpoint_utils.py - License Notification: YOLO-NAS-POSE pre-trained weights are subjected to the specific license terms and conditions detailed in 
https://github.com/Deci-AI/super-gradients/blob/master/LICENSE.YOLONAS-POSE.md
By downloading the pre-trained weight files you agree to comply with these terms.
Downloading: "https://sghub.deci.ai/models/yolo_nas_pose_s_coco_pose.pth" to /root/.cache/torch/hub/checkpoints/yolo_nas_pose_s_coco_pose.pth
100%|██████████| 85.3M/85.3M [00:06<00:00, 13.7MB/s]
[2024-06-13 16:53:09] INFO - checkpoint_utils.py - Successfully loaded pretrained weights for architecture yolo_nas_pose_s


##Step 8: Setup the DenseNet model through keras

In [6]:
from IPython.display import clear_output

'''DenseNet and DenseNet-FCN models for Keras.

DenseNet is a network architecture where each layer is directly connected
to every other layer in a feed-forward fashion (within each dense block).
For each layer, the feature maps of all preceding layers are treated as
separate inputs whereas its own feature maps are passed on as inputs to
all subsequent layers. This connectivity pattern yields state-of-the-art
accuracies on CIFAR10/100 (with or without data augmentation) and SVHN.
On the large scale ILSVRC 2012 (ImageNet) dataset, DenseNet achieves a
similar accuracy as ResNet, but using less than half the amount of
parameters and roughly half the number of FLOPs.
'''

def DenseNet3D(input_shape=None,
               depth=40,
               nb_dense_block=3,
               growth_rate=12,
               nb_filter=-1,
               nb_layers_per_block=1,
               bottleneck=False,
               reduction=0.0,
               dropout_rate=0.0,
               weight_decay=1e-4,
               subsample_initial_block=False,
               include_top=True,
               input_tensor=None,
               pooling=None,
               classes=2,
               activation='softmax',
               transition_pooling='avg'):

    '''Instantiate the DenseNet architecture.

    The model and the weights are compatible with both
    TensorFlow and Theano. The dimension ordering
    convention used by the model is the one
    specified in your Keras config file.

    # Arguments
        input_shape: optional shape tuple, only to be specified
            if `include_top` is False (otherwise the input shape
            has to be `(224, 224, 224, 3)` (with `channels_last` dim ordering)
            or `(3, 224, 224, 224)` (with `channels_first` dim ordering).
            It should have exactly 4 inputs channels,
            and width and height should be no smaller than 8.
            E.g. `(224, 224, 224, 3)` would be one valid value.
        depth: number or layers in the DenseNet
        nb_dense_block: number of dense blocks to add to end
        growth_rate: number of filters to add per dense block
        nb_filter: initial number of filters. -1 indicates initial
            number of filters will default to 2 * growth_rate
        nb_layers_per_block: number of layers in each dense block.
            Can be a -1, positive integer or a list.
            If -1, calculates nb_layer_per_block from the network depth.
            If positive integer, a set number of layers per dense block.
            If list, nb_layer is used as provided. Note that list size must
            be nb_dense_block
        bottleneck: flag to add bottleneck blocks in between dense blocks
        reduction: reduction factor of transition blocks.
            Note : reduction value is inverted to compute compression.
        dropout_rate: dropout rate
        weight_decay: weight decay rate
        subsample_initial_block: Changes model type to suit different datasets.
            Should be set to True for ImageNet, and False for CIFAR datasets.
            When set to True, the initial convolution will be strided and
            adds a MaxPooling3D before the initial dense block.
        include_top: whether to include the fully-connected
            layer at the top of the network.
        input_tensor: optional Keras tensor (i.e. output of `layers.Input()`)
            to use as image input for the model.
        pooling: Optional pooling mode for feature extraction
            when `include_top` is `False`.
            - `None` means that the output of the model
                will be the 4D tensor output of the
                last convolutional layer.
            - `avg` means that global average pooling
                will be applied to the output of the
                last convolutional layer, and thus
                the output of the model will be a
                2D tensor.
            - `max` means that global max pooling will
                be applied.
        classes: optional number of classes to classify images
            into, only to be specified if `include_top` is True.
        activation: Type of activation at the top layer. Can be one of
            'softmax' or 'sigmoid'. Note that if sigmoid is used,
             classes must be 1.
        transition_pooling: `avg` for avg pooling (default), `max` for max pooling,
            None for no pooling during scale transition blocks. Please note that this
            default differs from the DenseNetFCN paper in accordance with the DenseNet
            paper.

    # Returns
        A Keras model instance.

    # Raises
        ValueError: in case of invalid input shape.
    '''

    if activation not in ['softmax', 'sigmoid']:
        raise ValueError('activation must be one of "softmax" or "sigmoid"')

    if activation == 'sigmoid' and classes != 1:
        raise ValueError('sigmoid activation can only be used when classes = 1')

    if input_tensor is None:
        img_input = Input(shape=input_shape)
    else:
        if not K.is_keras_tensor(input_tensor):
            img_input = Input(tensor=input_tensor, shape=input_shape)
        else:
            img_input = input_tensor

    x = __create_dense_net(classes, img_input, include_top, depth, nb_dense_block,
                           growth_rate, nb_filter, nb_layers_per_block, bottleneck,
                           reduction, dropout_rate, weight_decay,
                           subsample_initial_block, pooling, activation,
                           transition_pooling)

    # Ensure that the model takes into account
    # any potential predecessors of `input_tensor`.
    if input_tensor is not None:
        inputs = get_source_inputs(input_tensor)
    else:
        inputs = img_input
    # Create model.
    model = Model(inputs, x, name='densenet')

    return model

def DenseNet3D_121(input_shape=None,
                          bottleneck=True,
                          reduction=0.5,
                          dropout_rate=0.0,
                          weight_decay=1e-4,
                          include_top=False,
                          input_tensor=None,
                          pooling=None,
                          classes=2,
                          activation='softmax'):
    return DenseNet3D(input_shape, depth=121, nb_dense_block=3, growth_rate=24,
                      nb_filter=64, nb_layers_per_block=[4,4,4],
                      bottleneck=bottleneck, reduction=reduction,
                      dropout_rate=dropout_rate, weight_decay=weight_decay,
                      subsample_initial_block=True, include_top=include_top,
                      input_tensor=input_tensor,
                      pooling=pooling, classes=classes, activation=activation)
#nb filter was 64
#nb layers per block was 6,12,24,16
#nb dense blocks was 4
def name_or_none(prefix, name):
    return prefix + name if (prefix is not None and name is not None) else None

def __conv_block(ip, nb_filter, bottleneck=False, dropout_rate=None,
                 weight_decay=1e-4, block_prefix=None):
    '''
    Adds a convolution layer (with batch normalization and relu),
    and optionally a bottleneck layer.

    # Arguments
        ip: Input tensor
        nb_filter: integer, the dimensionality of the output space
            (i.e. the number output of filters in the convolution)
        bottleneck: if True, adds a bottleneck convolution block
        dropout_rate: dropout rate
        weight_decay: weight decay factor
        block_prefix: str, for unique layer naming

     # Input shape
        4D tensor with shape:
        `(samples, channels, rows, cols)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows, cols, channels)` if data_format='channels_last'.

    # Output shape
        4D tensor with shape:
        `(samples, filters, new_rows, new_cols)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, new_rows, new_cols, filters)` if data_format='channels_last'.
        `rows` and `cols` values might have changed due to stride.

    # Returns
        output tensor of block
    '''
    with K.name_scope('ConvBlock'):
        concat_axis = 1 if K.image_data_format() == 'channels_first' else -1

        x = BatchNormalization(axis=concat_axis, epsilon=1.1e-5,
                               name=name_or_none(block_prefix, '_bn'))(ip)
        x = Activation('relu')(x)

        if bottleneck:
            inter_channel = nb_filter * 4

            x = Conv3D(inter_channel, (1, 1, 1), kernel_initializer='he_normal',
                       padding='same', use_bias=False,
                       kernel_regularizer=l2(weight_decay),
                       name=name_or_none(block_prefix, '_bottleneck_Conv3D'))(x)
            x = BatchNormalization(axis=concat_axis, epsilon=1.1e-5,
                                   name=name_or_none(block_prefix, '_bottleneck_bn'))(x)
            x = Activation('relu')(x)

        x = Conv3D(nb_filter, (3, 3, 3), kernel_initializer='he_normal', padding='same',
                   use_bias=False, name=name_or_none(block_prefix, '_Conv3D'))(x)
        if dropout_rate:
            x = Dropout(dropout_rate)(x)

    return x


def __dense_block(x, nb_layers, nb_filter, growth_rate, bottleneck=False,
                  dropout_rate=None, weight_decay=1e-4, grow_nb_filters=True,
                  return_concat_list=False, block_prefix=None):
    '''
    Build a dense_block where the output of each conv_block is fed
    to subsequent ones

    # Arguments
        x: input keras tensor
        nb_layers: the number of conv_blocks to append to the model
        nb_filter: integer, the dimensionality of the output space
            (i.e. the number output of filters in the convolution)
        growth_rate: growth rate of the dense block
        bottleneck: if True, adds a bottleneck convolution block to
            each conv_block
        dropout_rate: dropout rate
        weight_decay: weight decay factor
        grow_nb_filters: if True, allows number of filters to grow
        return_concat_list: set to True to return the list of
            feature maps along with the actual output
        block_prefix: str, for block unique naming

    # Return
        If return_concat_list is True, returns a list of the output
        keras tensor, the number of filters and a list of all the
        dense blocks added to the keras tensor

        If return_concat_list is False, returns a list of the output
        keras tensor and the number of filters
    '''
    with K.name_scope('DenseBlock'):
        concat_axis = 1 if K.image_data_format() == 'channels_first' else -1
        x_list = [x]
        for i in range(nb_layers):
            cb = __conv_block(x, growth_rate, bottleneck, dropout_rate, weight_decay,
                              block_prefix=name_or_none(block_prefix, '_%i' % i))
            x_list.append(cb)
            x = concatenate([x, cb], axis=concat_axis)
            if grow_nb_filters:
                nb_filter += growth_rate
        if return_concat_list:
            return x, nb_filter, x_list
        else:
            return x, nb_filter


def __transition_block(ip, nb_filter, compression=1.0, weight_decay=1e-4,
                       block_prefix=None, transition_pooling='max'):
    '''
    Adds a pointwise convolution layer (with batch normalization and relu),
    and an average pooling layer. The number of output convolution filters
    can be reduced by appropriately reducing the compression parameter.

    # Arguments
        ip: input keras tensor
        nb_filter: integer, the dimensionality of the output space
            (i.e. the number output of filters in the convolution)
        compression: calculated as 1 - reduction. Reduces the number
            of feature maps in the transition block.
        weight_decay: weight decay factor
        block_prefix: str, for block unique naming

    # Input shape
        4D tensor with shape:
        `(samples, channels, rows, cols)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows, cols, channels)` if data_format='channels_last'.

    # Output shape
        4D tensor with shape:
        `(samples, nb_filter * compression, rows / 2, cols / 2)`
        if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows / 2, cols / 2, nb_filter * compression)`
        if data_format='channels_last'.

    # Returns
        a keras tensor
    '''
    with K.name_scope('Transition'):
        concat_axis = 1 if K.image_data_format() == 'channels_first' else -1

        x = BatchNormalization(axis=concat_axis, epsilon=1.1e-5,
                               name=name_or_none(block_prefix, '_bn'))(ip)
        x = Activation('relu')(x)
        x = Conv3D(int(nb_filter * compression), (1, 1, 1), kernel_initializer='he_normal',
                   padding='same', use_bias=False, kernel_regularizer=l2(weight_decay),
                   name=name_or_none(block_prefix, '_Conv3D'))(x)
        if transition_pooling == 'avg':
            x = AveragePooling3D((2, 2, 2), strides=(2, 2, 2))(x)
        elif transition_pooling == 'max':
            x = MaxPooling3D((2, 2, 2), strides=(2, 2, 2))(x)

        return x


def __transition_up_block(ip, nb_filters, type='deconv', weight_decay=1E-4,
                          block_prefix=None):
    '''Adds an upsampling block. Upsampling operation relies on the the type parameter.

    # Arguments
        ip: input keras tensor
        nb_filters: integer, the dimensionality of the output space
            (i.e. the number output of filters in the convolution)
        type: can be 'upsampling', 'subpixel', 'deconv'. Determines
            type of upsampling performed
        weight_decay: weight decay factor
        block_prefix: str, for block unique naming

    # Input shape
        4D tensor with shape:
        `(samples, channels, rows, cols)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows, cols, channels)` if data_format='channels_last'.

    # Output shape
        4D tensor with shape:
        `(samples, nb_filter, rows * 2, cols * 2)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows * 2, cols * 2, nb_filter)` if data_format='channels_last'.

    # Returns
        a keras tensor
    '''
    with K.name_scope('TransitionUp'):

        if type == 'upsampling':
            x = UpSampling3D(name=name_or_none(block_prefix, '_upsampling'))(ip)
        elif type == 'subpixel':
            x = Conv3D(nb_filters, (3, 3, 3), activation='relu', padding='same',
                       kernel_regularizer=l2(weight_decay), use_bias=False,
                       kernel_initializer='he_normal',
                       name=name_or_none(block_prefix, '_Conv3D'))(ip)
            x = SubPixelUpscaling(scale_factor=2,
                                  name=name_or_none(block_prefix, '_subpixel'))(x)
            x = Conv3D(nb_filters, (3, 3, 3), activation='relu', padding='same',
                       kernel_regularizer=l2(weight_decay), use_bias=False,
                       kernel_initializer='he_normal',
                       name=name_or_none(block_prefix, '_Conv3D'))(x)
        else:
            x = Conv3DTranspose(nb_filters, (3, 3, 3), activation='relu', padding='same',
                                strides=(2, 2, 2), kernel_initializer='he_normal',
                                kernel_regularizer=l2(weight_decay),
                                name=name_or_none(block_prefix, '_Conv3DT'))(ip)
        return x


def __create_dense_net(nb_classes, img_input, include_top, depth=40, nb_dense_block=3,
                       growth_rate=12, nb_filter=-1, nb_layers_per_block=-1,
                       bottleneck=False, reduction=0.0, dropout_rate=None,
                       weight_decay=1e-4, subsample_initial_block=False, pooling=None,
                       activation='softmax', transition_pooling='avg'):
    ''' Build the DenseNet model

    # Arguments
        nb_classes: number of classes
        img_input: tuple of shape (channels, rows, columns) or (rows, columns, channels)
        include_top: flag to include the final Dense layer
        depth: number or layers
        nb_dense_block: number of dense blocks to add to end (generally = 3)
        growth_rate: number of filters to add per dense block
        nb_filter: initial number of filters. Default -1 indicates initial number
            of filters is 2 * growth_rate
        nb_layers_per_block: number of layers in each dense block.
                Can be a -1, positive integer or a list.
                If -1, calculates nb_layer_per_block from the depth of the network.
                If positive integer, a set number of layers per dense block.
                If list, nb_layer is used as provided. Note that list size must
                be (nb_dense_block + 1)
        bottleneck: add bottleneck blocks
        reduction: reduction factor of transition blocks. Note : reduction value is
            inverted to compute compression
        dropout_rate: dropout rate
        weight_decay: weight decay rate
        subsample_initial_block: Changes model type to suit different datasets.
            Should be set to True for ImageNet, and False for CIFAR datasets.
            When set to True, the initial convolution will be strided and
            adds a MaxPooling3D before the initial dense block.
        pooling: Optional pooling mode for feature extraction
            when `include_top` is `False`.
            - `None` means that the output of the model
                will be the 4D tensor output of the
                last convolutional layer.
            - `avg` means that global average pooling
                will be applied to the output of the
                last convolutional layer, and thus
                the output of the model will be a
                2D tensor.
            - `max` means that global max pooling will
                be applied.
        activation: Type of activation at the top layer. Can be one of 'softmax' or
            'sigmoid'. Note that if sigmoid is used, classes must be 1.
        transition_pooling: `avg` for avg pooling (default), `max` for max pooling,
            None for no pooling during scale transition blocks. Please note that this
            default differs from the DenseNetFCN paper in accordance with the DenseNet
            paper.

    # Returns
        a keras tensor

    # Raises
        ValueError: in case of invalid argument for `reduction`
            or `nb_dense_block`
    '''
    with K.name_scope('DenseNet'):
        concat_axis = 1 if K.image_data_format() == 'channels_first' else -1

        if reduction != 0.0:
            if not (reduction <= 1.0 and reduction > 0.0):
                raise ValueError('`reduction` value must lie between 0.0 and 1.0')

        # layers in each dense block
        if type(nb_layers_per_block) is list or type(nb_layers_per_block) is tuple:
            nb_layers = list(nb_layers_per_block)  # Convert tuple to list

            if len(nb_layers) != nb_dense_block:
                raise ValueError('If `nb_dense_block` is a list, its length must match '
                                 'the number of layers provided by `nb_layers`.')

            final_nb_layer = nb_layers[-1]
            nb_layers = nb_layers[:-1]
        else:
            if nb_layers_per_block == -1:
                assert (depth - 4) % 3 == 0, ('Depth must be 3 N + 4 '
                                              'if nb_layers_per_block == -1')
                count = int((depth - 4) / 3)

                if bottleneck:
                    count = count // 2

                nb_layers = [count for _ in range(nb_dense_block)]
                final_nb_layer = count
            else:
                final_nb_layer = nb_layers_per_block
                nb_layers = [nb_layers_per_block] * nb_dense_block

        # compute initial nb_filter if -1, else accept users initial nb_filter
        if nb_filter <= 0:
            nb_filter = 2 * growth_rate

        # compute compression factor
        compression = 1.0 - reduction

        # Initial convolution
        if subsample_initial_block:
            initial_kernel = (7, 7, 7)
            initial_strides = (2, 2, 2)
        else:
            initial_kernel = (3, 3, 3)
            initial_strides = (1, 1, 1)

        x = Conv3D(nb_filter, initial_kernel, kernel_initializer='he_normal',
                   padding='same', name='initial_Conv3D', strides=initial_strides,
                   use_bias=False, kernel_regularizer=l2(weight_decay))(img_input)

        if subsample_initial_block:
            x = BatchNormalization(axis=concat_axis, epsilon=1.1e-5,
                                   name='initial_bn')(x)
            x = Activation('relu')(x)
            x = MaxPooling3D((3, 3, 3), strides=(2, 2, 2), padding='same')(x)

        # Add dense blocks
        for block_idx in range(nb_dense_block - 1):
            x, nb_filter = __dense_block(x, nb_layers[block_idx], nb_filter,
                                         growth_rate, bottleneck=bottleneck,
                                         dropout_rate=dropout_rate,
                                         weight_decay=weight_decay,
                                         block_prefix='dense_%i' % block_idx)
            # add transition_block
            x = __transition_block(x, nb_filter, compression=compression,
                                   weight_decay=weight_decay,
                                   block_prefix='tr_%i' % block_idx,
                                   transition_pooling=transition_pooling)
            nb_filter = int(nb_filter * compression)
        # The last dense_block does not have a transition_block
        #x = Flatten()(x)
        x, nb_filter = __dense_block(x, final_nb_layer, nb_filter, growth_rate,
                                     bottleneck=bottleneck, dropout_rate=dropout_rate,
                                     weight_decay=weight_decay,
                                     block_prefix='dense_%i' % (nb_dense_block - 1))
        x = BatchNormalization(axis=concat_axis, epsilon=1.1e-5, name='final_bn')(x)
        x = Activation('softmax')(x)
        x = AveragePooling3D((7, 7, 4), strides=(1,1,1),padding='same')(x)
        '''
        if include_top:
            if pooling == 'avg':
                #x = GlobalAveragePooling3D(padding='same')(x)
                x = AveragePooling3D((7, 7, 4), strides=(1,1,1), name=pool_name_base,padding='same')(x)
            elif pooling == 'max':
                #x = GlobalMaxPooling3D(padding='same')(x)
                x = MaxPooling3D((7, 7, 4), strides=(1,1,1), name=pool_name_base,padding='same')(x)
            x = Dense(nb_classes, activation=activation)(x)
        else:
            if pooling == 'avg':
                #x = GlobalAveragePooling3D(padding='same')(x)
                x = AveragePooling3D((7, 7, 4), strides=(1,1,1), name=pool_name_base,padding='same')(x)
            elif pooling == 'max':
                #x = GlobalMaxPooling3D(padding='same')(x)
                x = MaxPooling3D((7, 7, 4), strides=(1,1,1), name=pool_name_base,padding='same')(x)
        '''
        x = Flatten()(x)
        x = Dense(nb_classes, activation='softmax')(x)
        return x

## Step 9: Compile the DenseNet model
* 100x100x16x3 input (16 consecutive 100x100 images of pedestrians)
* Evaluation metrics for accuracy, precision, recall, AUC and F1 score.

In [7]:
model = DenseNet3D_121((100, 100, 16, 3))

model.compile(
    loss=keras.losses.categorical_crossentropy,
    optimizer = keras.optimizers.SGD(lr=1e-4),
    metrics=['accuracy', keras.metrics.Precision(name='precision'), keras.metrics.Recall(name='recall'), keras.metrics.AUC(name='auc'), keras.metrics.F1Score()]
)

print(model.summary())



## Step 10: Define a function parse_video that returns 100x100x16x3 images for each pedestrian in each frame of a video from the JAAD dataset

The input to the function is the ID of the video (eg. 'video_0001')

In [8]:
jaad = JAAD(data_path='/content/JAAD')

def parse_video(video_id):
    # Load the JAAD annotations from the JSON file
    annotations = jaad._get_annotations(video_id)
    annotations = annotations["ped_annotations"]
    attributes = jaad._get_ped_attributes(video_id)

    # Initialize arrays
    rolling_data = {}
    video_bboxes = []
    video_labels = []

    # Capture the video frame by frame (instead of seperating the video into frames, which would take too much space and time)
    video = cv2.VideoCapture(f"/content/JAAD_clips/{video_id}.mp4")

    # Loop over each pedestrian in the video
    for key in annotations.keys():
        if(annotations[key]["behavior"]=={}):
            continue
        # if(attributes[key]['group_size']<=3):
        #       continue
        labels = []
        bboxes = annotations[key]["bbox"]
        frames = annotations[key]["frames"]
        cropped_imgs = []

        # Start capturing from the first frame of the detected pedestrian and loop over every frame
        video.set(cv2.CAP_PROP_POS_FRAMES, frames[0]-1)
        for i, frame_id in enumerate(frames):
            ret, frame = video.read()

            # Highlight lane segmentation and drivable area onto the frame
            frame = YOLOPdetect(frame)

            # Extract BBox of pedestrian
            bbox = np.array(bboxes[i])
            bbox = bbox.astype('int')
            # Expand BBox
            x1 = bbox[0] - 10 if bbox[0]  - 10 > 0 else 0
            y1 = bbox[1] - 10 if bbox[1]  - 10 > 0 else 0
            x2 = bbox[2] + 10 if bbox[2]  + 10 < frame.shape[1] else frame.shape[1]
            y2 = bbox[3] + 10 if bbox[3]  + 10 < frame.shape[0] else frame.shape[0]
            bbox = [x1, y1, x2, y2]
            # Crop frame into the bounding box
            cropped_img = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]

            # Detect pose keypoints and construct skeletal graph
            media_predictions = model_pose.predict(cropped_img, conf=0.2, skip_image_resizing=True)
            poses= media_predictions.prediction.poses
            conf = media_predictions.prediction.scores
            edge_links= media_predictions.prediction.edge_links
            edge_colors=media_predictions.prediction.edge_colors
            keypoint_colors=media_predictions.prediction.keypoint_colors
            clear_output()
            for i in range(len(poses)):
                cropped_img = draw_skeleton(cropped_img,poses[i],conf[i],edge_links,edge_colors,2,keypoint_colors,4,0.0,False)

            # Resize cropped image to 100x100
            cropped_img = cv2.resize(cropped_img, (100,100))

            # Place cropped image onto rolling data
            if key not in list(rolling_data.keys()):
                rolling_data[key] = [np.asarray(cropped_img)]
            elif len(rolling_data[key]) < 16: # bboxes values for 16 frames
               rolling_data[key].append(np.asarray(cropped_img)) # append the image
            else:
               del rolling_data[key][0] # delete oldest frame bbox and append latest frame bbox
               rolling_data[key].append(np.asarray(cropped_img))

            # Extract 16 consecutive 100x100 images for the pedestrian from the rolling data
            if len(rolling_data[key]) == 16:
              seq = np.stack(np.array(rolling_data[key]),axis=2) # (100*100*16*3)
              cropped_imgs = cropped_imgs+[seq] # classification output
            else:
              seq = np.stack(np.array([rolling_data[key][-1]] * 16),axis=2)
              cropped_imgs = cropped_imgs+[seq]

            # Extract labels
            labels = labels + [annotations[key]["behavior"]["cross"][i]]

        # Append cropped images and labels for each pedestrian
        video_bboxes = video_bboxes + cropped_imgs
        video_labels = video_labels + labels

    print(f"parsed {video_id}", flush=True)

    return video_bboxes, video_labels

## Run to train the model

In [9]:
dataset_bboxes = []
dataset_labels = []

# Setup range for videos to process (depending on available RAM)
for i in range (1,301):

    video_id = f"video_000{i}" if i < 10 else (f"video_00{i}" if i < 100 else f"video_0{i}")
    video_bboxes, video_labels = parse_video(video_id)
    dataset_bboxes = dataset_bboxes + video_bboxes
    dataset_labels = dataset_labels + video_labels

X_train, X_val, y_train, y_val = train_test_split(np.array(dataset_bboxes), np.array(dataset_labels), test_size=0.2, random_state=42)

y_train=tf.one_hot(y_train,depth=2)
y_val=tf.one_hot(y_val,depth=2)

# Train the model
history = model.fit(X_train, y_train, batch_size=32, epochs=10, validation_data=(X_val, y_val))

# Save the model architecture to JSON file
model_json = model.to_json()
with open("densenet_model.json", "w") as json_file:
    json_file.write(model_json)

# Save the model weights to HDF5 file
model.save_weights("densenet_weights.h5")

# # Save the model architecture to JSON file
# model_json = model.to_json()
# with open("/content/drive/My Drive/densenet_model.json", "w") as json_file:
#     json_file.write(model_json)

# # Save the model weights to HDF5 file
# model.save_weights("/content/drive/My Drive/densenet_weights.h5")

[2024-06-13 17:10:23] INFO - processing.py - Skipping processing `KeypointsLongestMaxSizeRescale` because it resizes the image.
[2024-06-13 17:10:23] INFO - processing.py - Skipping processing `KeypointsBottomRightPadding` because it resizes the image.
[2024-06-13 17:10:23] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


KeyboardInterrupt: 

## Run to evaluate the model

In [None]:
dataset_bboxes = []
dataset_labels = []

# Setup range for videos to process (depending on available RAM)
for i in range (301,347):

    video_id = f"video_000{i}" if i < 10 else (f"video_00{i}" if i < 100 else f"video_0{i}")
    video_bboxes, video_labels = parse_video(video_id)
    dataset_bboxes = dataset_bboxes + video_bboxes
    dataset_labels = dataset_labels + video_labels

# Open pre-trained model
with open('/content/drive/My Drive/densenet_model_8.json', 'r') as json_file:
    json_savedModel= json_file.read()

model = tf.keras.models.model_from_json(json_savedModel)
model.load_weights('/content/drive/My Drive/densenet_weights_8.h5')

model.compile(loss=keras.losses.categorical_crossentropy,  optimizer = keras.optimizers.SGD(lr=1e-4), metrics=['accuracy'])
print(model.summary())

# Evaluate the model
y_test = tf.one_hot(np.array(dataset_labels),depth=2)
results = model.evaluate(np.array(dataset_bboxes), y_test, verbose=1)
print("Test Loss: {}, Test Accuracy: {}, Test Precision: {}, Test Recall: {}, Test AUC: {}".format(results[0], results[1], results[2], results[3], results[4]))
print(results)

## Run if precision and recall values are incorrect (sometimes they turn out to be exactly equal to the accuracy value)

In [None]:
# Predict probabilities for test set
y_probs = []
for bbox in np.array(dataset_bboxes):
  bbox = np.expand_dims(bbox, axis=0)
  y_probs = y_probs + [model.predict(bbox)]  # these are probabilities of the positive class
y_pred = (np.array(y_probs) > 0.5).astype('int32')  # convert probabilities to binary predictions
y_pred = [x[0][1] for x in y_pred]

# Calculate F1 Score
f1 = f1_score(dataset_labels, y_pred)
print("F1 Score:", f1)

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score

cm = confusion_matrix(dataset_labels, y_pred)
print("Confusion Matrix:\n", cm)

accuracy = accuracy_score(dataset_labels, y_pred)
precision = precision_score(dataset_labels, y_pred, zero_division=1)
recall = recall_score(dataset_labels, y_pred, zero_division=1)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

## Run to save metrics in a text file

In [None]:
# Open a file for writing ('w' mode), creates the file if it does not exist
with open('/content/drive/My Drive/model accuracy.txt', 'w') as file:
    file.write("Test Loss: {}, Test Accuracy: {}, Test Precision: {}, Test Recall: {}, Test AUC: {}".format(results[0], results[1], precision, recall, results[4]))
    file.write("\nF1 Score: {}".format(f1))