<a href="https://colab.research.google.com/github/rsuwa/self-driving-mini-car/blob/main/learn_steer_throttle.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pwd

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!ls /content/drive/MyDrive/train_data_D | wc -l

In [None]:
import os
import matplotlib.pyplot as plt

# フォルダのパスを設定
folder_path = "/content/drive/MyDrive/train_data_D"  # 例: "/path/to/your/folder"

# ステア角のリストを作成
steer_angles = []

# フォルダ内のすべてのファイルをループ
for filename in os.listdir(folder_path):
    if filename.endswith(".jpg"):  # .jpgファイルのみを対象
        # ステア角を取得
        angle = filename.split("_")[0]
        try:
            steer_angles.append(float(angle))
        except ValueError:
            print(f"Could not convert {angle} to float from filename {filename}")
            continue

# ステア角の分布をヒストグラムとしてプロット
plt.hist(steer_angles, bins=50)
plt.title("Distribution of Steer Angles")
plt.xlabel("Steer Angle")
plt.ylabel("Number of Images")
plt.show()


In [None]:
import os
import matplotlib.pyplot as plt

# フォルダのパスを設定
folder_path = "/content/drive/MyDrive/train_data_D"  # 例: "/path/to/your/folder"

# ステア角のリストを作成
throttles = []

# フォルダ内のすべてのファイルをループ
for filename in os.listdir(folder_path):
    if filename.endswith(".jpg"):  # .jpgファイルのみを対象
        # throttle値を取得
        throttle = filename.split("_")[1]
        try:
            throttles.append(float(throttle))
        except ValueError:
            print(f"Could not convert {throttle} to float from filename {filename}")
            continue

# ステア角の分布をヒストグラムとしてプロット
plt.hist(throttles, bins=100)
plt.title("Distribution of Throttle values")
plt.xlabel("Throttle value")
plt.ylabel("Number of Images")
plt.show()


In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
import glob
import PIL.Image
import os
import numpy as np

# 無駄なファイル削除

In [None]:
import os

def delete_files(start, end, dir_path, file_extension=".jpg"):
    """
    Delete files in a numerical range with a specific extension.

    :param start: Start of the numerical range
    :param end: End of the numerical range
    :param dir_path: Path to the directory containing the files
    :param file_extension: Extension of the files to delete
    """
    for i in range(start, end + 1):
        # Generate filename
        filename = f"{i:04d}{file_extension}"
        file_path = os.path.join(dir_path, filename)

        # Try to delete the file, and pass if it doesn't exist
        try:
            os.remove(file_path)
            print(f"Deleted: {file_path}")
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            pass

# Example usage:
delete_files(0, 4000, "/content/drive/MyDrive/train_data_D")


## スロットル値が0に近い画像を削除

In [None]:
import os

# フォルダのパスを設定
folder_path = "/content/drive/MyDrive/train_data_D"  # 例: "/path/to/your/folder"
throttle_threshold = 0.005 # スロットル値がこの値未満のファイルを削除
steer_angle_threshold = 0.05
cnt = 0
# フォルダ内のすべてのファイルをループ
for filename in os.listdir(folder_path):
    if filename.endswith(".jpg"):  # .jpgファイルのみを対象
        # スロットル値を取得
        throttle_str = filename.split("_")[1]
        # ステア値を取得
        steer_angle_str = filename.split("_")[0]
        try:
            throttle = float(throttle_str)
            steer_angle = float(steer_angle_str)

            # スロットル値が指定された閾値未満かをチェック
            if abs(throttle) < throttle_threshold:
                # and abs(steer_angle) < steer_angle_threshold:
                file_path = os.path.join(folder_path, filename)
                os.remove(file_path)
                print(f"Removed {filename}, steer={steer_angle}, throttle={throttle}")
                cnt += 1
        except ValueError:
            print(f"Could not convert {throttle_str} to float from filename {filename}")
            continue
print(cnt)


In [None]:
def get_steering(path):
    """Gets the steering value from the image filename"""
    gets = path.split('_')
    return float(gets[0])

def get_speed(path):
    """Gets the speed value from the image filename"""
    gets = path.split('_')
    return float(gets[1])

class SSDataset(torch.utils.data.Dataset):

    def __init__(self, directory, random_hflips=False):
        self.directory = directory
        self.random_hflips = random_hflips
        self.image_paths = glob.glob(os.path.join(self.directory, '*.jpg'))
        self.color_jitter = transforms.ColorJitter(0.3, 0.3, 0.3, 0.3)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]

        image = PIL.Image.open(image_path)
        x = float(get_steering(os.path.basename(image_path)))
        y = float(get_speed(os.path.basename(image_path)))

        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        image = image.numpy()[::-1].copy()
        image = torch.from_numpy(image)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        return image, torch.tensor([x, y]).float()

dataset = SSDataset('/content/drive/MyDrive/train_data_D', random_hflips=False)

In [None]:
test_percent = 0.1
num_test = int(test_percent * len(dataset))
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - num_test, num_test])


In [None]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=4
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=4
)

In [None]:
model = models.resnet18(pretrained=True)

In [None]:
model.fc = torch.nn.Linear(512, 2)
device = torch.device('cuda')
model = model.to(device)

