In [None]:
pip install trimesh

In [None]:
pip install scikit-image

In [None]:
pip install scipy

In [1]:
import os
import numpy as np
import trimesh
import scipy
import tensorflow as tf
from skimage import measure
from scipy.ndimage import zoom
import random

In [2]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("✅ GPU 메모리 점진적 할당 활성화됨")
    except RuntimeError as e:
        print(e)

✅ GPU 메모리 점진적 할당 활성화됨


In [5]:
# [환경 설정] 학습, 데이터, 모델 관련 설정을 한 곳에서 관리
config = {
    # 데이터 경로
    'train_dir': 'C:/Users/konyang/Desktop/project2025/npy 256 파일/merged',   # 훈련 데이터 경로 (.obj 및 .npy/.eseg 포함)
    'test_dir': 'C:/Users/konyang/Desktop/project2025/npy 256 파일/merged',     # 테스트 데이터 경로

    # 모델 하이퍼파라미터
    'input_dim': 3,                      # 정점 특징 수 (기본은 x, y, z)
    'hidden_dim': 64,                    # GNN 중간 계층 차원 수
    'output_dim': 2,                     # 출력 클래스 수 (예: 결절/비결절)

    # 학습 설정
    'lr': 1e-4,                          # 학습률 Ehsms 5e-5
    'batch_size': 1,                     # GNN은 sample-by-sample이 일반적
    'epochs': 10,                        # 학습 반복 횟수
    'seed': 42,                          # 재현성 보장용 시드

    # 입력 크기 정규화 관련 (선택)
    'target_shape': (128, 256, 256),     # .npy 정규화 기준 (3D CT 볼륨)

    # 전처리 설정
    'resize_order': 1,                   # 1: 선형, 0: 최근접 (마스크에 적합)
    'normalize_skip_if_same': True       # 타겟 shape이면 리사이즈 생략 여부
}



'''
# 시드 고정 - 랜덤성을 줄여 결과 재현 가능
tf.random.set_seed(config['seed'])
np.random.seed(config['seed'])
random.seed(config['seed'])
'''

"\n# 시드 고정 - 랜덤성을 줄여 결과 재현 가능\ntf.random.set_seed(config['seed'])\nnp.random.seed(config['seed'])\nrandom.seed(config['seed'])\n"

In [6]:
#GPU 연결 확인
print(tf.config.list_physical_devices('GPU')) 

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [14]:
# [edge_index 생성 함수] .obj 파일에서 face 정보를 기반으로 edge_index 생성

def extract_edge_index_from_faces(mesh):
    """
    trimesh 객체에서 면(face) 정보를 이용하여 edge_index 생성
    반환 형식: TensorFlow Tensor of shape [2, E]
    """
    faces = np.array(mesh.faces)  # 각 face는 정점 3개로 구성됨 (N, 3)

    # 각 face의 세 변을 edge로 간주
    edges = np.vstack([
        faces[:, [0, 1]],
        faces[:, [1, 2]],
        faces[:, [2, 0]],
    ])  # shape (3 * num_faces, 2)

    # 중복 제거를 위해 정렬하고 unique 처리
    edges = np.sort(edges, axis=1)          # 각 edge 내 정점 번호 정렬
    edges = np.unique(edges, axis=0)        # 전체에서 중복 edge 제거

    # (2, E) 형식으로 전치 후 Tensor로 반환
    return tf.convert_to_tensor(edges.T, dtype=tf.int32)


In [15]:
def downsample_graph(vertices, labels, edge_index, max_nodes=5000):
    """
    정점 수가 max_nodes보다 많으면 downsampling 수행
    - vertices: (N, 3)
    - labels: (N,)
    - edge_index: (2, E)
    """

    num_nodes = vertices.shape[0]
    if num_nodes <= max_nodes:
        return vertices, labels, edge_index  # 그대로 반환

    #무작위로 정점 선택
    selected_indices = np.random.choice(num_nodes, size=max_nodes, replace=False)
    selected_indices.sort()  # 정렬하면 디버깅과 유지보수에 유리함

    #정점 및 라벨 downsample
    vertices_ds = vertices[selected_indices]
    labels_ds = labels[selected_indices]

    #index 매핑 테이블 생성 (원래 idx → 새 idx)
    old_to_new = {old: new for new, old in enumerate(selected_indices)}

    #edge_index 필터링: 두 정점 모두 선택된 경우만 유지
    edge_index_np = edge_index.numpy() if hasattr(edge_index, 'numpy') else edge_index
    src, tgt = edge_index_np[0], edge_index_np[1]

    mask = np.isin(src, selected_indices) & np.isin(tgt, selected_indices)
    src_filtered = src[mask]
    tgt_filtered = tgt[mask]

    #새 인덱스로 edge 재매핑
    src_mapped = np.array([old_to_new[i] for i in src_filtered])
    tgt_mapped = np.array([old_to_new[i] for i in tgt_filtered])
    edge_index_ds = np.stack([src_mapped, tgt_mapped], axis=0)  # shape [2, E']

    return vertices_ds, labels_ds, edge_index_ds

