<a href="https://colab.research.google.com/github/JSKimGitHub/testfile/blob/main/0916.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --------------------------------------
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import time

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

Device: cuda


In [None]:
class ObstacleDetector:
    """Estimates the size of the largest red object visible in a frame."""

    def find_red_area(self, frame):
        """Return the contour area of the largest red region in ``frame``.

        Args:
            frame: Image that is converted with ``cv2.COLOR_BGR2HSV``, i.e. it
                is treated as a BGR image.

        Returns:
            The area (as reported by ``cv2.contourArea``) of the largest
            external contour in the red mask, or ``0`` when no contour exists.
        """
        hsv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Red is matched with two hue bands, one at each end of the hue axis.
        low_band = cv2.inRange(
            hsv_image, np.array([0, 120, 70]), np.array([10, 255, 255])
        )
        high_band = cv2.inRange(
            hsv_image, np.array([170, 120, 70]), np.array([180, 255, 255])
        )
        red_mask = low_band + high_band

        # Opening removes small specks; the dilation then grows what is left.
        kernel = np.ones((5, 5), np.uint8)
        red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, kernel)
        red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_DILATE, kernel)

        found, _ = cv2.findContours(
            red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        if not found:
            return 0  # no red object in this frame

        # Only the biggest blob counts.
        return max(cv2.contourArea(contour) for contour in found)

In [None]:
class LaneDetector:
    """Finds the innermost left/right lane segments and rates the lane offset."""

    def __init__(self):
        # Last smoothed lane per side, ordered [left, right]; None until seen.
        self.prev_lanes = [None, None]
        # Horizontal image centre in pixels; refreshed by process_frame.
        self.img_center = None
        # Pixel tolerance inside which the lane centre counts as centred.
        self.margin = 50

    def process_frame(self, frame):
        """Detect the lane segments in ``frame`` and classify the lane offset.

        Args:
            frame: Image that is converted with ``cv2.COLOR_BGR2HSV``, i.e. it
                is treated as a BGR image.

        Returns:
            A ``(lanes, lane_state)`` tuple. ``lanes`` is ``[left, right]``
            where each entry is ``None`` (not detected in this frame) or
            ``[(x1, y1), (x2, y2)]``. ``lane_state`` is ``1`` when the lane
            centre lies within ``self.margin`` pixels of the image centre or
            when either lane is missing, ``0`` when the lane centre is to the
            left of the image centre, and ``2`` when it is to the right.
        """
        frame_h, frame_w = frame.shape[:2]
        self.img_center = frame_w // 2

        # 1) Keep only white and yellow pixels (selected in HSV space).
        hsv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        white_mask = cv2.inRange(
            hsv_image, np.array([0, 0, 200]), np.array([180, 30, 255])
        )
        yellow_mask = cv2.inRange(
            hsv_image, np.array([15, 80, 100]), np.array([35, 255, 255])
        )
        colour_mask = cv2.bitwise_or(white_mask, yellow_mask)
        lane_pixels = cv2.bitwise_and(frame, frame, mask=colour_mask)

        # 2) Grayscale, blur and Canny edges.
        grayscale = cv2.cvtColor(lane_pixels, cv2.COLOR_BGR2GRAY)
        smoothed = cv2.GaussianBlur(grayscale, (5, 5), 0)
        edge_map = cv2.Canny(smoothed, 50, 150)

        # 3) Restrict the edges to the bottom 40% of the image.
        roi_top = int(frame_h * 0.6)
        roi_mask = np.zeros_like(edge_map)
        roi_corners = np.array(
            [[(0, frame_h), (0, roi_top), (frame_w, roi_top), (frame_w, frame_h)]],
            np.int32,
        )
        cv2.fillPoly(roi_mask, roi_corners, 255)
        roi_edges = cv2.bitwise_and(edge_map, roi_mask)

        # 4) Probabilistic Hough transform.
        segments = cv2.HoughLinesP(
            roi_edges, 1, np.pi / 180, 50, minLineLength=20, maxLineGap=100
        )

        # 5) Split the segments by slope sign; near-horizontal ones are dropped.
        left_candidates, right_candidates = [], []
        if segments is not None:
            for x1, y1, x2, y2 in segments[:, 0]:
                # The epsilon keeps vertical segments from dividing by zero.
                slope = (y2 - y1) / (x2 - x1 + 1e-6)
                if abs(slope) <= 0.5:
                    continue
                bucket = left_candidates if slope < 0 else right_candidates
                bucket.append((x1, y1, x2, y2))

        # 6) Per side, keep the segment with the largest (left) or smallest
        #    (right) midpoint x.
        def mid_x(seg):
            return (seg[0] + seg[2]) / 2

        left_pick = max(left_candidates, key=mid_x) if left_candidates else None
        right_pick = min(right_candidates, key=mid_x) if right_candidates else None

        # 7) Average each detected segment with the previous one for that side.
        lanes = [None, None]  # [left, right]
        for side, segment in enumerate((left_pick, right_pick)):
            if segment is None:
                continue
            x1, y1, x2, y2 = segment
            lane = [(x1, y1), (x2, y2)]  # the raw detected segment

            previous = self.prev_lanes[side]
            if previous is not None:
                lane = [
                    ((cur[0] + old[0]) // 2, (cur[1] + old[1]) // 2)
                    for cur, old in zip(lane, previous)
                ]

            self.prev_lanes[side] = lane
            lanes[side] = lane

        # 8) Classify the lane centre relative to the image centre.
        left_lane, right_lane = lanes
        if left_lane is None or right_lane is None:
            return lanes, 1  # default: centred

        left_mid = (left_lane[0][0] + left_lane[1][0]) // 2
        right_mid = (right_lane[0][0] + right_lane[1][0]) // 2
        offset = (left_mid + right_mid) // 2 - self.img_center

        if abs(offset) < self.margin:
            lane_state = 1  # center
        elif offset < 0:
            lane_state = 0  # left
        else:
            lane_state = 2  # right

        return lanes, lane_state


# DQN data collector

In [None]:
class OfflineDataCollector:
    """Turns a sequence of video frames into offline-RL transitions.

    The collector combines a lane detector (``process_frame(frame)`` returning
    ``(lanes, lane_state)``) and an obstacle detector (``find_red_area(frame)``
    returning an area in pixels) into a 5-element state vector, and pairs
    sampled frames into ``(state, action, reward, next_state, done)`` tuples.
    """

    def __init__(self, lane_detector, obstacle_detector):
        self.lane_detector = lane_detector
        self.obstacle_detector = obstacle_detector

    def _get_state(self, frame, car_x):
        """Compute the state vector for one frame.

        Args:
            frame: Image array; only ``frame.shape[0]`` (height) and
                ``frame.shape[1]`` (width) are read here.
            car_x: Horizontal position of the car in pixels.

        Returns:
            ``(state, act)`` where ``state`` is a float32 array
            ``[left_x, right_x, lane_center, car_x, red_area]``, with the first
            four normalised by the frame width and the last by the frame area,
            and ``act`` is the lane state reported by the lane detector.
        """
        frame_h, frame_w = frame.shape[:2]

        # Obstacle information.
        area = self.obstacle_detector.find_red_area(frame)

        lanes, act = self.lane_detector.process_frame(frame)
        left_lane, right_lane = lanes

        # A missing lane falls back to the corresponding image border.
        left_x = min(left_lane[0][0], left_lane[1][0]) if left_lane is not None else 0
        right_x = (
            max(right_lane[0][0], right_lane[1][0]) if right_lane is not None else frame_w
        )

        state = np.array([
            left_x / frame_w,
            right_x / frame_w,
            (left_x + right_x) / (2 * frame_w),  # lane centre
            car_x / frame_w,
            area / (frame_h * frame_w),  # fraction of the frame that is red
        ], dtype=np.float32)

        return state, act

    def _calculate_reward(self, state):
        """Compute the reward for a state produced by ``_get_state``.

        The reward is the sum of a lane-keeping term (+10 / +5 / -5 depending
        on the distance between car and lane centre), an extra -20 when that
        distance exceeds 0.4, a constant +5, and -70 when the red-area fraction
        exceeds 0.7.
        """
        reward = 0.0

        lane_center = state[2]
        car_position = state[3]
        distance_from_center = abs(car_position - lane_center)

        # Lane-keeping term.
        if distance_from_center < 0.1:
            reward += 10.0
        elif distance_from_center < 0.2:
            reward += 5.0
        else:
            reward -= 5.0

        # Additional penalty for leaving the lane.
        if distance_from_center > 0.4:
            reward -= 20.0

        # Constant bonus for every step.
        reward += 5.0

        # Penalty when the red obstacle fills most of the frame.
        norm_area = state[4]
        if norm_area > 0.7:
            reward -= 70.0

        return reward

    def collect_from_frames(self, frames, car_x_init=None, actions_taken=None,
                            frame_stride=4):
        """Build transition lists from ``frames``.

        Every ``frame_stride``-th frame (starting at index 0) is sampled, and
        each pair of consecutive sampled frames yields one transition. Every
        sampled frame is passed through the detectors exactly once, so the
        ``next_state`` of one transition is the same array as the ``state`` of
        the following one.

        Args:
            frames: Sequence of video frames.
            car_x_init: Car x position in pixels. Defaults to the horizontal
                centre of the first frame.
            actions_taken: Optional sequence of actions aligned with
                ``frames`` (one entry per frame). When given, the action stored
                for a transition is ``actions_taken[i]`` for the index ``i`` of
                the transition's first frame. When omitted, the lane state
                reported by the lane detector is used as the action.
            frame_stride: Distance, in frames, between sampled frames.

        Returns:
            ``(state_list, action_list, reward_list, next_state_list,
            done_list)``. All five lists are empty when fewer than two frames
            can be sampled. ``done`` is ``True`` when the next state is more
            than 0.5 (normalised) away from the lane centre, and for the final
            transition of the sequence.

        Raises:
            ValueError: If ``frame_stride`` is smaller than 1, or if
                ``actions_taken`` has fewer entries than ``frames``.
        """
        if frame_stride < 1:
            raise ValueError("frame_stride must be a positive integer")
        if actions_taken is not None and len(actions_taken) < len(frames):
            raise ValueError("actions_taken must provide one action per frame")

        state_list = []
        action_list = []
        reward_list = []
        next_state_list = []
        done_list = []

        sampled_indices = range(0, len(frames), frame_stride)
        if len(sampled_indices) < 2:
            # Not enough frames to form a single (state, next_state) pair.
            return state_list, action_list, reward_list, next_state_list, done_list

        car_x = frames[0].shape[1] // 2 if car_x_init is None else car_x_init

        # The lane detector is stateful (it smooths against its previous
        # result), so each sampled frame is processed once and in order.
        observed = [self._get_state(frames[idx], car_x) for idx in sampled_indices]

        last_transition = len(observed) - 2
        for pos, idx in enumerate(sampled_indices[:-1]):
            state, detected_act = observed[pos]
            next_state, _ = observed[pos + 1]

            action = detected_act if actions_taken is None else actions_taken[idx]
            reward = self._calculate_reward(state)

            # Episode ends when the car is far off the lane centre or when
            # there is no further sampled frame to transition to.
            left_lane_area = abs(next_state[3] - next_state[2]) > 0.5
            done = bool(left_lane_area) or pos == last_transition

            state_list.append(state)
            action_list.append(action)
            reward_list.append(reward)
            next_state_list.append(next_state)
            done_list.append(done)

        return state_list, action_list, reward_list, next_state_list, done_list


In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: a state vector in, one Q-value per action out."""

    def __init__(self, state_dim, action_dim):
        """Build a two-hidden-layer MLP with dropout after each hidden layer.

        Args:
            state_dim: Number of input features.
            action_dim: Number of outputs (one per action).
        """
        super().__init__()
        hidden_units = 64
        drop_prob = 0.2  # dropout is used to limit overfitting
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Dropout(drop_prob),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Dropout(drop_prob),
            nn.Linear(hidden_units, action_dim),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the output of the layer stack for input ``x``."""
        q_values = self.fc(x)
        return q_values

In [None]:
def train_offline_dqn(state_list, action_list, reward_list, next_state_list, done_list,
                       epochs=100, batch_size=32):
    """Train a DQN on a fixed set of transitions (offline RL).

    Args:
        state_list: Sequence of state vectors (all of the same length).
        action_list: Sequence of integer action indices in ``[0, 3)``.
        reward_list: Sequence of scalar rewards.
        next_state_list: Sequence of successor state vectors.
        done_list: Sequence of booleans; ``True`` disables bootstrapping from
            the successor state.
        epochs: Number of gradient steps. Each "epoch" is one randomly sampled
            mini-batch, not a full pass over the data.
        batch_size: Mini-batch size. Capped at the number of transitions so
            small datasets can still be trained on.

    Returns:
        The trained policy network, switched to evaluation mode so that its
        dropout layers are inactive and Q-values are deterministic. Call
        ``.train()`` on it before training it further.

    Raises:
        ValueError: If no transitions are supplied.
    """
    print("Starting offline DQN training...")

    num_transitions = len(state_list)
    if num_transitions == 0:
        raise ValueError("train_offline_dqn needs at least one transition")

    state_dim = len(state_list[0])
    action_dim = 3  # number of actions

    # Network initialisation.
    policy_net = DQN(state_dim, action_dim)
    target_net = DQN(state_dim, action_dim)
    target_net.load_state_dict(policy_net.state_dict())
    policy_net.train()
    # The target network only provides bootstrap targets, so its dropout is
    # switched off; otherwise every TD target would carry dropout noise.
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    gamma = 0.99
    update_frequency = 10

    # Convert the whole dataset once. Going through a single ndarray avoids
    # the slow list-of-ndarrays path of torch.tensor on every step.
    all_states = torch.as_tensor(np.asarray(state_list, dtype=np.float32))
    all_actions = torch.as_tensor(np.asarray(action_list, dtype=np.int64))
    all_rewards = torch.as_tensor(np.asarray(reward_list, dtype=np.float32))
    all_next_states = torch.as_tensor(np.asarray(next_state_list, dtype=np.float32))
    all_not_done = 1.0 - torch.as_tensor(np.asarray(done_list, dtype=np.float32))

    # random.sample raises when asked for more items than are available.
    sample_size = min(batch_size, num_transitions)

    for epoch in range(epochs):
        # Random mini-batch (sampled without replacement).
        picked = torch.as_tensor(
            random.sample(range(num_transitions), sample_size), dtype=torch.long
        )
        states = all_states[picked]
        actions = all_actions[picked]
        rewards = all_rewards[picked]
        next_states = all_next_states[picked]
        not_done = all_not_done[picked]

        # Q-learning update. squeeze(1) keeps the batch dimension even when
        # the batch holds a single transition.
        current_q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        with torch.no_grad():
            next_q_values = target_net(next_states).max(1)[0]
            target_q_values = rewards + gamma * not_done * next_q_values

        loss = loss_fn(current_q_values, target_q_values)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Periodically sync the target network with the policy network.
        if epoch % update_frequency == 0:
            target_net.load_state_dict(policy_net.state_dict())

        # Progress output.
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

    # Leave dropout disabled for whoever queries the returned network.
    policy_net.eval()

    print("Offline DQN training completed!")
    return policy_net

In [None]:
# 1. Read every frame of the video into memory.
video_path = "/content/drive/MyDrive/jolup/2_2.mp4"
cap = cv2.VideoCapture(video_path)

frames = []
try:
    # Fail early with a clear message instead of silently reading zero frames.
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {video_path}")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
finally:
    # Release the capture handle even if reading fails part-way through.
    cap.release()
print(f"총 {len(frames)} 프레임 읽음")

if not frames:
    raise RuntimeError(f"No frames could be read from: {video_path}")

# 2. Create the lane and obstacle detectors.
lane_detector = LaneDetector()
obstacle_detector = ObstacleDetector()

# 3. Create the offline data collector.
collector = OfflineDataCollector(lane_detector, obstacle_detector)

# 4. Collect the transitions.
state_list, action_list, reward_list, next_state_list, done_list = collector.collect_from_frames(frames)

# 5. Report the result.
print("video_path",video_path)
print(f"총 transition 수: {len(state_list)}")

총 638 프레임 읽음
샘플 state: [0.04453125 0.82109374 0.4328125  0.5        0.00096365]
샘플 action: 0
샘플 reward: 15.0
샘플 next_state: [0.05703125 0.846875   0.4519531  0.5        0.00094626]
샘플 done: False
총 transition 수: 159


In [None]:
# Train on the collected transitions (3 update steps, mini-batches of 32).
policy_net = train_offline_dqn(
    state_list=state_list,
    action_list=action_list,
    reward_list=reward_list,
    next_state_list=next_state_list,
    done_list=done_list,
    epochs=3,
    batch_size=32,
)

Starting offline DQN training...
Epoch 0, Loss: 149.4779
Epoch 1, Loss: 167.9412
Epoch 2, Loss: 161.8736
Offline DQN training completed!


  states = torch.tensor([exp[0] for exp in batch], dtype=torch.float32)


In [None]:
# Example: print the Q-values and greedy action for every collected state.
# Evaluation mode turns dropout off, so the same state always yields the same
# Q-values; no_grad skips building the autograd graph during inspection.
policy_net.eval()
with torch.no_grad():
    for recorded_state in state_list:
        sample_state = torch.tensor(recorded_state, dtype=torch.float32)
        q_values = policy_net(sample_state.unsqueeze(0))  # add the batch dimension
        print("Sample Q-values:", q_values.detach().numpy())
        action = q_values.argmax().item()
        print("Sample action:", action)

Sample Q-values: [[ 0.0983123   0.27721366 -0.04909869]]
Sample action: 1
Sample Q-values: [[ 0.06658914  0.25669318 -0.08162481]]
Sample action: 1
Sample Q-values: [[ 0.05645216  0.2225334  -0.12108187]]
Sample action: 1
Sample Q-values: [[ 0.04353227  0.2856777  -0.13794778]]
Sample action: 1
Sample Q-values: [[ 0.13933015  0.20064373 -0.05212199]]
Sample action: 1
Sample Q-values: [[ 0.07662479  0.31854612 -0.10721801]]
Sample action: 1
Sample Q-values: [[ 0.10960927  0.13965955 -0.03002194]]
Sample action: 1
Sample Q-values: [[0.00314743 0.30153975 0.00142593]]
Sample action: 1
Sample Q-values: [[ 0.05124185  0.22152644 -0.0588759 ]]
Sample action: 1
Sample Q-values: [[0.04348784 0.2587208  0.00813333]]
Sample action: 1
Sample Q-values: [[-0.00830344  0.31969324 -0.03618594]]
Sample action: 1
Sample Q-values: [[-0.00394356  0.2602191  -0.07950318]]
Sample action: 1
Sample Q-values: [[ 0.11152954  0.20450477 -0.05634744]]
Sample action: 1
Sample Q-values: [[ 0.09664173  0.2412095  -

In [None]:
# Dump the per-transition rewards returned by collect_from_frames.
print(reward_list)

[15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 10.0, 15.0, 10.0, 10.0, 10.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 10.0, 10.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 0.0, 0.0, 0.0, 0.0, -20.0, 0.0, 0.0, -20.0, -20.0, 10.0, 10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 10.0, 10.0, 10.0, 10.0, 15.0, 15.0, 15.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 15.0, 15.0, 15.0, 10.0, 10.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0, 15.0]