# Frozen for modle layer1 to layer3
for l in model.layer1.parameters():
    l.requires_grad=False
for l in model.layer2.parameters():
    l.requires_grad=False
for l in model.layer3.parameters():
    l.requires_grad=False

In [None]:
EARLY_STOP = True


NUM_EPOCHS = 100
BEST_MODEL_PATH = 'best_steering_model_ResNet.pth'
best_loss = 1e9
optimizer = optim.Adam(model.parameters())

es_counter = 0
for epoch in range(NUM_EPOCHS):

    model.train()
    train_loss = 0.0
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.mse_loss(outputs, labels)
        loss.backward()
        train_loss += float(loss)
        optimizer.step()
    train_loss /= len(train_loader)

    model.eval()
    test_loss = 0.0
    for images, labels in iter(test_loader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        loss = F.mse_loss(outputs, labels)
        test_loss += float(loss)
    test_loss /= len(test_loader)
    print('%d, %f, %f' % (epoch, train_loss, test_loss))
    if test_loss < best_loss:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_loss = test_loss
        es_counter=0
    else:
        if es_counter == 10 and EARLY_STOP:
            print("Early Stopping EPOCH[{}], val_loss {:4f}".format(epoch, best_loss))
            break
        else:
            es_counter += 1


In [None]:
TEST_DATA_IMAGE = '/content/drive/MyDrive/train_data_D/1.000000_-0.128851_00dd6bba-6100-11ee-b386-0242c6b6e9d9.jpg'

model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2)
model.load_state_dict(torch.load('best_steering_model_ResNet.pth'))
model = model.to(device)
model = model.eval()
image_path = TEST_DATA_IMAGE
image = PIL.Image.open(image_path)
image = transforms.functional.resize(image, (224, 224))
image = transforms.functional.to_tensor(image)
image = image.numpy()[::-1].copy()
image = torch.from_numpy(image)
image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]).to(device)
outputs = model(image[None, ...])
print(outputs)

In [None]:
image = PIL.Image.open(image_path)
image_orig_size = image.size

sample_image = transforms.functional.resize(image, (224, 224))


sample_image = transforms.functional.to_tensor(sample_image)
sample_image = sample_image.numpy()[::-1].copy()
sample_image = torch.from_numpy(sample_image)
sample_image = transforms.functional.normalize(sample_image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]).to(device)

In [None]:
# https://tech.jxpress.net/entry/2018/12/12/130057
# https://jacobgil.github.io/deeplearning/vehicle-steering-angle-visualizations
class GradCAM:
    def __init__(self, model, feature_layer):
        self.model = model
        self.feature_layer = feature_layer
        self.model.eval()
        self.feature_grad = None
        self.feature_map = None
        self.hooks = []

        # 最終層逆伝播時の勾配を記録する
        def save_feature_grad(module, in_grad, out_grad):
            self.feature_grad = out_grad[0]
        self.hooks.append(self.feature_layer.register_backward_hook(save_feature_grad))

        # 最終層の出力 Feature Map を記録する
        def save_feature_map(module, inp, outp):
            self.feature_map = outp[0]
        self.hooks.append(self.feature_layer.register_forward_hook(save_feature_map))

    def forward(self, x):
        return self.model(x)

    def backward_on_target(self,output):
        self.model.zero_grad()
        output.backward()

    def clear_hook(self):
        for hook in self.hooks:
            hook.remove()


In [None]:
model.eval()
grad_cam = GradCAM(model=model, feature_layer=list(model.layer4.modules())[-1])

In [None]:
model_output = grad_cam.forward(sample_image[None, ...])
print(model_output)
#model_output.backward()
grad_cam.backward_on_target(model_output[0][0])

In [None]:
import numpy as np
# Get feature gradient
feature_grad = grad_cam.feature_grad.data.cpu().numpy()[0]
# Get weights from gradient
weights = np.mean(feature_grad, axis=(1, 2))  # Take averages for each gradient
# Get features outputs
feature_map = grad_cam.feature_map.data.cpu().numpy()
grad_cam.clear_hook()

In [None]:
# Get cam
cam = np.sum((weights * feature_map.T), axis=2).T
cam = np.maximum(cam, 0)  # apply ReLU to cam

In [None]:
print(cam)

In [None]:
#(cam*10000).astype(np.uint8)
import cv2
import matplotlib.pyplot as plt
cam = cv2.resize(cam, (224,224))
cam = (cam - np.min(cam)) / (np.max(cam) - np.min(cam))  # Normalize between 0-1
cam = np.uint8(cam * 255)  # Scale between 0-255 to visualize
plt.imshow(cam)
plt.show()

In [None]:
activation_heatmap = np.expand_dims(cam, axis=0).transpose(1,2,0)
org_img = np.asarray(image.resize((224,224)))
img_with_heatmap = np.multiply(np.float32(activation_heatmap), np.float32(org_img))
img_with_heatmap = img_with_heatmap / np.max(img_with_heatmap)
org_img = cv2.resize(org_img, image_orig_size)

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(15,10))
plt.subplot(1,2,1)
plt.imshow(org_img)
plt.subplot(1,2,2)
plt.imshow(cv2.resize(np.uint8(255 * img_with_heatmap), image_orig_size))
plt.show()