*數據準備和預處理*


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset paths used later in this
# notebook are located under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 圖像預處理

class AccidentDataset(Dataset):
    """Fixed-length frame sequences, one sub-directory of frames per sample.

    Every entry found directly under ``image_dir`` is treated as one sample
    (a folder of frame images). Sample ``idx`` is paired with row ``idx`` of
    the ``risk`` column of ``labels_file``.

    NOTE(review): samples are ordered by a plain lexicographic sort of their
    folder names while labels keep the CSV row order; this assumes both
    orders agree — confirm against the CSV.

    Args:
        image_dir: Directory whose entries are the per-sample frame folders.
        labels_file: CSV file containing a ``risk`` column.
        transform: Optional callable applied to every RGB frame.
        seq_len: Number of frames returned per sample. Longer clips are
            truncated; shorter clips are padded by repeating the last frame.
    """

    def __init__(self, image_dir, labels_file, transform=None, seq_len=169):
        if seq_len < 1:
            raise ValueError(f"seq_len must be >= 1, got {seq_len}")
        self.image_dir = image_dir
        self.labels = pd.read_csv(labels_file)['risk'].values
        # One path per entry inside image_dir (each is a folder of frames).
        self.image_paths = sorted(
            os.path.join(image_dir, name) for name in os.listdir(image_dir)
        )
        self.transform = transform
        self.seq_len = seq_len

    @staticmethod
    def _natural_key(name):
        """Sort key that orders embedded numbers numerically (2 before 10)."""
        import re

        parts = re.split(r'(\d+)', name)
        # re.split with one capture group alternates text / digits, so odd
        # positions are always the digit runs.
        return [int(tok) if i % 2 else tok for i, tok in enumerate(parts)]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` for sample ``idx``.

        ``frames`` stacks exactly ``seq_len`` frames along a new first
        dimension; ``label`` is a tensor of shape ``(1,)``.

        Raises:
            ValueError: If the sample folder contains no readable image.
        """
        sample_dir = self.image_paths[idx]
        frames = []
        # os.listdir returns entries in arbitrary order; sort them so the
        # sequence is deterministic and follows the frame numbering.
        for frame_name in sorted(os.listdir(sample_dir), key=self._natural_key):
            if len(frames) >= self.seq_len:
                break  # truncate so every sample has the same length
            image = cv2.imread(os.path.join(sample_dir, frame_name))
            if image is None:
                # Not a decodable image (e.g. a checkpoint folder or a stray
                # text file): skip it instead of crashing in cvtColor.
                continue
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            if self.transform:
                image = self.transform(image)
            if not torch.is_tensor(image):
                # torch.stack needs tensors; without a tensor-producing
                # transform the frame keeps its H x W x C array layout.
                image = torch.as_tensor(np.asarray(image))
            frames.append(image)

        if not frames:
            raise ValueError(f"no readable frames found in {sample_dir!r}")

        # Pad short clips by repeating the last frame.
        frames.extend([frames[-1]] * (self.seq_len - len(frames)))

        image_tensor = torch.stack(frames)
        label = torch.tensor(self.labels[idx]).reshape(1)
        return image_tensor, label

# Per-frame preprocessing: array -> PIL image -> 224x224 -> float tensor ->
# per-channel normalisation (the mean/std values match the widely used
# ImageNet statistics — confirm this is the intended choice).
preprocessing_steps = [
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(preprocessing_steps)

# Build the full dataset from the frame folders and the label CSV on Drive.
dataset = AccidentDataset(
    '/content/drive/MyDrive/登登/期末ML/freeway/train',
    '/content/drive/MyDrive/登登/期末ML/freeway/freeway_train.csv',
    transform,
)


分割訓練集和測試集

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset

# Hold out 20% of the samples for evaluation; random_state pins the split so
# it is identical on every run.
train_indices, test_indices = train_test_split(
    list(range(len(dataset))), test_size=0.2, random_state=42
)

train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

# Re-shuffle the training samples every epoch so the optimizer does not see
# the same batch sequence each time; the seeded generator keeps the shuffling
# reproducible. Evaluation order does not matter, so the test loader stays
# unshuffled.
train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    generator=torch.Generator().manual_seed(42),
)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)




#分隔線

構建模型

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNN_LSTM(nn.Module):
    """Per-frame CNN encoder followed by a 2-layer LSTM over the frame sequence.

    Input:  tensor of shape (batch, timesteps, 3, H, W).
    Output: tensor of shape (batch, num_classes) holding sigmoid activations
            computed from the LSTM output at the last timestep.

    All ``batch * timesteps`` frames are pushed through the convolution stack
    in a single pass, so activation memory grows with both dimensions.

    Args:
        num_classes: Number of output units of the final linear layer.
    """

    def __init__(self, num_classes):
        super(CNN_LSTM, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        # fc1 expects a 256 x 7 x 7 feature map, which five 2x2 poolings only
        # produce for 224 x 224 frames. Adaptive pooling forces that size for
        # any resolution; it is an identity for 224 x 224 inputs and has no
        # parameters, so existing checkpoints still load.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((7, 7))

        self.fc1 = nn.Linear(256 * 7 * 7, 512)

        self.lstm = nn.LSTM(input_size=512, hidden_size=256, num_layers=2, batch_first=True)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        batch_size, timesteps, C, H, W = x.size()
        # Fold time into the batch dimension so the CNN sees individual
        # frames; reshape (unlike view) also accepts non-contiguous input.
        frames = x.reshape(batch_size * timesteps, C, H, W)
        feats = self.pool(F.relu(self.conv1(frames)))
        feats = self.pool(F.relu(self.conv2(feats)))
        feats = self.pool(F.relu(self.conv3(feats)))
        feats = self.pool(F.relu(self.conv4(feats)))
        feats = self.pool(F.relu(self.conv5(feats)))
        feats = self.adaptive_pool(feats)
        feats = F.relu(self.fc1(feats.flatten(1)))
        # Restore the (batch, time, feature) layout expected by the LSTM.
        seq = feats.reshape(batch_size, timesteps, -1)
        seq, _ = self.lstm(seq)
        # Only the last timestep feeds the classifier head.
        return torch.sigmoid(self.fc2(seq[:, -1, :]))


model = CNN_LSTM(1)


In [None]:
# Use the GPU when the runtime has one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Move the model's parameters to the selected device; batches must be moved
# to the same device before they are passed to the model.
model = model.to(device)

訓練模型

In [None]:
import torch.optim as optim

# `tqdm` is used for the progress bar below but was never imported anywhere
# in this notebook, which raised a NameError on a fresh kernel.
from tqdm.auto import tqdm

# The model already applies a sigmoid, so plain BCELoss on its output is used.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader):
        images = images.to(device)
        # Labels must live on the same device as the model output, and
        # BCELoss needs float targets shaped like the (batch, 1) predictions.
        labels = labels.to(device).float().view(-1, 1)

        optimizer.zero_grad()
        outputs = model(images)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean loss over the batches of this epoch.
    print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}')


torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 169, 3, 224, 224])
torch.Size([2, 1])
torch.Size([2, 169, 3, 224, 224])
torch.Siz

In [None]:
import torch.optim as optim

# Sanity check only — no training happens in this cell.
# The previous version iterated over the whole training set for 10 epochs and
# printed every raw frame tensor, and it re-created the optimizer although no
# optimisation step was taken. Inspecting a single batch gives the same
# information (shapes, dtypes, label values) at a fraction of the cost.
images, labels = next(iter(train_loader))
print(tuple(images.shape), images.dtype, "     ", tuple(labels.shape), labels.dtype)
print("labels:", labels.view(-1).tolist())


 評估模型

In [None]:
# Evaluate on the held-out split: a prediction counts as positive when the
# sigmoid output exceeds 0.5.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        # Use the notebook-wide `device` instead of a hard-coded 'cuda' so
        # the cell also runs on CPU-only runtimes.
        images = images.to(device)
        labels = labels.to(device).float().view(-1, 1)
        outputs = model(images)
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

if total == 0:
    # Avoid a ZeroDivisionError when the test loader yields nothing.
    print('Accuracy: n/a (test_loader produced no samples)')
else:
    print(f'Accuracy: {100 * correct / total}%')
