In [40]:
import os
import re
import math
import json
from pathlib import Path
from typing import List, Tuple, Dict

import pandas as pd
import numpy as np
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torchvision.transforms as T
from torchvision import models

from sklearn.neighbors import KNeighborsRegressor

PROJECT_DIR = Path(r"c:\\Users\\Charlotte\\Desktop\\dissertation\\US_new")
IMAGES_DIR = PROJECT_DIR / "High_quality_images"
ANNOTATION_FILE = PROJECT_DIR / "annotation_points_summary2.xlsx"
OUTPUT_FILE = PROJECT_DIR / "annotation_points_predicted.xlsx"

assert IMAGES_DIR.exists(), f"Image directory not found: {IMAGES_DIR}"
assert ANNOTATION_FILE.exists(), f"Annotation Excel not found: {ANNOTATION_FILE}"

df_raw = pd.read_excel(ANNOTATION_FILE)
print("Original annotation headers:", list(df_raw.columns))
print("Original row count (long format, one line segment per row):", len(df_raw))

is_long_format = 'Label' in df_raw.columns and df_raw.groupby('Filename').size().max() > 1

if is_long_format:
    print("Long format data detected, converting to wide format (one row per image with all 6 points)...")
    
    def get_segment_order(label_str):
        label_lower = str(label_str).lower()
        if 'medial' in label_lower:
            return 1
        elif 'femoral' in label_lower:
            return 2
        elif 'lateral' in label_lower:
            return 3
        else:
            return 0
    
    df_raw['__seg_order__'] = df_raw['Label'].map(get_segment_order)
    df_raw = df_raw.sort_values(['Filename', '__seg_order__'])
    
    grouped = df_raw.groupby('Filename')
    
    records = []
    for fn, group in grouped:
        if len(group) != 3:
            print(f"Warning: {fn} has {len(group)} line segments (expected 3), skipping")
            continue
        
        row_dict = {'Filename': fn}
        for seg_idx, (_, seg_row) in enumerate(group.iterrows(), 1):
            row_dict[f'x{seg_idx*2-1}'] = seg_row['x1']
            row_dict[f'y{seg_idx*2-1}'] = seg_row['y1']
            row_dict[f'x{seg_idx*2}'] = seg_row['x2']
            row_dict[f'y{seg_idx*2}'] = seg_row['y2']
            if seg_idx <= 3:
                row_dict[f'seg{seg_idx}_label'] = seg_row['Label']
        
        records.append(row_dict)
    
    df = pd.DataFrame(records)
    print(f"Converted to wide format: {len(df)} images, each with 12 coordinate columns (x1,y1,...,x6,y6)")
    print("Converted columns:", list(df.columns))
else:
    df = df_raw
    print("Data is already in wide format, no conversion needed")

possible_image_cols = [
    'Filename', 'filename', 'image', 'img', 'image_path', 'path', 'Image', 'ImageName', 'FileName'
]
image_col = None
for c in df.columns:
    if c in possible_image_cols:
        image_col = c
        break
if image_col is None:
    for c in df.columns:
        if df[c].astype(str).str.contains(r"\.(jpg|jpeg|png|bmp|webp)$", case=False, na=False).any():
            image_col = c
            break
assert image_col is not None, "Cannot automatically identify image file column, please specify image_col manually"
print("Identified image column:", image_col)

x_cols = [f'x{i}' for i in range(1, 7) if f'x{i}' in df.columns]
y_cols = [f'y{i}' for i in range(1, 7) if f'y{i}' in df.columns]

pairs: List[Tuple[str, str]] = []
if len(x_cols) == len(y_cols) and len(x_cols) == 6:
    for i in range(6):
        pairs.append((x_cols[i], y_cols[i]))
    target_cols: List[str] = [p for xy in pairs for p in xy]
    print(f"Successfully identified 6 point pairs: {pairs}")
else:
    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    x_like = [c for c in df.columns if re.search(r"(^|[^a-zA-Z])(x|X)(\d+)?$|(_x$)|(_X$)", str(c))]
    y_like = [c for c in df.columns if re.search(r"(^|[^a-zA-Z])(y|Y)(\d+)?$|(_y$)|(_Y$)", str(c))]
    
    if x_like and y_like:
        def idx_of(col: str) -> int:
            m = re.search(r"(\d+)$", col)
            return int(m.group(1)) if m else -1
        x_sorted = sorted(x_like, key=lambda c: (idx_of(str(c)), str(c)))
        y_sorted = sorted(y_like, key=lambda c: (idx_of(str(c)), str(c)))
        for x_c, y_c in zip(x_sorted, y_sorted):
            pairs.append((x_c, y_c))
        target_cols: List[str] = [p for xy in pairs for p in xy]
    else:
        exclude_like = {'id', 'index', 'idx'}
        target_cols = [c for c in numeric_cols if str(c).lower() not in exclude_like]

assert len(target_cols) > 0, "No valid annotation numeric columns found, please check Excel column names"
print("Annotation columns for regression (should be 12 columns, 6 point pairs):", target_cols)
print(f"Number of point pairs: {len(pairs)}")

def normalize_filename(name: str) -> str:
    name = str(name)
    name = Path(name).stem
    name = name.replace("_", " ")
    name = re.sub(r"\s+", " ", name).strip()
    name = re.sub(r"[\s_]*\(?(\d+)\)?$", r" (\1)", name)
    return name.lower()

