<a href="https://colab.research.google.com/github/skkusme21j/skkusme21j/blob/main/%EC%9E%90%EC%9C%A8%EC%A3%BC%ED%96%89.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Displays a progress bar

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import Dataset, Subset, DataLoader, random_split

In [2]:
# Mount Google Drive into the Colab filesystem at /content/drive; the dataset
# paths defined later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [16]:
import os
import cv2
import pandas as pd
from PIL import Image

def read_images(folder_path):
    """Load every file in ``folder_path`` as an image, in sorted name order.

    Each file is opened inside a context manager and copied into memory, so
    no file handle stays open once the function returns (``Image.open`` on
    its own is lazy and keeps the file open). Sub-directories, such as the
    ``.ipynb_checkpoints`` folder a notebook environment may create, are
    skipped instead of being handed to the image decoder.

    Args:
        folder_path: Directory containing the image files.

    Returns:
        A list of fully loaded PIL images, ordered by file name.
    """
    images = []
    for file_name in sorted(os.listdir(folder_path)):
        image_path = os.path.join(folder_path, file_name)
        if not os.path.isfile(image_path):
            continue
        with Image.open(image_path) as image:
            # copy() forces the pixel data to load before the file is closed.
            images.append(image.copy())
    return images

def read_labels(label_file):
    """Read the steering angles from a driving-log CSV.

    Only the ``steering`` column is used; no other column is required.

    Args:
        label_file: Path to a CSV file with a ``steering`` header column.

    Returns:
        A list of Python floats, one per CSV row, in row order.

    Raises:
        KeyError: If the CSV has no ``steering`` column.
    """
    df = pd.read_csv(label_file)
    return [float(angle) for angle in df['steering']]

# def read_labels(label_file):
#     df = pd.read_csv(label_file)
#     image_filenames = df['center'].tolist()
#     steering_angles = df['steering'].tolist()
#     labels = [name[6:] for name in image_filenames]
#     return labels
  
def image_titles(folder_path):
  """Return the entry names found in ``folder_path``, sorted ascending."""
  return sorted(os.listdir(folder_path))


def augment_data(images,labels):
  """Double a dataset by adding a horizontally mirrored copy of every image.

  For each (image, label) pair the output holds the original image with its
  label, immediately followed by the flipped image with the negated label
  (mirroring the scene reverses the sign of the steering angle).

  Pairs are formed with ``zip``, so any surplus items in the longer of the
  two inputs are ignored.

  Returns:
    A tuple ``(augmented_images, augmented_labels)`` of two lists.
  """
  out_images, out_labels = [], []

  for original, angle in zip(images, labels):
    # p=1.0 makes the "random" flip happen every time.
    mirror = transforms.RandomHorizontalFlip(p=1.0)
    out_images.extend((original, mirror(original)))
    out_labels.extend((angle, -angle))

  return out_images, out_labels

def crop_image(image_path, top_fraction=0.35, bottom_fraction=0.18):
    """Open an image and remove a band from its top and from its bottom.

    The cut is made with an explicit crop box, so the top and bottom margins
    are removed independently. (A centre crop of the same output height
    would instead trim equal amounts from both edges.) The image width is
    left unchanged.

    Args:
        image_path: Path of the image file to load.
        top_fraction: Share of the image height to remove from the top.
        bottom_fraction: Share of the image height to remove from the bottom.

    Returns:
        A new PIL image of height
        ``height - int(height * top_fraction) - int(height * bottom_fraction)``.
    """
    # The context manager closes the source file once the crop is computed.
    with Image.open(image_path) as img:
        width, height = img.size
        crop_top = int(height * top_fraction)
        crop_bottom = int(height * bottom_fraction)
        # Pillow crop box is (left, upper, right, lower).
        return img.crop((0, crop_top, width, height - crop_bottom))

def crop_images_in_folder(image_folder):
    """Crop every image in ``image_folder`` and return the results as arrays.

    Files are processed in sorted name order so the returned list has a
    deterministic order. Each file is cropped with ``crop_image`` and the
    result is converted to a NumPy array.

    Args:
        image_folder: Directory containing the images to crop.

    Returns:
        A list of NumPy arrays, one per directory entry, ordered by file name.
    """
    cropped_images = []
    for filename in sorted(os.listdir(image_folder)):
        image_path = os.path.join(image_folder, filename)
        cropped_images.append(np.array(crop_image(image_path)))
    return cropped_images

