In [61]:
import os

import numpy as np

# Start with empty containers for the inputs (X) and the targets (Y).
X, Y = [], []


def add_data(X, Y, csv_path, bitflip_chance=0.0, grid_size=30):
    """Append the samples stored in a CSV dataset to ``X`` and ``Y``.

    Every non-blank line is split on commas and parsed as float32 values.
    The first two columns are the targets, of which only the second one
    (steer) is kept; the remaining columns are the flattened grid, which is
    reshaped to ``(grid_size, grid_size, 1)``.

    Args:
        X: list receiving one grid array per row (mutated in place).
        Y: list receiving one steer value per row (mutated in place).
        csv_path: file name resolved against ``../utils/datasets``; an
            absolute path is used as given.
        bitflip_chance: per-cell probability of flipping a grid cell
            (1 becomes 0, anything else becomes 1). ``0`` disables the noise.
        grid_size: side length of the square grid stored in each row.

    Returns:
        The same ``X`` and ``Y`` objects that were passed in.

    Raises:
        ValueError: if a row holds a non-numeric field or its number of
            grid cells is not ``grid_size * grid_size``.
    """
    print(f"Adding entries with {bitflip_chance=}")
    with open(os.path.join("../utils/datasets", csv_path)) as f:
        for line in f:
            # Blank lines (e.g. a trailing newline) carry no sample.
            if not line.strip():
                continue

            data = line.split(",")
            x = np.array(data[2:], dtype=np.float32)
            y = np.array(data[:2], dtype=np.float32)[1]  # steer only

            # Noise augmentation: each cell is flipped independently, once
            # per call. Call add_data again to get another noisy copy.
            if bitflip_chance > 0:
                flip = np.random.rand(x.size) < bitflip_chance
                x[flip] = np.where(x[flip] == 1, 0, 1)

            # Transform the flat state back into a single-channel grid.
            X.append(x.reshape(grid_size, grid_size, 1))
            Y.append(y)
    return X, Y


# Seed NumPy so the random bit flips are reproducible from a fresh kernel.
np.random.seed(0)

# Collect into fresh lists so this cell can be re-run on its own: X and Y are
# rebound to arrays below and could no longer be appended to.
grids, steers = [], []

# Load the same dataset three times: once clean, then with increasing noise.
for chance in (0.0, 0.1, 0.2):
    grids, steers = add_data(
        grids, steers, "kaggle_30_binary.csv", bitflip_chance=chance
    )
# grids, steers = add_data(grids, steers, "sim_oval_30_binary.csv", bitflip_chance=0.2)

X = np.array(grids, dtype=np.float32)
Y = np.array(steers, dtype=np.float32)

print(f"Loaded {len(X)} entires")
# Report the shapes from the arrays themselves; `x` and `y` only exist inside
# add_data and are not defined at notebook level.
print(f"entry x.shape={X[0].shape}, y.shape={Y[0].shape}")

Adding entries with bitflip_chance=0.0
Adding entries with bitflip_chance=0.1
Adding entries with bitflip_chance=0.2
Loaded 41376 entires
entry x.shape=(30, 30, 1), y.shape=()


In [63]:
import keras
import tensorflow as tf

# LeNet-5 style network: two conv/pool stages, then three dense layers ending
# in a single linear output.
lenet_layers = [
    keras.layers.Conv2D(
        6, kernel_size=(5, 5), activation="relu", input_shape=(30, 30, 1)
    ),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Conv2D(16, kernel_size=(5, 5), activation="relu"),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(120, activation="relu"),
    keras.layers.Dense(84, activation="relu"),
    keras.layers.Dense(1, activation="linear"),
]
model = keras.models.Sequential(lenet_layers)

# TODO: add normal following to all experts' ds

model.compile(optimizer="adam", loss="mean_squared_error")

# Validation and early stopping are currently switched off:
#   validation_split=0.2
#   callbacks=[keras.callbacks.EarlyStopping(patience=1)]
model.fit(X, Y, epochs=10, batch_size=32)

model_name = "follow_30_binary"

# Save the trained model in the native Keras format.
model_save_path = f"../shared/models/{model_name}.keras"
model.save(model_save_path)

# Convert the same model to TFLite and write the flatbuffer next to it.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
model_save_path = f"../shared/models/{model_name}.tflite"
with open(model_save_path, "wb") as f:
    f.write(tflite_model)

Epoch 1/10
Epoch 2/10
Epoch 3/10
  63/1293 [>.............................] - ETA: 5s - loss: 0.0099

KeyboardInterrupt: 

In [28]:
# time inference speed
import time

# Time one batched forward pass and report the mean cost per sample.
batch = X[0:100]

# perf_counter is the clock intended for measuring short elapsed intervals.
start = time.perf_counter()
model(batch)
end = time.perf_counter()

# Divide by the real batch size: the slice holds fewer than 100 samples when
# X itself is shorter than that.
avg = (end - start) / len(batch)
print(f"avg inference time: {avg*1000:.2f}ms")

avg inference time: 0.11ms
