<a href="https://colab.research.google.com/github/itshihi/OBDFlow/blob/main/DS_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn

In [None]:
class DSCNNWithDownsampling(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(DSCNNWithDownsampling, self).__init__()

        # --- Block 1 (64 channels, 32x32) ---
        # 크기 변화 없음
        self.block1 = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=2, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, 64, kernel_size=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        # --- Block 2 (128 channels, 16x16) ---
        # stride=2를 통해 크기를 절반으로 줄임
        self.block2 = nn.Sequential(
            # Depthwise Conv에서 stride=2로 설정하여 Downsampling 수행!
            nn.Conv2d(64, 64, kernel_size=3,stride=2, padding=1, groups=64, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            # Pointwise Conv에서 채널 수를 128로 변경
            nn.Conv2d(64, 128, kernel_size=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # --- Block 3 (128 channels, 8x8) ---
        # Conv 연산 후 Max Pooling으로 크기를 절반으로 줄임
        self.block3 = nn.Sequential(
            # 여기서는 Conv의 stride는 1 (크기 유지)
            nn.Conv2d(128, 128, kernel_size=3, padding=1, stride=2,groups=128, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            # # Max Pooling 레이어를 추가하여 Downsampling
            # nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # --- 후처리 레이어 ---
        self.avgpool = nn.AdaptiveMaxPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        print(f"Initial shape: \t{x.shape}")

        x = self.block1(x)
        print(f"After Block 1: \t{x.shape}") # [batch, 64, 32, 32]

        x = self.block2(x)
        print(f"After Block 2: \t{x.shape}") # [batch, 128, 16, 16]

        x = self.block3(x)
        print(f"After Block 3: \t{x.shape}") # [batch, 128, 8, 8]


        x = self.avgpool(x)
        print(f"After AVGPool: \t{x.shape}") # [batch, 128, 1, 1]

        x = torch.flatten(x, 1)
        x = self.fc(x)
        print(f"After FCL: \t{x.shape}") # [batch, 128, 1, 1]

        return x


In [None]:
# --- 모델 생성 및 테스트 ---
model = DSCNNWithDownsampling(in_channels=64, num_classes=8)
print(model)
dummy_input = torch.randn(1, 64, 64, 64) # 입력 이미지 크기 32x32
output = model(dummy_input)

print(f"\nFinal output shape: {output.shape}")

DSCNNWithDownsampling(
  (block1): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64, bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
  )
  (block2): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64, bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
  )
  (block3): Sequential(
    (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=128, bias=False)
    (1): BatchNorm2d(128, eps=1e-05,

In [None]:
import pandas as pd

In [None]:
rawdata = pd.read_csv('./drive/MyDrive/1_data_with_weather/data_with_weather.csv')
rawdata.head()

raw_time_series_data = rawdata
raw_time_series_data.set_index('timestamp')

print(f"원본 데이터 shape: {raw_time_series_data.shape}")


원본 데이터 shape: (3031, 28)


In [None]:
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [None]:
# --- 1. 하이퍼파라미터 정의 ---
window_size = 128      # 한 번에 볼 데이터의 길이 (시간 스텝)
step_size = 64         # 윈도우를 다음으로 이동시킬 칸 수
num_features = 16      # 피처(센서 채널)의 개수

# --- 2. 입력 데이터 준비 (가상 데이터 생성) ---
print("--- 데이터 준비 ---")
# 10000개의 시간 스텝을 가진 16개 피처의 시계열 데이터 생성
# raw_time_series_data = np.random.randn(10000, num_features)
# 각 시간 스텝에 해당하는 레이블 생성 (0, 1, 2 중 하나)
# labels = np.random.randint(0, 3, size=(10000,))






# --- 3. 슬라이딩 윈도우를 이용한 데이터 분할 ---
print("--- 슬라이딩 윈도우 적용 ---")
segments = []
segment_labels = []

for i in range(0, len(raw_time_series_data) - window_size, step_size):
    # 현재 위치에서 window_size 만큼의 데이터 조각을 추출
    # Pytorch Conv2d는 (채널, 높이, 너비) 순서를 선호하므로 (피처, 시간) 순으로 저장
    current_segment = raw_time_series_data[i : i + window_size].T # .T로 축을 바꿔 (16, 128) 형태로 저장
    segments.append(current_segment)

    # 해당 조각의 레이블 결정 (윈도우의 마지막 시점 레이블 사용)
    current_label = labels[i + window_size - 1]
    segment_labels.append(current_label)

# 리스트를 numpy 배열로 변환
X = np.array(segments)
y = np.array(segment_labels)
print(f"분할된 데이터 shape (X): {X.shape}") # (샘플 수, 피처 수, 윈도우 크기)
print(f"분할된 레이블 shape (y): {y.shape}\n")


# --- 4. 데이터셋 분리 및 정규화 ---
print("--- 데이터셋 분리 및 정규화 ---")
# 1. 학습/테스트 데이터 분리 (85% 학습+검증, 15% 테스트)
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.15, random_state=42)
# 2. 학습/검증 데이터 분리 (70% 학습, 15% 검증)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=(0.15/0.85), random_state=42)

print(f"학습 데이터 shape: {X_train.shape}")
print(f"검증 데이터 shape: {X_val.shape}")
print(f"테스트 데이터 shape: {X_test.shape}\n")

# 정규화: StandardScaler 사용
# Scaler는 2D 데이터만 받으므로, 3D 데이터를 2D로 변환 후 다시 3D로 복원해야 함
# 원본 shape: (샘플 수, 피처 수, 윈도우 크기)
scaler = StandardScaler()

# train 데이터 기준으로 scaler 학습
nsamples, nfeatures, nsteps = X_train.shape
X_train_2d = X_train.reshape(nsamples, -1)
scaler.fit(X_train_2d)

# 모든 데이터에 scaler 적용
X_train_scaled_2d = scaler.transform(X_train_2d)
X_val_scaled_2d = scaler.transform(X_val.reshape(X_val.shape[0], -1))
X_test_scaled_2d = scaler.transform(X_test.reshape(X_test.shape[0], -1))

# 다시 원래의 3D shape으로 복원
X_train_scaled = X_train_scaled_2d.reshape(nsamples, nfeatures, nsteps)
X_val_scaled = X_val_scaled_2d.reshape(X_val.shape[0], nfeatures, nsteps)
X_test_scaled = X_test_scaled_2d.reshape(X_test.shape[0], nfeatures, nsteps)
print("정규화 완료\n")


# --- 5. 모델 입력 형태(PyTorch Tensor)로 변환 ---
print("--- PyTorch 텐서로 변환 ---")
# (샘플 수, 채널, 높이, 너비) -> (샘플 수, 1, 피처 수, 윈도우 크기)
# unsqueeze(1)을 사용해 채널 차원(C=1)을 추가
train_final = torch.from_numpy(X_train_scaled).float().unsqueeze(1)
val_final = torch.from_numpy(X_val_scaled).float().unsqueeze(1)
test_final = torch.from_numpy(X_test_scaled).float().unsqueeze(1)

# 레이블도 텐서로 변환
y_train_final = torch.from_numpy(y_train).long()
y_val_final = torch.from_numpy(y_val).long()
y_test_final = torch.from_numpy(y_test).long()



--- 데이터 준비 ---
--- 슬라이딩 윈도우 적용 ---


NameError: name 'labels' is not defined

In [None]:
print(f"최종 학습 데이터 shape: {train_final.shape}")
print(f"최종 검증 데이터 shape: {val_final.shape}")
print(f"최종 테스트 데이터 shape: {test_final.shape}")