# A2D2 dataset path
# dataset_path = '/content/drive/MyDrive/자율주행인공지능 및 제어/IMG'

# Locations of the A2D2 dataset's image folder and label (driving log) file
image_folder ='/content/drive/MyDrive/자율주행인공지능 및 제어/IMG'
label_file = '/content/drive/MyDrive/자율주행인공지능 및 제어/자율주행 프로젝트/driving_log.csv'

In [5]:
# Create a folder
def createFolder(directory):
    """Create ``directory`` (and any missing parents) if it does not exist.

    Creation is best-effort: an ``OSError`` is reported on stdout rather than
    raised. That includes the case where ``directory`` already exists as a
    regular file.

    Args:
        directory: Path of the directory to create.
    """
    try:
        # exist_ok avoids the race between checking for the directory and
        # creating it.
        os.makedirs(directory, exist_ok=True)
    except OSError:
        print ('Error: Creating directory. ' +  directory)

In [95]:
import os
from PIL import Image

def save_image_pairs(dataset, save_folder):
    """Write each image of ``dataset`` to ``save_folder`` as ``image_<i>.jpg``.

    The folder is created when missing. ``<i>`` is the position of the image
    in ``dataset``; every item must provide a ``save(path)`` method.
    """
    os.makedirs(save_folder, exist_ok=True)

    for position, picture in enumerate(dataset):
        picture.save(os.path.join(save_folder, f'image_{position}.jpg'))

        
def save_label_pairs(dataset, save_folder):
    """Write each label of ``dataset`` to ``save_folder`` as ``label_<i>.txt``.

    The folder is created when missing. ``<i>`` is the position of the label
    in ``dataset``; the file content is ``str(label)``.
    """
    os.makedirs(save_folder, exist_ok=True)

    for position, value in enumerate(dataset):
        target = os.path.join(save_folder, f'label_{position}.txt')
        with open(target, 'w') as handle:
            handle.write(str(value))

In [6]:
# Build the working directory tree under ./Dataset: a Train and a Test root,
# each with image, label and crop_image sub-folders.
createFolder('./Dataset/Train')
createFolder('./Dataset/Test')
createFolder('./Dataset/Train/image')
createFolder('./Dataset/Train/label')
createFolder('./Dataset/Test/image')
createFolder('./Dataset/Test/label')
createFolder('./Dataset/Train/crop_image')
createFolder('./Dataset/Test/crop_image')

In [43]:
class A2D2Dataset(Dataset):
  """In-memory dataset pairing ``images[i]`` with ``labels[i]``."""

  def __init__(self, images, labels):
    # Both sequences are stored as given; no copy is made.
    self.images, self.labels = images, labels

  def __len__(self):
    """Number of samples, taken from the image sequence."""
    return len(self.images)

  def __getitem__(self, idx):
    """Return the ``(image, label)`` tuple stored at position ``idx``."""
    return self.images[idx], self.labels[idx]

In [17]:
# Load the raw images and steering labels, double them with mirrored copies,
# and wrap the result in an in-memory dataset.
# NOTE(review): read_images returns files in sorted name order while
# read_labels returns CSV row order, and augment_data pairs them with zip
# (which stops at the shorter sequence) — confirm the folder listing and the
# CSV rows line up one-to-one.
images=read_images(image_folder)
labels=read_labels(label_file)
aug_images, aug_labels=augment_data(images, labels)
dataset=A2D2Dataset(aug_images,aug_labels)

In [90]:
#train, test=random_split(dataset,[0.8,0.2])
# Randomly partition the sample indices (not the samples themselves) using
# the [0.8, 0.2] proportions passed to random_split.
train_indices, test_indices = random_split(range(len(dataset)), [0.8, 0.2])

# Materialise each partition as plain lists; element 0 of a dataset item is
# the image and element 1 is the label.
train_images = [dataset[i][0] for i in train_indices]
train_labels = [dataset[i][1] for i in train_indices]

test_images = [dataset[i][0] for i in test_indices]
test_labels = [dataset[i][1] for i in test_indices]