In [16]:
def load_obj_mesh(filepath):
    try:
        mesh = trimesh.load(filepath, process=False)
        vertices = np.array(mesh.vertices, dtype=np.float32)
        labels = np.arange(vertices.shape[0], dtype=np.int32)
        edge_index = extract_edge_index_from_faces(mesh)
        vertices, labels, edge_index = downsample_graph(vertices, labels, edge_index, max_nodes=5000)
        return vertices, labels, edge_index
    except Exception as e:
        print(f"파일 로딩 실패: {filepath}")
        print(f"오류: {e}")
        return None, None, None

# [데이터셋 로딩 함수 정의] 지정된 폴더에서 .obj 파일들을 불러와 (정점, 레이블) 쌍으로 구성된 리스트를 반환
def load_dataset(folder):
    data = []
    for fname in sorted(os.listdir(folder)):
        if fname.endswith('.obj'):
            path = os.path.join(folder, fname)
            v, l, e = load_obj_mesh(path)
            if v is not None:
                data.append((v, l, e))  
    print(f"총 {len(data)}개의 .obj 파일이 성공적으로 로딩됨")
    return data
'''
sample_path = os.path.join(config['train_dir'], os.listdir(config['train_dir'])[0])
v, l = load_obj_mesh(sample_path)
print(f"정점 수: {len(v)}")'''

# [데이터셋 로딩 실행] 훈련/테스트 데이터셋을 지정된 폴더에서 불러오기
train_data = load_dataset(config['train_dir'])  # 훈련용 .obj 파일들을 불러와 (정점, 레이블) 쌍 리스트로 저장
#test_data = load_dataset(config['test_dir'])    # 테스트용 .obj 파일들을 동일 방식으로 불러

총 122개의 .obj 파일이 성공적으로 로딩됨


In [17]:
# [정점 레이블 로딩 함수] .eseg 또는 .npy 파일에서 정점별 레이블 불러오기

def load_vertex_labels(label_path):
    """
    정점별 레이블 파일(.eseg 또는 .npy)을 로딩하여 Tensor 반환
    """
    if label_path.endswith('.npy'):
        labels = np.load(label_path)  # (N,) shape
    elif label_path.endswith('.eseg'):
        with open(label_path, 'r') as f:
            labels = np.array([int(line.strip()) for line in f if line.strip().isdigit()])
    else:
        raise ValueError("지원되지 않는 레이블 파일 형식입니다: .npy 또는 .eseg만 지원")

    return tf.convert_to_tensor(labels, dtype=tf.int32)  # (N,) shape


In [70]:
class FeaStConvTF(tf.Module):
    def __init__(self, in_channels, out_channels, heads=8, t_inv=True, name=None):
        super().__init__(name=name)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.heads = heads
        self.t_inv = t_inv

        initializer = tf.initializers.GlorotUniform()

        self.weight = tf.Variable(initializer([in_channels, heads * out_channels]), trainable=True, name="weight")
        self.u = tf.Variable(initializer([in_channels, heads]), trainable=True, name="u")
        self.c = tf.Variable(tf.zeros([heads]), trainable=True, name="c")
        self.bias = tf.Variable(tf.zeros([out_channels]), trainable=True, name="bias")

        if not t_inv:
            self.v = tf.Variable(initializer([in_channels, heads]), trainable=True, name="v")

    def __call__(self, x, edge_index):
        row, col = edge_index[0], edge_index[1]
        x_i = tf.gather(x, row)
        x_j = tf.gather(x, col)

        if self.t_inv:
            q = tf.matmul(x_i - x_j, self.u) + self.c
        else:
            q = tf.matmul(x_i, self.u) + tf.matmul(x_j, self.v) + self.c

        q = tf.nn.softmax(q, axis=1)
        xj_proj = tf.matmul(x_j, self.weight)
        xj_proj = tf.reshape(xj_proj, [-1, self.heads, self.out_channels])

        out = xj_proj * tf.expand_dims(q, axis=-1)
        out = tf.reshape(out, [-1, self.heads * self.out_channels])
        out = tf.math.unsorted_segment_mean(out, row, num_segments=tf.shape(x)[0])
        out = tf.reshape(out, [-1, self.heads, self.out_channels])
        out = tf.reduce_sum(out, axis=1)

        return out + self.bias


