In [None]:
import pandas as pd
from PIL import Image
import os
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score, recall_score, f1_score
# from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
# Preprocessing: load the per-frame labels and collapse them into a binary problem.

df = pd.read_csv("./metadata/training_mixed_frames_labels.csv", header=0)

print(df.shape[0])

print(df.iloc[0])

# Original labels 0 and 7 become binary class 0; labels 1, 2, 3, 4 and 17 become
# binary class 1. Rows carrying any other label are left out of both groups.
# `.iloc[::2]` keeps every second row of each group.
# `.assign(label=...)` returns a brand-new frame, so the relabelling never writes
# into a slice of `df` (the old `slice["label"] = x` form triggers pandas'
# SettingWithCopyWarning).
class0Data = df[df["label"].isin([0, 7])].iloc[::2].assign(label=0)
class1Data = df[df["label"].isin([1, 2, 3, 4, 17])].iloc[::2].assign(label=1)

print(class0Data.shape[0])
print(class1Data.shape[0])

# class 0 rows first, then class 1 rows, with a fresh 0..N-1 index
finalDf = pd.concat([class0Data, class1Data], ignore_index=True)


# exclude_indexes = class0Data.index.union(class1Data.index)
# df_filtered = df.drop(exclude_indexes)
# print(df_filtered.shape)


# for frame in range(df_filtered.shape[0]):
#     currentImageIndex = df_filtered.iloc[frame]["frame_id"]
#     imagePath = f"./unlabeled_set/{currentImageIndex:06d}_512_512.jpg"
#     if os.path.exists(imagePath):
#         os.remove(imagePath)
#         print(f"{imagePath} deleted.")

# # list all items in the folder
# children = os.listdir("./unlabeled_set")

# print(f"Number of items in: {len(children)}")


144603
frame_id    0
label       1
Name: 0, dtype: int64
27496
10796


In [None]:
# Pre-allocate one zero-filled 96 x 96 float32 slot per row of finalDf.
images = np.zeros(shape=(len(finalDf), 96, 96), dtype=np.float32)
print(images.shape)  # (number of rows in finalDf, 96, 96)

def returnProcessedImage(dfIndex, pictureNumber):
    """Load one frame from disk as a normalized 96 x 96 grayscale array.

    Args:
        dfIndex: Caller-supplied index; passed through unchanged so the caller
            can tell which slot the result belongs to.
        pictureNumber: Integer frame id; zero-padded to six digits to build the
            file name ``dataset/<id>_512_512.jpg``.

    Returns:
        A tuple ``(dfIndex, image)`` where ``image`` is a float32 array of shape
        (96, 96) with values scaled to the range [0, 1].
    """
    # The context manager closes the underlying file as soon as the pixels
    # have been read, instead of leaving that to garbage collection.
    with Image.open(f"dataset/{pictureNumber:06d}_512_512.jpg") as currentImage:
        # downscale to 96 x 96 first, then drop colour ("L" = 8-bit grayscale)
        processedImage = currentImage.resize((96, 96), Image.Resampling.LANCZOS).convert("L")

    # 8-bit pixel values 0..255 -> float32 in [0, 1]
    normalizedNpArrImage = np.array(processedImage, dtype=np.float32) / 255.0

    return (dfIndex, normalizedNpArrImage)

# you guys ever just spend a good amount of time on something only to realize it's all useless? yeah, that's what happened with the threaded loader commented out right below

# currentDfIndex = 0
# batchSize = 10
# while(currentDfIndex < finalDf.shape[0]):
#     with ThreadPoolExecutor(max_workers=10) as executor:
#             futures = []
#             for i in range(0, min(finalDf.shape[0], currentDfIndex + batchSize), 1):
#                  dfIndex = currentDfIndex + i
#                  futures.append(executor.submit(returnProcessedImage, dfIndex, finalDf.iloc[dfIndex]["frame_id"]))
#             for future in as_completed(futures):
#                  result = future.result()
#                  images[result[0]] = result[1]
#     currentDfIndex += batchSize
#     if(currentDfIndex % 2000 == 0):
#         print(f"Made it to index {currentDfIndex}!")

# Fill `images` one row at a time, in the same order as the rows of finalDf.
# The per-image work (open, resize to 96 x 96, grayscale, scale to [0, 1]) lives in
# returnProcessedImage, so it is called here rather than being duplicated.
# Pulling the frame_id column out once avoids building a Series per row via .iloc[i].
for i, actualPicIndex in enumerate(finalDf["frame_id"].to_numpy()):
    images[i] = returnProcessedImage(i, actualPicIndex)[1]

    # lightweight progress report every 2000 images
    if i % 2000 == 0:
        print(f"Made it to index {i}!")

(38292, 96, 96)
Made it to index 2000!
Made it to index 4000!


In [None]:
labels = finalDf["label"].to_numpy()