image_index = {}
for p in IMAGES_DIR.rglob("*"):
    if p.suffix.lower() in {".jpg", ".jpeg", ".png", ".bmp", ".webp"}:
        key = normalize_filename(p.stem)
        image_index[key] = p

def resolve_image_path(name: str) -> Path:
    key = normalize_filename(name)
    return image_index.get(key, None)

df["__image_path__"] = df[image_col].map(resolve_image_path)
exists_mask = df["__image_path__"].notnull() & df["__image_path__"].map(lambda p: p.exists())
print(f"Found {exists_mask.sum()} valid image matches.")

if not exists_mask.all():
    missing_count = (~exists_mask).sum()
    print(f"Warning: {missing_count} records in the annotation table cannot find corresponding images and will be ignored.")

df_labeled = df[exists_mask].copy()
print("Available annotated samples:", len(df_labeled))

all_images = [
    p for p in IMAGES_DIR.rglob("*") 
    if p.suffix.lower() in {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
]
set_labeled = set(df_labeled["__image_path__"].map(lambda p: p.resolve()))
unlabeled_images = [p for p in all_images if p.resolve() not in set_labeled]
print("Number of unlabeled images:", len(unlabeled_images))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

transform = T.Compose([
    T.Resize(256),
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
backbone = nn.Sequential(*list(resnet.children())[:-1]).to(device)
backbone.eval()

@torch.inference_mode()
def image_to_embedding(img_path: Path) -> np.ndarray:
    try:
        img = Image.open(img_path).convert('RGB')
        x = transform(img).unsqueeze(0).to(device)
        feat = backbone(x)
        feat = feat.view(feat.size(0), -1).cpu().numpy()[0]
        return feat.astype(np.float32)
    except Exception as e:
        print(f"Feature extraction failed: {img_path} -> {e}")
        return np.zeros((512,), dtype=np.float32)

print("Initialization completed.")

Original annotation headers: ['Filename', 'Label', 'x1', 'y1', 'x2', 'y2', 'Pixel_Distance']
Original row count (long format, one line segment per row): 1374
Long format data detected, converting to wide format (one row per image with all 6 points)...
Converted to wide format: 458 images, each with 12 coordinate columns (x1,y1,...,x6,y6)
Converted columns: ['Filename', 'x1', 'y1', 'x2', 'y2', 'seg1_label', 'x3', 'y3', 'x4', 'y4', 'seg2_label', 'x5', 'y5', 'x6', 'y6', 'seg3_label']
Identified image column: Filename
Successfully identified 6 point pairs: [('x1', 'y1'), ('x2', 'y2'), ('x3', 'y3'), ('x4', 'y4'), ('x5', 'y5'), ('x6', 'y6')]
Annotation columns for regression (should be 12 columns, 6 point pairs): ['x1', 'y1', 'x2', 'y2', 'x3', 'y3', 'x4', 'y4', 'x5', 'y5', 'x6', 'y6']
Number of point pairs: 6
Found 458 valid image matches.
Available annotated samples: 458
Number of unlabeled images: 721
Using device: cpu
Initialization completed.


In [41]:
from time import time
from typing import Tuple

CACHE_DIR = PROJECT_DIR / "emb_cache"
CACHE_DIR.mkdir(exist_ok=True)
LABELED_CACHE = CACHE_DIR / "labeled_embeddings.npz"
UNLABELED_CACHE = CACHE_DIR / "unlabeled_embeddings.npz"

def get_image_size(p: Path) -> Tuple[int, int]:
    try:
        with Image.open(p) as im:
            return im.size  # (width, height)
    except Exception as e:
        print(f"Failed to read image size: {p} -> {e}")
        return (1, 1)

# Add dimensions to labeled samples
sizes_labeled = df_labeled["__image_path__"].map(get_image_size)
df_labeled["__w__"] = sizes_labeled.map(lambda s: s[0])
df_labeled["__h__"] = sizes_labeled.map(lambda s: s[1])

# Dimensions of unlabeled samples
sizes_unlabeled = [get_image_size(p) for p in unlabeled_images]

def compute_embeddings(paths: List[Path]) -> np.ndarray:
    feats = []
    for p in tqdm(paths, desc="Extracting embeddings"):
        feats.append(image_to_embedding(p))
    return np.stack(feats, axis=0) if len(feats) else np.zeros((0, 512), dtype=np.float32)

# Cache or load labeled embeddings
if LABELED_CACHE.exists():
    z = np.load(LABELED_CACHE, allow_pickle=True)
    emb_labeled = z["embeddings"]
    cached_paths = list(z["paths"])  # as list of strings
    cur_paths = [str(p) for p in df_labeled["__image_path__"].tolist()]
    # Recompute if cached paths don't match current paths or dimensions mismatch
    if cached_paths != cur_paths or emb_labeled.shape[0] != len(cur_paths):
        emb_labeled = compute_embeddings(df_labeled["__image_path__"].tolist())
        np.savez_compressed(LABELED_CACHE, embeddings=emb_labeled, paths=np.array(cur_paths, dtype=object))
else:
    emb_labeled = compute_embeddings(df_labeled["__image_path__"].tolist())
    np.savez_compressed(LABELED_CACHE, embeddings=emb_labeled, paths=np.array([str(p) for p in df_labeled["__image_path__"].tolist()], dtype=object))

# Cache or load unlabeled embeddings
if UNLABELED_CACHE.exists():
    z = np.load(UNLABELED_CACHE, allow_pickle=True)
    emb_unlabeled = z["embeddings"]
    cached_paths_un = list(z["paths"])  # as list of strings
    cur_paths_un = [str(p) for p in unlabeled_images]
    # Recompute if cached paths don't match current paths or dimensions mismatch
    if cached_paths_un != cur_paths_un or emb_unlabeled.shape[0] != len(cur_paths_un):
        emb_unlabeled = compute_embeddings(unlabeled_images)
        np.savez_compressed(UNLABELED_CACHE, embeddings=emb_unlabeled, paths=np.array(cur_paths_un, dtype=object))
else:
    emb_unlabeled = compute_embeddings(unlabeled_images)
    np.savez_compressed(UNLABELED_CACHE, embeddings=emb_unlabeled, paths=np.array([str(p) for p in unlabeled_images], dtype=object))

print("Labeled embeddings shape:", emb_labeled.shape, "Unlabeled embeddings shape:", emb_unlabeled.shape)
print("Sample path:", df_labeled["__image_path__"].iloc[0] if len(df_labeled) else None)

Extracting embeddings: 100%|█████████████████████████████████████████████████████████| 458/458 [00:33<00:00, 13.67it/s]


Labeled embeddings shape: (458, 512) Unlabeled embeddings shape: (721, 512)
Sample path: c:\Users\Charlotte\Desktop\dissertation\US_new\High_quality_images\Abbey 001_v1_contralateral_base (1).jpg


In [42]:
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors

has_pairs = len(pairs) > 0

def normalize_targets(df_like: pd.DataFrame, target_cols: List[str], pairs: List[Tuple[str, str]]):
    Y = df_like[target_cols].to_numpy(dtype=float)
    if not has_pairs:
        return Y, None
    Yn = Y.copy()
    for i, (xc, yc) in enumerate(pairs):
        x_idx = target_cols.index(xc)
        y_idx = target_cols.index(yc)
        w = df_like["__w__"].to_numpy(dtype=float)
        h = df_like["__h__"].to_numpy(dtype=float)
        Yn[:, x_idx] = Y[:, x_idx] / np.clip(w, 1.0, None)
        Yn[:, y_idx] = Y[:, y_idx] / np.clip(h, 1.0, None)
    return Yn, ("paired",)

Yn_labeled, _ = normalize_targets(df_labeled, target_cols, pairs)

knn = KNeighborsRegressor(n_neighbors=5, weights='distance', metric='cosine')
knn.fit(emb_labeled, Yn_labeled)

MAX_COSINE_DISTANCE = 0.6

nbrs = NearestNeighbors(n_neighbors=5, metric='cosine').fit(emb_labeled)
dists, _ = nbrs.kneighbors(emb_unlabeled, return_distance=True)
median_dist = np.median(dists, axis=1) if dists.size else np.array([])

Ypred_norm = knn.predict(emb_unlabeled) if emb_unlabeled.shape[0] > 0 else np.zeros((0, len(target_cols)))

if has_pairs and len(unlabeled_images) > 0:
    widths = np.array([w for (w, h) in sizes_unlabeled], dtype=float)
    heights = np.array([h for (w, h) in sizes_unlabeled], dtype=float)
    Ypred = Ypred_norm.copy()
    for (xc, yc) in pairs:
        x_idx = target_cols.index(xc)
        y_idx = target_cols.index(yc)
        Ypred[:, x_idx] = Ypred_norm[:, x_idx] * np.clip(widths, 1.0, None)
        Ypred[:, y_idx] = Ypred_norm[:, y_idx] * np.clip(heights, 1.0, None)
else:
    Ypred = Ypred_norm

if median_dist.size:
    far_mask = median_dist > MAX_COSINE_DISTANCE
    if far_mask.any():
        Ypred[far_mask, :] = np.nan
        print(f"{int(far_mask.sum())} unlabeled samples are too dissimilar to labeled samples, set to NaN to avoid misleading predictions.")

unlabeled_names = [p.name for p in unlabeled_images]
res_df = pd.DataFrame({"filename": unlabeled_names})
for i, col in enumerate(target_cols):
    res_df[col] = Ypred[:, i] if Ypred.shape[0] else []

# Add three label columns (fixed labels since order is confirmed: seg1=medial, seg2=femoral, seg3=lateral)
res_df["seg1_label"] = "Medial condyle"
res_df["seg2_label"] = "Femoral condyle"
res_df["seg3_label"] = "lateral_thickness"

out_path = OUTPUT_FILE
if out_path.exists():
    stem = out_path.stem
    out_path = out_path.with_name(f"{stem}_v{int(time())}.xlsx")

res_df.to_excel(out_path, index=False)
print("Prediction results saved to:", out_path)

print("Completed: KNN-based coordinate prediction using visual similarity reference.")

Prediction results saved to: c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_v1762770463.xlsx
Completed: KNN-based coordinate prediction using visual similarity reference.


In [43]:
assert 'Ypred' in globals(), "Please run the previous cell first to generate Ypred"
assert 'unlabeled_images' in globals(), "Variable unlabeled_images does not exist"

num_pairs = len(pairs) if len(pairs) > 0 else (len(target_cols) // 2)
if num_pairs != 6:
    print(f"Note: Currently detected {num_pairs} point pairs, which is inconsistent with the expected 3 line segments (=6 points). Will export as much as possible in the current order.")

# Construct line segment column names
seg_cols = []
for seg_idx in range(3):
    seg_cols.extend([
        f"seg{seg_idx+1}_x1", f"seg{seg_idx+1}_y1", f"seg{seg_idx+1}_x2", f"seg{seg_idx+1}_y2"
    ])

# Extract first 12 columns from Ypred (fill with NaN if insufficient, truncate if excess)
P = np.full((Ypred.shape[0], 12), np.nan, dtype=float)
use_cols = min(12, Ypred.shape[1])
P[:, :use_cols] = Ypred[:, :use_cols]

seg_df = pd.DataFrame({"filename": [p.name for p in unlabeled_images]})
for i, col in enumerate(seg_cols):
    seg_df[col] = P[:, i]

# Add three label columns
seg_df["seg1_label"] = "Medial condyle"
seg_df["seg2_label"] = "Femoral condyle"
seg_df["seg3_label"] = "lateral_thickness"

# Optional: Calculate length of each line segment (in pixels)
for seg_idx in range(3):
    x1 = seg_df[f"seg{seg_idx+1}_x1"].to_numpy(dtype=float)
    y1 = seg_df[f"seg{seg_idx+1}_y1"].to_numpy(dtype=float)
    x2 = seg_df[f"seg{seg_idx+1}_x2"].to_numpy(dtype=float)
    y2 = seg_df[f"seg{seg_idx+1}_y2"].to_numpy(dtype=float)
    seg_df[f"seg{seg_idx+1}_length"] = np.sqrt((x2-x1)**2 + (y2-y1)**2)

out2 = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_segments.xlsx")
seg_df.to_excel(out2, index=False)
print("Line segment results saved to:", out2)

Line segment results saved to: c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_segments.xlsx


## Accuracy

### resnet+KNN

In [45]:
import pandas as pd
import re

# 文件路径
pred_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_segments.xlsx"
output_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_summary.xlsx"

# 转换参数
pixel_to_mm = 0.003985
linear_slope = 0.5752
linear_intercept = 0.0743

# 读取预测数据
df = pd.read_excel(pred_file)
df['seg1_cm_corrected'] = df['seg1_length'] * 0.003985 * linear_slope + linear_intercept
df['seg2_cm_corrected'] = df['seg2_length'] * 0.003985 * linear_slope + linear_intercept
df['seg3_cm_corrected'] = df['seg3_length'] * 0.003985 * linear_slope + linear_intercept

# 先转换长度：像素 -> mm -> 线性校正
for i in range(1,4):
    df[f'seg{i}_mm_raw'] = df[f'seg{i}_length'] * pixel_to_mm
    df[f'seg{i}_mm_corrected'] = df[f'seg{i}_mm_raw'] * linear_slope + linear_intercept

# 定义函数提取 Patient / Stage / Location / Part
def extract_info(filename, label):
    fn = str(filename)
    lb = str(label).lower()

    # Patient: Abbey 024_v1 → Abbey 024
    patient_match = re.search(r"Abbey[_\s]*\d{1,3}", fn, re.I)
    patient = patient_match.group(0).replace("_", " ").strip() if patient_match else None

    # Stage: v1/v2/v3/v4/v5 → base/3/4/5
    stage = None
    if "v1" in fn.lower():
        stage = "base"
    elif "v3" in fn.lower():
        stage = "3"
    elif "v4" in fn.lower():
        stage = "4"
    elif "v5" in fn.lower():
        stage = "5"
    else:
        stage = "base"

    # Location
    location = "contralateral" if "contralateral" in fn.lower() else ("treat" if "treat" in fn.lower() else None)

    # Part
    if "medial" in lb:
        part = "medial"
    elif "femoral" in lb:
        part = "femoral"
    elif "lateral" in lb:
        part = "lateral"
    else:
        part = None

    return patient, stage, location, part

# 提取信息
records = []
for idx, row in df.iterrows():
    for i in range(1,4):
        patient, stage, location, part = extract_info(row['filename'], row[f'seg{i}_label'])
        value = row[f'seg{i}_mm_corrected']
        records.append({
            'Patient': patient,
            'Stage': stage,
            'Location': location,
            'Part': part,
            'Predicted_mm': value
        })

pred_df_long = pd.DataFrame(records)

# 分组平均（同患者/阶段/部位可能有多张图）
grouped = pred_df_long.groupby(['Patient','Stage','Location','Part'])['Predicted_mm'].mean().reset_index()

# 透视表
pivot = grouped.pivot_table(index='Patient', columns=['Part','Location','Stage'], values='Predicted_mm')

# 重命名列
pivot.columns = [f"US_{p}_{l}_{s}" for (p,l,s) in pivot.columns]
pivot.reset_index(inplace=True)

# 固定列顺序（保持和你之前标注表一样）
cols = ["Patient",
    "US_medial_treat_base","US_femoral_treat_base","US_lateral_treat_base",
    "US_medial_contralateral_base","US_femoral_contralateral_base","US_lateral_contralateral_base",
    "US_medial_treat_3","US_femoral_treat_3","US_lateral_treat_3",
    "US_medial_contralateral_3","US_femoral_contralateral_3","US_lateral_contralateral_3",
    "US_medial_treat_4","US_femoral_treat_4","US_lateral_treat_4",
    "US_medial_contralateral_4","US_femoral_contralateral_4","US_lateral_contralateral_4",
    "US_medial_treat_5","US_femoral_treat_5","US_lateral_treat_5",
    "US_medial_contralateral_5","US_femoral_contralateral_5","US_lateral_contralateral_5"
]

for c in cols:
    if c not in pivot.columns:
        pivot[c] = None
pivot = pivot[cols]

# 保存结果
pivot.to_excel(output_file, index=False)
print(f"Predicted summary saved as {output_file}")


Predicted summary saved as c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_summary.xlsx


In [46]:
import pandas as pd
import re

df = pd.read_excel("annotation_points_summary.xlsx")

def extract_info(filename, label):
    fn = str(filename)
    lb = str(label).lower()

    # Patient: Abbey_001 → Abbey 001
    patient_match = re.search(r"Abbey[_\s]*\d{1,3}", fn, re.I)
    patient = patient_match.group(0).replace("_", " ").strip() if patient_match else None

    # Stage: 匹配 base 或 v3/v4/v5
    stage_match = re.search(r"(base|v\d+)", fn, re.I)
    # Stage: 修正版
    stage = None
    if "v1" in fn.lower():
        stage = "base"
    elif "v3" in fn.lower():
        stage = "3"
    elif "v4" in fn.lower():
        stage = "4"
    elif "v5" in fn.lower():
        stage = "5"
    else:
        # 兜底匹配 base
        if "base" in fn.lower():
            stage = "base"


    # Location
    location = "contralateral" if "contralateral" in fn.lower() else ("treat" if "treat" in fn.lower() else None)

    # Part from Label
    if "medial" in lb:
        part = "medial"
    elif "femoral" in lb:
        part = "femoral"
    elif "lateral" in lb:
        part = "lateral"
    else:
        part = None

    return patient, stage, location, part

# 提取
df[["Patient", "Stage", "Location", "Part"]] = df.apply(lambda x: pd.Series(extract_info(x["Filename"], x["Label"])), axis=1)

# 分组平均
grouped = df.groupby(["Patient", "Stage", "Location", "Part"])["Pixel_Distance"].mean().reset_index()

# 透视
pivot = grouped.pivot_table(index="Patient", columns=["Part","Location","Stage"], values="Pixel_Distance")
pivot.columns = [f"US_{p}_{l}_{s}" for (p,l,s) in pivot.columns]
pivot.reset_index(inplace=True)

# 固定列顺序
cols = ["Patient",
    "US_medial_treat_base","US_femoral_treat_base","US_lateral_treat_base",
    "US_medial_contralateral_base","US_femoral_contralateral_base","US_lateral_contralateral_base",
    "US_medial_treat_3","US_femoral_treat_3","US_lateral_treat_3",
    "US_medial_contralateral_3","US_femoral_contralateral_3","US_lateral_contralateral_3",
    "US_medial_treat_4","US_femoral_treat_4","US_lateral_treat_4",
    "US_medial_contralateral_4","US_femoral_contralateral_4","US_lateral_contralateral_4",
    "US_medial_treat_5","US_femoral_treat_5","US_lateral_treat_5",
    "US_medial_contralateral_5","US_femoral_contralateral_5","US_lateral_contralateral_5"
]

for c in cols:
    if c not in pivot.columns:
        pivot[c] = None
pivot = pivot[cols]

pivot.to_excel("annotation_points_summary_averaged_formatted.xlsx", index=False)
print("Book-format summary saved as annotation_points_summary_averaged_formatted.xlsx")

Book-format summary saved as annotation_points_summary_averaged_formatted.xlsx


In [47]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy.stats import pearsonr

# 文件路径
pred_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_summary.xlsx"
manual_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\Book2.xlsx"
output_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\prediction_vs_manual_metrics_filtered.xlsx"

# 读取数据
pred_df = pd.read_excel(pred_file)
manual_df = pd.read_excel(manual_file)

# 只保留 Book2 中存在的患者
common_patients = manual_df['Patient'].unique()
pred_df = pred_df[pred_df['Patient'].isin(common_patients)].copy()

# 找到 US_* 列（预测表和手工表都可能存在空列）
pred_cols = [c for c in pred_df.columns if c.startswith("US_")]
manual_cols = [c for c in manual_df.columns if c.startswith("US_")]

# 取两者的交集列
common_cols = list(set(pred_cols).intersection(set(manual_cols)))

if len(common_cols) == 0:
    raise ValueError("No matching US_* columns found between predicted and manual data!")

# 合并预测和手工表
merged_df = pd.merge(pred_df[['Patient'] + common_cols], manual_df[['Patient'] + common_cols], 
                     on='Patient', how='inner', suffixes=('_pred', '_manual'))

# 创建结果列表
results = []

for col in common_cols:
    pred_col = col + '_pred'
    manual_col = col + '_manual'
    
    # 过滤掉缺失值
    mask = (~merged_df[pred_col].isna()) & (~merged_df[manual_col].isna())
    if mask.sum() == 0:
        continue
    
    y_pred = merged_df.loc[mask, pred_col]
    y_true = merged_df.loc[mask, manual_col]
    
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    r, _ = pearsonr(y_true, y_pred)
    
    results.append({
        'US_part': col,
        'MAE_cm': mae,
        'RMSE_cm': rmse,
        'R2': r2,
        'Pearson_r': r
    })

import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy.stats import pearsonr

# 文件路径
pred_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_summary.xlsx"
manual_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\Book2.xlsx"
output_file = r"c:\Users\Charlotte\Desktop\dissertation\US_new\prediction_vs_manual_metrics_filtered.xlsx"

# 读取数据
pred_df = pd.read_excel(pred_file)
manual_df = pd.read_excel(manual_file)

# 只保留 Book2 中存在的患者
common_patients = manual_df['Patient'].unique()
pred_df = pred_df[pred_df['Patient'].isin(common_patients)].copy()

# 找到 US_* 列（预测表和手工表都可能存在空列）
pred_cols = [c for c in pred_df.columns if c.startswith("US_")]
manual_cols = [c for c in manual_df.columns if c.startswith("US_")]

# 取两者的交集列
common_cols = list(set(pred_cols).intersection(set(manual_cols)))

if len(common_cols) == 0:
    raise ValueError("No matching US_* columns found between predicted and manual data!")

# 合并预测和手工表
merged_df = pd.merge(pred_df[['Patient'] + common_cols], manual_df[['Patient'] + common_cols], 
                     on='Patient', how='inner', suffixes=('_pred', '_manual'))

# 创建结果列表
results = []

for col in common_cols:
    pred_col = col + '_pred'
    manual_col = col + '_manual'
    
    # 过滤掉缺失值
    mask = (~merged_df[pred_col].isna()) & (~merged_df[manual_col].isna())
    if mask.sum() == 0:
        continue
    
    y_pred = merged_df.loc[mask, pred_col]
    y_true = merged_df.loc[mask, manual_col]
    
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    r, _ = pearsonr(y_true, y_pred)
    
    results.append({
        'US_part': col,
        'MAE_cm': mae,
        'RMSE_cm': rmse,
        'R2': r2,
        'Pearson_r': r
    })

metrics_df = pd.DataFrame(results)
#metrics_df.to_excel(output_file, index=False)
#print(f"Accuracy metrics saved to {output_file}")

# 输出整体平均指标
if len(metrics_df) > 0:
    print("Overall average:")
    print(metrics_df[['MAE_cm','RMSE_cm','R2','Pearson_r']].mean())
else:
    print("No metrics computed, check if predicted summary has any data for these patients.")

Overall average:
MAE_cm       0.047041
RMSE_cm      0.059311
R2          -0.698896
Pearson_r    0.085707
dtype: float64


# 模型优化方案

当前KNN回归模型可能无法很好地捕捉坐标之间的复杂关系，导致预测的三个点坐标差距不明显。以下是几种优化方案：


In [10]:
# 方案1：使用随机森林回归（更强大的非线性模型）
from sklearn.ensemble import RandomForestRegressor

print("训练随机森林回归模型...")
rf = RandomForestRegressor(
    n_estimators=200, max_depth=15, min_samples_split=5, min_samples_leaf=2,
    random_state=42, n_jobs=-1
)
rf.fit(emb_labeled, Yn_labeled)

Ypred_rf_norm = rf.predict(emb_unlabeled) if emb_unlabeled.shape[0] > 0 else np.zeros((0, len(target_cols)))

# 反归一化
if has_pairs and len(unlabeled_images) > 0:
    Ypred_rf = Ypred_rf_norm.copy()
    for (xc, yc) in pairs:
        x_idx = target_cols.index(xc)
        y_idx = target_cols.index(yc)
        Ypred_rf[:, x_idx] = Ypred_rf_norm[:, x_idx] * np.clip(widths, 1.0, None)
        Ypred_rf[:, y_idx] = Ypred_rf_norm[:, y_idx] * np.clip(heights, 1.0, None)
else:
    Ypred_rf = Ypred_rf_norm

# 导出随机森林预测结果
res_df_rf = pd.DataFrame({"filename": unlabeled_names})
for i, col in enumerate(target_cols):
    res_df_rf[col] = Ypred_rf[:, i] if Ypred_rf.shape[0] else []

res_df_rf["seg1_label"] = "Medial condyle"
res_df_rf["seg2_label"] = "Femoral condyle"
res_df_rf["seg3_label"] = "lateral_thickness"

out_rf = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_rf.xlsx")
res_df_rf.to_excel(out_rf, index=False)
print("随机森林预测结果已保存:", out_rf)


训练随机森林回归模型...
随机森林预测结果已保存: c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_rf.xlsx


In [11]:
# 方案2：使用XGBoost回归（梯度提升，通常效果更好）
try:
    import xgboost as xgb
    
    print("训练XGBoost回归模型...")
    xgb_model = xgb.XGBRegressor(
        n_estimators=300, max_depth=8, learning_rate=0.05,
        subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1
    )
    xgb_model.fit(emb_labeled, Yn_labeled)
    
    Ypred_xgb_norm = xgb_model.predict(emb_unlabeled) if emb_unlabeled.shape[0] > 0 else np.zeros((0, len(target_cols)))
    
    # 反归一化
    if has_pairs and len(unlabeled_images) > 0:
        Ypred_xgb = Ypred_xgb_norm.copy()
        for (xc, yc) in pairs:
            x_idx = target_cols.index(xc)
            y_idx = target_cols.index(yc)
            Ypred_xgb[:, x_idx] = Ypred_xgb_norm[:, x_idx] * np.clip(widths, 1.0, None)
            Ypred_xgb[:, y_idx] = Ypred_xgb_norm[:, y_idx] * np.clip(heights, 1.0, None)
    else:
        Ypred_xgb = Ypred_xgb_norm
    
    # 导出XGBoost预测结果
    res_df_xgb = pd.DataFrame({"filename": unlabeled_names})
    for i, col in enumerate(target_cols):
        res_df_xgb[col] = Ypred_xgb[:, i] if Ypred_xgb.shape[0] else []
    
    res_df_xgb["seg1_label"] = "Medial condyle"
    res_df_xgb["seg2_label"] = "Femoral condyle"
    res_df_xgb["seg3_label"] = "lateral_thickness"
    
    out_xgb = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_xgb.xlsx")
    res_df_xgb.to_excel(out_xgb, index=False)
    print("XGBoost预测结果已保存:", out_xgb)
    
except ImportError:
    print("XGBoost未安装，跳过。安装命令: pip install xgboost")

XGBoost未安装，跳过。安装命令: pip install xgboost


In [12]:
# 方案3：集成方法 - 组合多个模型的预测结果（加权平均）
print("使用集成方法（KNN + 随机森林 + XGBoost）...")

predictions_norm = [Ypred_norm]  # KNN
if 'Ypred_rf_norm' in globals():
    predictions_norm.append(Ypred_rf_norm)
if 'Ypred_xgb_norm' in globals():
    predictions_norm.append(Ypred_xgb_norm)

# 加权平均（KNN权重较低，RF和XGBoost权重较高）
weights = [0.3, 0.35, 0.35] if len(predictions_norm) == 3 else ([0.4, 0.6] if len(predictions_norm) == 2 else [1.0])
if len(weights) > len(predictions_norm):
    weights = weights[:len(predictions_norm)]
weights = np.array(weights) / np.sum(weights)

Ypred_ensemble_norm = np.average(predictions_norm, axis=0, weights=weights)

# 反归一化
if has_pairs and len(unlabeled_images) > 0:
    Ypred_ensemble = Ypred_ensemble_norm.copy()
    for (xc, yc) in pairs:
        x_idx = target_cols.index(xc)
        y_idx = target_cols.index(yc)
        Ypred_ensemble[:, x_idx] = Ypred_ensemble_norm[:, x_idx] * np.clip(widths, 1.0, None)
        Ypred_ensemble[:, y_idx] = Ypred_ensemble_norm[:, y_idx] * np.clip(heights, 1.0, None)
else:
    Ypred_ensemble = Ypred_ensemble_norm

# 导出集成预测结果
res_df_ensemble = pd.DataFrame({"filename": unlabeled_names})
for i, col in enumerate(target_cols):
    res_df_ensemble[col] = Ypred_ensemble[:, i] if Ypred_ensemble.shape[0] else []

res_df_ensemble["seg1_label"] = "Medial condyle"
res_df_ensemble["seg2_label"] = "Femoral condyle"
res_df_ensemble["seg3_label"] = "lateral_thickness"

out_ensemble = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_ensemble.xlsx")
res_df_ensemble.to_excel(out_ensemble, index=False)
print("集成预测结果已保存:", out_ensemble)
print(f"使用了 {len(predictions_norm)} 个模型的加权平均，权重: {weights}")


使用集成方法（KNN + 随机森林 + XGBoost）...
集成预测结果已保存: c:\Users\Charlotte\Desktop\dissertation\US_new\annotation_points_predicted_ensemble.xlsx
使用了 2 个模型的加权平均，权重: [0.4 0.6]


## 其他优化建议

1. **交叉验证调参**：使用交叉验证找到最优的模型超参数
2. **数据增强**：对训练图像进行旋转、翻转等增强，增加数据多样性
3. **几何约束后处理**：根据解剖学知识，对预测结果进行合理性检查
4. **半监督学习**：使用预测结果作为伪标签，迭代改进模型
5. **特征工程**：添加图像统计特征（如直方图、纹理特征）作为额外输入


In [None]:
# 方案4：使用ResNet50提取更丰富的特征 + 随机森林/XGBoost
# ResNet50提供2048维特征（vs ResNet18的512维），可能捕获更多细节

print("使用ResNet50提取更丰富的特征...")
resnet50 = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
backbone50 = nn.Sequential(*list(resnet50.children())[:-1]).to(device)
backbone50.eval()

@torch.inference_mode()
def image_to_embedding_resnet50(img_path: Path) -> np.ndarray:
    try:
        img = Image.open(img_path).convert('RGB')
        x = transform(img).unsqueeze(0).to(device)
        feat = backbone50(x)  # [1, 2048, 1, 1]
        feat = feat.view(feat.size(0), -1).cpu().numpy()[0]
        return feat.astype(np.float32)
    except Exception as e:
        print(f"提取特征失败: {img_path} -> {e}")
        return np.zeros((2048,), dtype=np.float32)

# 提取ResNet50特征
print("提取已标注图像的ResNet50特征...")
emb_labeled_r50 = np.stack([image_to_embedding_resnet50(p) for p in tqdm(df_labeled["__image_path__"].tolist(), desc="ResNet50")], axis=0)

# 使用随机森林训练
rf_r50 = RandomForestRegressor(n_estimators=200, max_depth=15, random_state=42, n_jobs=-1)
rf_r50.fit(emb_labeled_r50, Yn_labeled)

# 提取未标注图像特征并预测
print("提取未标注图像的ResNet50特征...")
emb_unlabeled_r50 = np.stack([image_to_embedding_resnet50(p) for p in tqdm(unlabeled_images, desc="ResNet50")], axis=0)
Ypred_rf_r50_norm = rf_r50.predict(emb_unlabeled_r50)

# 反归一化
if has_pairs and len(unlabeled_images) > 0:
    Ypred_rf_r50 = Ypred_rf_r50_norm.copy()
    for (xc, yc) in pairs:
        x_idx = target_cols.index(xc)
        y_idx = target_cols.index(yc)
        Ypred_rf_r50[:, x_idx] = Ypred_rf_r50_norm[:, x_idx] * np.clip(widths, 1.0, None)
        Ypred_rf_r50[:, y_idx] = Ypred_rf_r50_norm[:, y_idx] * np.clip(heights, 1.0, None)
else:
    Ypred_rf_r50 = Ypred_rf_r50_norm

# 导出
res_df_rf_r50 = pd.DataFrame({"filename": unlabeled_names})
for i, col in enumerate(target_cols):
    res_df_rf_r50[col] = Ypred_rf_r50[:, i] if Ypred_rf_r50.shape[0] else []

res_df_rf_r50["seg1_label"] = "Medial condyle"
res_df_rf_r50["seg2_label"] = "Femoral condyle"
res_df_rf_r50["seg3_label"] = "lateral_thickness"

out_rf_r50 = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_rf_resnet50.xlsx")
res_df_rf_r50.to_excel(out_rf_r50, index=False)
print("随机森林+ResNet50预测结果已保存:", out_rf_r50)


In [None]:
# 方案5：几何约束后处理 - 根据解剖学知识调整预测结果
# 如果三个点的坐标差距不明显，可以通过分析已标注数据的统计特征来调整

print("应用几何约束后处理...")

# 计算已标注数据中三个点之间的相对距离统计
if len(df_labeled) > 0:
    # 计算每条线段的长度
    seg_lengths_labeled = []
    for seg_idx in range(3):
        x1_col = f'x{seg_idx*2+1}'
        y1_col = f'y{seg_idx*2+1}'
        x2_col = f'x{seg_idx*2+2}'
        y2_col = f'y{seg_idx*2+2}'
        if all(c in df_labeled.columns for c in [x1_col, y1_col, x2_col, y2_col]):
            lengths = np.sqrt(
                (df_labeled[x2_col] - df_labeled[x1_col])**2 + 
                (df_labeled[y2_col] - df_labeled[y1_col])**2
            )
            seg_lengths_labeled.append(lengths)
    
    if len(seg_lengths_labeled) == 3:
        # 计算每条线段长度的均值和标准差
        seg_means = [np.mean(l) for l in seg_lengths_labeled]
        seg_stds = [np.std(l) for l in seg_lengths_labeled]
        
        print(f"已标注数据线段长度统计:")
        print(f"  seg1 (Medial): 均值={seg_means[0]:.2f}, 标准差={seg_stds[0]:.2f}")
        print(f"  seg2 (Femoral): 均值={seg_means[1]:.2f}, 标准差={seg_stds[1]:.2f}")
        print(f"  seg3 (Lateral): 均值={seg_means[2]:.2f}, 标准差={seg_stds[2]:.2f}")
        
        # 对预测结果进行后处理：如果预测的线段长度与统计值差异过大，进行缩放调整
        Ypred_adjusted = Ypred_ensemble.copy() if 'Ypred_ensemble' in globals() else Ypred.copy()
        
        for i in range(len(unlabeled_images)):
            for seg_idx in range(3):
                x1_idx = seg_idx * 2
                y1_idx = seg_idx * 2 + 1
                x2_idx = seg_idx * 2 + 2
                y2_idx = seg_idx * 2 + 3
                
                if all(idx < Ypred_adjusted.shape[1] for idx in [x1_idx, y1_idx, x2_idx, y2_idx]):
                    pred_length = np.sqrt(
                        (Ypred_adjusted[i, x2_idx] - Ypred_adjusted[i, x1_idx])**2 +
                        (Ypred_adjusted[i, y2_idx] - Ypred_adjusted[i, y1_idx])**2
                    )
                    
                    # 如果预测长度与均值差异超过2个标准差，进行缩放
                    if seg_means[seg_idx] > 0 and abs(pred_length - seg_means[seg_idx]) > 2 * seg_stds[seg_idx]:
                        scale_factor = seg_means[seg_idx] / max(pred_length, 1.0)
                        # 保持起点不变，调整终点
                        center_x = (Ypred_adjusted[i, x1_idx] + Ypred_adjusted[i, x2_idx]) / 2
                        center_y = (Ypred_adjusted[i, y1_idx] + Ypred_adjusted[i, y2_idx]) / 2
                        
                        Ypred_adjusted[i, x1_idx] = center_x - (Ypred_adjusted[i, x2_idx] - center_x) * scale_factor
                        Ypred_adjusted[i, y1_idx] = center_y - (Ypred_adjusted[i, y2_idx] - center_y) * scale_factor
                        Ypred_adjusted[i, x2_idx] = center_x + (Ypred_adjusted[i, x2_idx] - center_x) * scale_factor
                        Ypred_adjusted[i, y2_idx] = center_y + (Ypred_adjusted[i, y2_idx] - center_y) * scale_factor
        
        # 导出调整后的结果
        res_df_adjusted = pd.DataFrame({"filename": unlabeled_names})
        for i, col in enumerate(target_cols):
            res_df_adjusted[col] = Ypred_adjusted[:, i] if Ypred_adjusted.shape[0] else []
        
        res_df_adjusted["seg1_label"] = "Medial condyle"
        res_df_adjusted["seg2_label"] = "Femoral condyle"
        res_df_adjusted["seg3_label"] = "lateral_thickness"
        
        out_adjusted = OUTPUT_FILE.with_name(OUTPUT_FILE.stem + "_adjusted.xlsx")
        res_df_adjusted.to_excel(out_adjusted, index=False)
        print("几何约束调整后的预测结果已保存:", out_adjusted)
    else:
        print("无法计算线段长度统计，跳过几何约束后处理")
else:
    print("没有已标注数据，跳过几何约束后处理")