In [71]:
class FeaStNetTF(tf.Module):
    def __init__(self, input_dim, hidden_dim, t_inv=True, name=None):
        super().__init__(name=name)
        self.conv1 = FeaStConvTF(input_dim, hidden_dim, t_inv=t_inv, name="conv1")
        self.conv2 = FeaStConvTF(hidden_dim, hidden_dim, t_inv=t_inv, name="conv2")

        # 정점별 검출기
        self.seg_fc = tf.Variable(tf.random.normal([hidden_dim, 1], stddev=0.1), trainable=True, name="seg_fc")
        self.seg_bias = tf.Variable(tf.zeros([1]), trainable=True, name="seg_bias")

    def __call__(self, x, edge_index, training=False):
        h = tf.nn.relu(self.conv1(x, edge_index))
        h = tf.nn.relu(self.conv2(h, edge_index))

        seg_logits = tf.matmul(h, self.seg_fc) + self.seg_bias  # shape: (N, 1)
        return tf.squeeze(seg_logits, axis=1)  # shape: (N,)

In [72]:
# [모델 및 최적화기 초기화] FeaStNetTF 모델 생성 및 Adam 옵티마이저 설정

# Adam 옵티마이저 정의 (학습률은 config에서 설정한 값 사용)
optimizer = tf.optimizers.Adam(learning_rate=config['lr'])

# 모델 인스턴스 생성: 입력 차원, 은닉 차원, 출력 클래스 수를 전달
model = FeaStNetTF(
    input_dim=config['input_dim'],
    hidden_dim=config['hidden_dim'],
    t_inv=True
)

In [73]:
def get_model_variables(model):
    variables = []
    for attr in model.__dict__.values():
        if isinstance(attr, tf.Variable):
            variables.append(attr)
        elif isinstance(attr, tf.Module):  # conv1, conv2 내부도 포함
            variables.extend(get_model_variables(attr))
    return variables

In [74]:
def evaluate(dataset, threshold=0.5):
    all_preds, all_labels = [], []

    for v_np, l_np, edge_index in dataset:
        N = len(v_np)
        l_np_cleaned = l_np[l_np < N]
        l_np_binary = np.zeros(N, dtype=np.float32)
        l_np_binary[l_np_cleaned] = 1.0

        x = tf.convert_to_tensor(v_np, dtype=tf.float32)
        y = tf.convert_to_tensor(l_np_binary, dtype=tf.float32)

        logits = model(x, edge_index, training=False)
        probs = tf.sigmoid(logits)
        preds = tf.cast(probs > threshold, tf.float32)

        all_preds.append(preds)
        all_labels.append(y)

    preds_concat = tf.concat(all_preds, axis=0)
    labels_concat = tf.concat(all_labels, axis=0)

    acc = tf.reduce_mean(tf.cast(tf.equal(preds_concat, labels_concat), tf.float32)).numpy()
    return acc

In [75]:
# [학습 및 평가 함수 정의] 1 epoch 학습 및 정확도 평가를 수행하는 함수들

def train_epoch(dataset):
    total_loss = 0

    for v_np, l_np, edge_index in dataset:
        # 정점 수 확인
        N = len(v_np)

        # ⚠️ 라벨 이진화: l_np는 label=1인 정점들의 index (ID)
        l_np_cleaned = l_np[l_np < N]
        l_np_binary = np.zeros(N, dtype=np.float32)
        l_np_binary[l_np_cleaned] = 1.0

        x = tf.convert_to_tensor(v_np, dtype=tf.float32)
        y = tf.convert_to_tensor(l_np_binary, dtype=tf.float32)

        with tf.GradientTape() as tape:
            logits = model(x, edge_index, training=True)  # (N,)
            loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits))

        variables = get_model_variables(model)
        grads = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(grads, variables))

        total_loss += float(loss)

    return total_loss / len(dataset)