# `images` is overwritten below (shuffled, then given a channel axis). Running this
# cell a second time would pair already-shuffled images with freshly-read labels,
# so fail loudly instead of silently misaligning the data.
assert images.ndim == 3 and images.shape[0] == labels.shape[0], (
    "images must be the unshuffled 3-D array with one entry per label; "
    "re-run the image loading cell before running this cell again"
)

# One shared permutation, applied to both arrays, keeps each image paired with its label.
randomIndexOrder = np.random.permutation(labels.shape[0])

images = images[randomIndexOrder]
# Fixed: this used to index `images` a second time, which replaced the labels
# with image data.
labels = labels[randomIndexOrder]

# add a trailing channel axis; the model below declares input_shape=(96, 96, 1)
images = images[..., np.newaxis]

# now make a training and test set
# NOTE: despite its name, testPercentage is the fraction of samples that goes to the
# TRAINING set; the remainder becomes the test set.
testPercentage = 0.85
cutoffPoint = int(testPercentage * labels.shape[0])

trainingXSet = images[0:cutoffPoint]
trainingYSet = labels[0:cutoffPoint]

testingXSet = images[cutoffPoint:]
testingYSet = labels[cutoffPoint:]

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Sequential
import tensorflow as tf

In [None]:
# Small CNN that maps a 96 x 96 single-channel frame to one sigmoid score.
model = Sequential([
    # input: 96 x 96 x 1; "same" padding keeps the spatial size at 96 x 96
    Conv2D(filters=16, kernel_size=(3, 3), padding="same", activation="relu", input_shape=(96, 96, 1)),
    # 2 x 2 pooling halves each spatial dimension -> 48 x 48
    MaxPooling2D(pool_size=(2, 2)),

    Conv2D(filters=32, kernel_size=(2, 2), padding="same", activation="relu"),
    # collapse each of the 32 feature maps to its average value
    GlobalAveragePooling2D(),

    Dense(units=32, activation="relu"),
    # single sigmoid unit for binary classification
    Dense(units=1, activation="sigmoid"),
])

# A Keras model has to be compiled (optimizer, loss, metrics) before it can be trained.
model.compile(
    loss="binary_crossentropy",
    optimizer="adam",
    metrics=["accuracy", "AUC", "Precision", "Recall"],
)

# Train on the training split only: 30 passes over the data, 16 samples per step.
history = model.fit(x=trainingXSet, y=trainingYSet, epochs=30, batch_size=16)

# Sigmoid scores for the held-out test split.
predictions = model.predict(testingXSet)

# Threshold the scores at 0.5: True/False becomes 1/0 after the integer cast.
predicted_classes = (predictions > 0.5).astype(int)

In [None]:
# Compare the thresholded predictions against the true test labels.
confusionMatrix = confusion_matrix(y_true=testingYSet, y_pred=predicted_classes)
print("Confusion Matrix:\n", confusionMatrix)

accuracy = accuracy_score(y_true=testingYSet, y_pred=predicted_classes)
print("Accuracy:\n", accuracy)

recall = recall_score(y_true=testingYSet, y_pred=predicted_classes)
print("Recall:\n", recall)

f1Score = f1_score(y_true=testingYSet, y_pred=predicted_classes)
print("F1 Score:\n", f1Score)

In [None]:
def representative_dataset(samples=None, max_samples=400):
    """Yield sample inputs one at a time for TFLite quantization calibration.

    The converter runs these samples through the model to pick the scaling of
    the intermediate activations.

    Args:
        samples: Array of model inputs indexed along its first axis. Defaults to
            the notebook's ``trainingXSet`` (looked up when the generator runs),
            so the converter can keep calling this function with no arguments.
        max_samples: Upper bound on how many samples are yielded (default 400).

    Yields:
        A one-element list holding a float32 array with a leading batch
        dimension of 1, i.e. ``samples[i:i+1]``.
    """
    if samples is None:
        samples = trainingXSet

    # Never run past the end of the data: a slice beyond the last sample would
    # be an empty batch rather than an error.
    sampleCount = min(max_samples, len(samples))

    for i in range(sampleCount):
        # slicing with i:i+1 (instead of indexing with i) keeps the batch dimension
        yield [samples[i:i+1].astype('float32')]


# Configure the conversion of the trained Keras model to a fully integer-quantized
# TFLite model.

# start from the in-memory Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Quantization only happens when an optimization is enabled. Without this line the
# representative dataset is not used and the non-float input/output types requested
# below are rejected when convert() is called.
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# sample inputs used to calibrate the ranges of the intermediate activations
converter.representative_dataset = representative_dataset

# Force int8 quantization for both weights and activations
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Set input/output to uint8
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

In [None]:
# Run the conversion; the result is the serialized model that gets written out below.
finalModel = converter.convert()

# Persist the converted model one directory above the notebook.
with open("../cnn_model_uint8.tflite", "wb") as tfliteFile:
    tfliteFile.write(finalModel)
