In [None]:
import tensorflow as tf

import os
import tensorflow_datasets as tfds

In [None]:
resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))

All devices:  [LogicalDevice(name='/device:TPU:0', device_type='TPU'), LogicalDevice(name='/device:TPU:1', device_type='TPU'), LogicalDevice(name='/device:TPU:2', device_type='TPU'), LogicalDevice(name='/device:TPU:3', device_type='TPU'), LogicalDevice(name='/device:TPU:4', device_type='TPU'), LogicalDevice(name='/device:TPU:5', device_type='TPU'), LogicalDevice(name='/device:TPU:6', device_type='TPU'), LogicalDevice(name='/device:TPU:7', device_type='TPU')]


In [None]:
strategy = tf.distribute.TPUStrategy(resolver)

In [None]:
from google.colab  import drive
drive.mount("/gdrive")
%cd /gdrive

Mounted at /gdrive
/gdrive


In [None]:
directory_path='/gdrive/My Drive/Yedek/Video_Dataset_hsv'

In [None]:
subset_paths={'train':directory_path+'/train','test':directory_path+'/test'}

In [None]:
!pip install hickle
import hickle as hkl
import random
from pathlib import Path
import os
import cv2
import matplotlib.pyplot as plt
from random import randint
import numpy as np
import matplotlib.pyplot as plt

Collecting hickle
  Downloading hickle-5.0.3-py3-none-any.whl (107 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m108.0/108.0 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: hickle
Successfully installed hickle-5.0.3


In [None]:
def frames_from_video_file(video_path):
  result = []
  src = cv2.VideoCapture(str(video_path))

  for _ in range(0,20,2):
    ret, frame = src.read()
    frame = tf.image.convert_image_dtype(frame, tf.float32)
    tf.image.resize(frame, [456,456])
    result.append(frame)
  src.release()
  result = np.array(result)[..., [2, 1, 0]]
  return result

In [None]:
class FrameGenerator:
  def __init__(self, path, training = False):

    self.path = path
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.avi'))
    classes = [p.parent.name for p in video_paths]
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path)
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

In [None]:
train_path = Path(subset_paths['train'])
test_path = Path(subset_paths['test'])

In [None]:
fg = FrameGenerator(train_path, training=True)
print(fg.class_ids_for_name)

{'healthy': 0, 'sick': 1}


In [None]:
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.float32))

In [None]:
train_ds = tf.data.Dataset.from_generator(FrameGenerator(train_path, training=True),
                                          output_signature = output_signature)
test_ds = tf.data.Dataset.from_generator(FrameGenerator(test_path, training=False),
                                          output_signature = output_signature)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)

In [None]:
train_ds = train_ds.batch(16)
test_ds = test_ds.batch(16)

train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

test_frames, test_labels = next(iter(test_ds))
print(f'Shape of validation set of frames: {test_frames.shape}')
print(f'Shape of validation labels: {test_labels.shape}')

Shape of training set of frames: (16, 10, 480, 640, 3)
Shape of training labels: (16,)
Shape of validation set of frames: (16, 10, 480, 640, 3)
Shape of validation labels: (16,)


In [None]:
def build_efficient():
    inputs = tf.keras.layers.Input(shape=(456, 456, 3))
    model = tf.keras.applications.EfficientNetB5(include_top=False, input_tensor=inputs, weights="imagenet")

    # Freeze the pretrained weights
    model.trainable = False

    # Rebuild top
    model = tf.keras.Sequential([model,tf.keras.layers.GlobalAveragePooling2D(),tf.keras.layers.BatchNormalization()])

    return model

In [None]:
with strategy.scope():
  net=build_efficient()

  model = tf.keras.Sequential([
      tf.keras.layers.Rescaling(scale=255),
      tf.keras.layers.TimeDistributed(net),
      tf.keras.layers.LSTM(24),
      tf.keras.layers.Dropout(0.3),
      tf.keras.layers.Dense(8),
      tf.keras.layers.Dense(4),
      tf.keras.layers.Dense(1,'sigmoid')
  ])

  model.compile(optimizer = tf.keras.optimizers.AdamW(learning_rate=1e-3,  use_ema=True, ema_momentum = 0.8, weight_decay=0.1),
                loss='mse', metrics=tf.keras.metrics.BinaryAccuracy(name='binary_accuracy', dtype=None, threshold=0.5))

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb5_notop.h5


In [None]:
epochs=20

In [None]:
model.fit(train_ds, epochs = epochs)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x7b9b845ab2b0>

In [None]:
def get_actual_labels(dataset):
  actual = [labels for _, labels in dataset.unbatch()]
  actual = tf.stack(actual, axis=0)
  return actual

In [None]:
model.evaluate(test_ds ,return_dict=True)



{'loss': 0.11042124032974243, 'binary_accuracy': 0.8289473652839661}

In [None]:
import seaborn as sns

In [None]:
actual = get_actual_labels(test_ds)

In [None]:
y_pred=model.predict(test_ds)
y_pred=np.where(y_pred > 0.5, 1,0)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay,classification_report

cm=confusion_matrix(actual, y_pred)
print(cm)
print(classification_report(actual, y_pred))

In [None]:
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['healthy','sick'])
sns.reset_orig()
disp.plot()