def compute_metrics(pred, target, threshold=0.5):
    pred_bin = tf.cast(pred > threshold, tf.float32)
    target = tf.cast(target, tf.float32)

    intersection = tf.reduce_sum(pred_bin * target)
    union = tf.reduce_sum(pred_bin) + tf.reduce_sum(target)
    iou = intersection / (union - intersection + 1e-8)
    dice = 2.0 * intersection / (tf.reduce_sum(pred_bin) + tf.reduce_sum(target) + 1e-8)
    acc = tf.reduce_mean(tf.cast(tf.equal(pred_bin, target), tf.float32))

    return {
        'IoU': iou.numpy(),
        'Dice': dice.numpy(),
        'Accuracy': acc.numpy()
    }

# [학습 루프 실행] 에폭 단위로 학습 및 정확도 평가 반복
for epoch in range(config['epochs']):
    loss = train_epoch(train_data)
    acc = evaluate(train_data)

    # 디버깅: 샘플에서 확인
    v_np, l_np, edge_index = train_data[0]
    N = len(v_np)
    l_np_cleaned = l_np[l_np < N]
    l_np_binary = np.zeros(N, dtype=np.float32)
    l_np_binary[l_np_cleaned] = 1.0

    x = tf.convert_to_tensor(v_np, dtype=tf.float32)
    y = tf.convert_to_tensor(l_np_binary, dtype=tf.float32)
    logits = model(x, edge_index, training=False)
    loss_debug = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits))

    print("logits mean:", tf.reduce_mean(logits).numpy())
    print("logits std:", tf.math.reduce_std(logits).numpy())
    print("loss (sample):", loss_debug.numpy())
    print("y unique values:", tf.unique(y).y.numpy())
    print(f"Epoch {epoch+1}: Loss={loss:.4f}, Accuracy={acc:.2%}")


logits mean: -1.211158
logits std: 0.739871
loss (sample): 0.3191718
y unique values: [0. 1.]
Epoch 1: Loss=0.5205, Accuracy=97.86%
logits mean: -2.5724735
logits std: 0.6421729
loss (sample): 0.14392419
y unique values: [0. 1.]
Epoch 2: Loss=0.2059, Accuracy=97.86%
logits mean: -3.2914374
logits std: 0.14875704
loss (sample): 0.118090734
y unique values: [0. 1.]
Epoch 3: Loss=0.1179, Accuracy=97.86%
logits mean: -3.6616876
logits std: 0.061561324
loss (sample): 0.11611211
y unique values: [0. 1.]
Epoch 4: Loss=0.1048, Accuracy=97.86%
logits mean: -3.779268
logits std: 0.06463014
loss (sample): 0.11636095
y unique values: [0. 1.]
Epoch 5: Loss=0.1035, Accuracy=97.86%
logits mean: -3.810223
logits std: 0.06643585
loss (sample): 0.11646592
y unique values: [0. 1.]
Epoch 6: Loss=0.1034, Accuracy=97.86%
logits mean: -3.8160946
logits std: 0.06517797
loss (sample): 0.116480544
y unique values: [0. 1.]
Epoch 7: Loss=0.1034, Accuracy=97.86%
logits mean: -3.8155613
logits std: 0.056374505
loss

In [None]:
print("train_data size:", len(train_data))
print("Train dir exists:", os.path.exists(config['train_dir']))
print("Files:", os.listdir(config['train_dir']))

In [None]:
files = os.listdir(train_data)
print(f"총 파일 수: {len(files)}")
print("예시 파일:", files[:5])

In [39]:
for v_np, l_np, edge_index in train_data:
    x = tf.convert_to_tensor(v_np, dtype=tf.float32)
    y = tf.convert_to_tensor(l_np, dtype=tf.float32)

    with tf.GradientTape() as tape:
        logits = model(x, edge_index, training=True)  # (N,)
        loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits))

    grads = tape.gradient(loss, model.get_variables())
    optimizer.apply_gradients(zip(grads, model.get_variables()))

    print("logits shape:", logits.shape)
    print("labels shape:", y.shape)


AttributeError: 'FeaStNetTF' object has no attribute 'get_variables'

In [77]:
for v_np, l_np, _ in train_data[:3]:
    print("l_np unique values:", np.unique(l_np)[:10])

l_np unique values: [ 46 108 138 215 256 372 384 452 465 466]
l_np unique values: [ 46  77  87 155 157 164 172 175 212 218]
l_np unique values: [ 26  40  73 164 203 290 447 452 474 487]


In [78]:
print("label 1 비율:", np.mean(l_np_binary))  # 너무 적으면 불균형


label 1 비율: 0.0248
