In [None]:
!pip install -q git+https://github.com/tensorflow/docs

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for tensorflow-docs (setup.py) ... [?25l[?25hdone


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
cd  'drive/MyDrive/Colab Notebooks'

/content/drive/MyDrive/Colab Notebooks


In [None]:
#Import library
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [None]:
# Mendefenisikan hyperparameter
IMG_SIZE = 224 
BATCH_SIZE = 8
EPOCHS = 20

MAX_SEQ_LENGTH = 500 #Maksimal jumlah frame yang akan dipakai 
NUM_FEATURES = 2048

In [None]:
# Load dataset
train_df = pd.read_csv("/content/drive/MyDrive/jigsaw/Needle_Passing/train.csv")
test_df = pd.read_csv("/content/drive/MyDrive/jigsaw/Needle_Passing/test.csv")

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")

train_df.sample(10)

Total videos for training: 19
Total videos for testing: 8


Unnamed: 0,video_name,tag
0,Needle_Passing_B001_capture1.avi,N
18,Needle_Passing_I004_capture1.avi,N
2,Needle_Passing_B003_capture1.avi,N
8,Needle_Passing_D004_capture1.avi,E
10,Needle_Passing_E001_capture1.avi,E
7,Needle_Passing_D003_capture1.avi,E
14,Needle_Passing_F004_capture1.avi,I
4,Needle_Passing_C003_capture1.avi,I
17,Needle_Passing_I003_capture1.avi,N
9,Needle_Passing_D005_capture1.avi,E


In [None]:
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)


In [None]:
#Feature extraction bermekanisme transfer learning pakai CNN(Inception V3)
def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# Transformasi label tag(label_processor)
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
print(label_processor.get_vocabulary())

['E', 'I', 'N']


In [None]:
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = df["video_name"].values.tolist()
    labels = df["tag"].values
    labels = label_processor(labels[..., None]).numpy()

    # `frame_masks` dan `frame_features` sebagai input ke dalam sequence model.
    # Akan diisi 0 jika jumlah frame < Max_Seq_length
    # `frame_masks` akan berisi boolean yang akan menunjukkan apakah timestamp ditutup dengan padding atau tidak
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # Untuk setiap video
    for idx, path in enumerate(video_paths):
        # Mengumpulkan semua frame dan menambahkan batch dimensionnya.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Inisialisasi placeholder untuk menyimpan masks dan features video saat ini
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # Mengekstraksi feature dari video saat ini
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels


train_data, train_labels = prepare_all_videos(train_df, "/content/drive/MyDrive/jigsaw/Needle_Passing/train")
test_data, test_labels = prepare_all_videos(test_df, "/content/drive/MyDrive/jigsaw/Needle_Passing/test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Frame features in train set: (19, 500, 2048)
Frame masks in train set: (19, 500)


In [None]:
# Utilitas ubtuk sequence model
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.1)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)
    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="sgd", metrics=["accuracy"]
    )
    return rnn_model


# Utilitas untuk menjalankan eksperimen
def run_experiment():
    filepath = "/content/drive/MyDrive/jigsaw/Knot_Tying/tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()

Epoch 1/20
Epoch 1: val_loss improved from inf to 1.47593, saving model to /content/drive/MyDrive/jigsaw/Knot_Tying/tmp/video_classifier
Epoch 2/20
Epoch 2: val_loss did not improve from 1.47593
Epoch 3/20
Epoch 3: val_loss did not improve from 1.47593
Epoch 4/20
Epoch 4: val_loss did not improve from 1.47593
Epoch 5/20
Epoch 5: val_loss did not improve from 1.47593
Epoch 6/20
Epoch 6: val_loss did not improve from 1.47593
Epoch 7/20
Epoch 7: val_loss did not improve from 1.47593
Epoch 8/20
Epoch 8: val_loss did not improve from 1.47593
Epoch 9/20
Epoch 9: val_loss did not improve from 1.47593
Epoch 10/20
Epoch 10: val_loss did not improve from 1.47593
Epoch 11/20
Epoch 11: val_loss improved from 1.47593 to 1.46245, saving model to /content/drive/MyDrive/jigsaw/Knot_Tying/tmp/video_classifier
Epoch 12/20
Epoch 12: val_loss did not improve from 1.46245
Epoch 13/20
Epoch 13: val_loss did not improve from 1.46245
Epoch 14/20
Epoch 14: val_loss did not improve from 1.46245
Epoch 15/20
Epoc

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import GRU, Input, Dense, Dropout

# Menentukan parameter model
MAX_SEQ_LENGTH = 500
NUM_FEATURES = 2048
NUM_CLASSES = 3
HIDDEN_UNITS = 256
DROPOUT_RATE = 0.1

class_vocab = label_processor.get_vocabulary()

frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

x = GRU(16, return_sequences=True)(
    frame_features_input, mask=mask_input
)
x = GRU(8)(x)
x = Dropout(DROPOUT_RATE)(x)
output = Dense(NUM_CLASSES, activation='softmax')(x)
model = keras.Model([frame_features_input, mask_input], output)

# Menentukan optimizer dan loss function
adam = optimizers.Adam(lr=0.00001)

model.compile(
    loss="sparse_categorical_crossentropy", optimizer=adam, metrics=["accuracy"]
)

# Train model
history = model.fit(
    [train_data[0], train_data[1]],
    train_labels,
    validation_split=0.3,
    epochs=EPOCHS
)

# Evaluasi model
test_loss, test_accuracy = model.evaluate([train_data[0], train_data[1]],train_labels)
print('Test accuracy:', test_accuracy)

NameError: ignored

In [14]:
def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask


def sequence_prediction(path):
    class_vocab = label_processor.get_vocabulary()

    frames = load_video(os.path.join("/content/drive/MyDrive/jigsaw/Needle_Passing/test", path))
    frame_features, frame_mask = prepare_single_video(frames)
    probabilities = sequence_model.predict([frame_features, frame_mask])[0]

    for i in np.argsort(probabilities)[::-1]:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return frames


# Visualisasi
def to_gif(images):
    converted_images = images.astype(np.uint8)
    imageio.mimsave("animation.gif", converted_images, fps=25)
    return embed.embed_file("animation.gif")


test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")
test_frames = sequence_prediction(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])

Test video path: Needle_Passing_I002_capture1.avi
  N: 37.35%
  E: 31.33%
  I: 31.32%