In [96]:
# Save each split to its folder: images as image_<i>.jpg, labels as label_<i>.txt
save_image_pairs(train_images,'./Dataset/Train/image')
save_image_pairs(test_images,'./Dataset/Test/image')
save_label_pairs(train_labels,'./Dataset/Train/label')
save_label_pairs(test_labels,'./Dataset/Test/label')

In [104]:
# Status message written to the notebook output.
print("Loading datasets...")

Loading datasets...


In [126]:
# Crop every image in a folder and save the results
def crop_image_in_folder(image_folder, output_folder):
    """Crop each image in ``image_folder`` and save it to ``output_folder``.

    The top 35% and the bottom 18% of every image are removed with an
    explicit crop box; the width is unchanged. (A centre crop of the same
    output height would instead trim equal amounts from both edges.) Output
    files keep the input file names. ``output_folder`` is created when
    missing and may be the same directory as ``image_folder``: each source
    file is closed before its cropped version is written.

    Sub-directories inside ``image_folder`` are skipped. Files are processed
    in sorted name order.

    Args:
        image_folder: Directory containing the images to crop.
        output_folder: Directory that receives the cropped images.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in sorted(os.listdir(image_folder)):
        image_path = os.path.join(image_folder, filename)
        if not os.path.isfile(image_path):
            continue

        with Image.open(image_path) as img:
            width, height = img.size
            crop_top = int(height * 0.35)
            crop_bottom = int(height * 0.18)
            # Pillow crop box is (left, upper, right, lower).
            cropped_img = img.crop((0, crop_top, width, height - crop_bottom))

        # Written after the source is closed so in-place overwrite is safe.
        cropped_img.save(os.path.join(output_folder, filename))

# Input and output folders are the same, so every saved image is overwritten
# by its cropped version. NOTE: running this cell again crops the files that
# are already cropped.
crop_image_in_folder('./Dataset/Train/image','./Dataset/Train/image')
crop_image_in_folder('./Dataset/Test/image','./Dataset/Test/image')

In [115]:
# Preprocessing applied to every image loaded through ImageFolder.
MyTransform = transforms.Compose([
    transforms.ToTensor(), # Transform from [0,255] uint8 to [0,1] float
    # Per-channel ImageNet mean/std: the statistics torchvision's pretrained
    # ResNet-50 weights were trained with, so the frozen backbone sees inputs
    # in the range it expects.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [150]:
def read_all_label_files(folder_path):
    """Read every ``.txt`` file in ``folder_path`` in sorted name order.

    Sorting makes the result deterministic and uses the same ordering rule as
    ``read_images``, so ``label_<i>.txt`` files line up with the matching
    ``image_<i>.jpg`` files. Entries that are not regular files, or that do
    not end in ``.txt``, are ignored.

    Args:
        folder_path: Directory containing the label files.

    Returns:
        A list with the full text content of each label file, as strings.
    """
    text_contents = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.txt'):
            continue
        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            continue
        with open(file_path, 'r') as file:
            text_contents.append(file.read())
    return text_contents

In [151]:
class A2D2(Dataset):
  """Dataset backed by an image folder and a folder of label text files.

  Everything is read into memory when the object is constructed; sample ``i``
  is the ``i``-th loaded image paired with the ``i``-th loaded label text.
  """

  def __init__(self, image_folder, label_folder):
    # Images come from read_images, label strings from read_all_label_files.
    self.images = read_images(image_folder)
    self.labels = read_all_label_files(label_folder)

  def __len__(self):
    """Number of samples, taken from the loaded image list."""
    return len(self.images)

  def __getitem__(self, idx):
    """Return the ``(image, label)`` tuple stored at position ``idx``."""
    return self.images[idx], self.labels[idx]
 
# Reload the saved (and cropped in place) train/test splits from disk.
real_train_dataset = A2D2('./Dataset/Train/image','./Dataset/Train/label')
real_test_dataset = A2D2('./Dataset/Test/image','./Dataset/Test/label')

In [165]:
# Create the root folders that the ImageFolder-based loaders read from.
createFolder('./DATASET')
createFolder('./DATASET/Train')
createFolder('./DATASET/Test')

In [164]:
def delete_folder(folder_path):
    """Delete ``folder_path`` together with everything inside it.

    Args:
        folder_path: Directory to remove.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
    """
    # Imported locally so the function does not depend on a module-level
    # ``import shutil``.
    import shutil

    # rmtree removes files and nested folders itself, so no manual
    # per-entry clean-up is needed first.
    shutil.rmtree(folder_path)

# Remove the previously exported test split and everything in it.
folder_path = '/content/DATASET/Test'
delete_folder(folder_path)

In [169]:
save_folder = '/content/DATASET/Train'  # destination folder for the export

# Export every training image into a single "label" sub-folder.
for idx, (image, label) in enumerate(real_train_dataset):
    # The sample index is part of the name so that images sharing the same
    # steering label do not overwrite one another; the label stays in the
    # file name after the index.
    filename = f"label/{idx}_{label}.jpg"

    # Construct the full file path and make sure its folder exists
    image_path = os.path.join(save_folder, filename)
    os.makedirs(os.path.dirname(image_path), exist_ok=True)
    # Save the image
    image.save(image_path)

In [171]:
save_folder = '/content/DATASET/Test'  # destination folder for the export

# Export every test image into a single "label" sub-folder.
for idx, (image, label) in enumerate(real_test_dataset):
    # The sample index is part of the name so that images sharing the same
    # steering label do not overwrite one another; the label stays in the
    # file name after the index.
    filename = f"label/{idx}_{label}.jpg"

    # Construct the full file path and make sure its folder exists
    image_path = os.path.join(save_folder, filename)
    os.makedirs(os.path.dirname(image_path), exist_ok=True)
    # Save the image
    image.save(image_path)


In [172]:
# Build the datasets from the exported folders. target_transform is left at
# its default (None): it must be a callable or None, and a value of False
# would be invoked like a function when the first sample is fetched.
# NOTE(review): ImageFolder derives each target from the sub-folder name, and
# the export step puts every image under one "label" sub-folder — so the
# targets produced here are presumably a single class index, not the steering
# angles. Confirm how the regression targets are meant to reach the model.
Data_train = datasets.ImageFolder(root='./DATASET/Train', transform=MyTransform)
Data_test = datasets.ImageFolder(root='./DATASET/Test', transform=MyTransform)

In [173]:
# Training data is served in shuffled batches of 128; test data is served one
# sample at a time, also shuffled.
trainloader = DataLoader(Data_train, batch_size=128, shuffle=True)
testloader = DataLoader(Data_test, batch_size=1, shuffle=True)

In [None]:
class Network(nn.Module):
    """Steering-angle regressor: pretrained ResNet-50 features + small FC head.

    The ResNet-50 classifier layer is replaced by an identity mapping, so the
    backbone outputs its pooled feature vector. Two linear layers then map
    those features to a single value per sample. The backbone runs under
    ``torch.no_grad()``, so gradients reach only ``fc1`` and ``fc2``.
    """

    def __init__(self):
        super().__init__()
        # Load pretrained ResNet-50
        self.model_resnet = models.resnet50(pretrained=True)

        # Set ResNet-50's classifier as an identity mapping and remember the
        # size of the feature vector it used to consume
        num_fc_in = self.model_resnet.fc.in_features
        self.model_resnet.fc = nn.Identity()

        # Regression head: features -> 256 hidden units -> steering angle
        self.fc1 = nn.Linear(num_fc_in, 256, bias = True)
        self.fc2 = nn.Linear(256, 1, bias = True)

        # Experimental CNN layers. They are not used by forward(); they are
        # kept only so the module's parameter names stay unchanged.
        self.conv1 = nn.Conv2d(3, 3, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(3, 5, kernel_size=5, padding=2)

        # YOLO (placeholder in the original; nothing implemented)

    def forward(self,x):
        """Predict one steering value per input image.

        Args:
            x: Batch of images in the layout ResNet-50 accepts
                (assumed ``(batch, 3, H, W)`` — confirm against the loader).

        Returns:
            The output of ``fc2``: a tensor whose last dimension is 1.
        """
        # The backbone is used as a fixed feature extractor.
        with torch.no_grad():
            features = self.model_resnet(x)

        hidden = F.relu(self.fc1(features))

        # The regression output is returned directly. It cannot be fed into
        # conv1, which expects an image-shaped input with 3 channels.
        # The loss layer will be applied outside Network class
        return self.fc2(hidden)