### 图像数据预处理

In [None]:
import cv2
import numpy as np
from shapely.geometry import Polygon
from shapely.ops import unary_union
from sklearn.metrics.pairwise import euclidean_distances, cosine_similarity

# # 通过 Otsu 阈值法从图像中提取主要区域的轮廓
# def segment_tissue(img):
#     img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
#     mthresh = 7
#     img_med = cv2.medianBlur(img_hsv[:, :, 1], mthresh)
#     _, img_prepped = cv2.threshold(img_med, 0, 255, cv2.THRESH_OTSU + cv2.THRESH_BINARY)

#     close = 4
#     kernel = np.ones((close, close), np.uint8)
#     img_prepped = cv2.morphologyEx(img_prepped, cv2.MORPH_CLOSE, kernel)

#     # Find and filter contours
#     contours, hierarchy = cv2.findContours(
#         img_prepped, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE
#     )
#     return contours, hierarchy


# # 从轮廓中分离区域的外边界和内部空洞
# def detect_foreground(contours, hierarchy):
#     hierarchy = np.squeeze(hierarchy, axis=(0,))[:, 2:]

#     # find foreground contours (parent == -1)
#     hierarchy_1 = np.flatnonzero(hierarchy[:, 1] == -1)
#     foreground_contours = [contours[cont_idx] for cont_idx in hierarchy_1]

#     all_holes = []
#     for cont_idx in hierarchy_1:
#         all_holes.append(np.flatnonzero(hierarchy[:, 1] == cont_idx))

#     hole_contours = []
#     for hole_ids in all_holes:
#         holes = [contours[idx] for idx in hole_ids]
#         hole_contours.append(holes)

#     return foreground_contours, hole_contours


# # 构建组织区域的多边形表示，剔除无效或过小区域
# def construct_polygon(foreground_contours, hole_contours, min_area):
#     polys = []
#     for foreground, holes in zip(foreground_contours, hole_contours):
#         # We remove all contours that consist of fewer than 3 points, as these won't work with the Polygon constructor.
#         if len(foreground) < 3:
#             continue

#         # remove redundant dimensions from the contour and convert to Shapely Polygon
#         poly = Polygon(np.squeeze(foreground))

#         # discard all polygons that are considered too small
#         if poly.area < min_area:
#             continue

#         if not poly.is_valid:
#             # This is likely becausee the polygon is self-touching or self-crossing.
#             # Try and 'correct' the polygon using the zero-length buffer() trick.
#             # See https://shapely.readthedocs.io/en/stable/manual.html#object.buffer
#             poly = poly.buffer(0)

#         # Punch the holes in the polygon
#         for hole_contour in holes:
#             if len(hole_contour) < 3:
#                 continue

#             hole = Polygon(np.squeeze(hole_contour))

#             if not hole.is_valid:
#                 continue

#             # ignore all very small holes
#             if hole.area < min_area:
#                 continue

#             poly = poly.difference(hole)

#         polys.append(poly)

#     if len(polys) == 0:
#         raise Exception("Raw tissue mask consists of 0 polygons")

#     # If we have multiple polygons, we merge any overlap between them using unary_union().
#     # This will result in a Polygon or MultiPolygon with most tissue masks.
#     return unary_union(polys)


# 生成指定大小的矩形瓦片列表，用于切分图像
def generate_tiles(tile_width_pix, tile_height_pix, img_width, img_height, offsets=[(0, 0)]):
    # Generate tiles covering the entire image.
    # Provide an offset (x,y) to create a stride-like overlap effect.
    # Add an additional tile size to the range stop to prevent tiles being cut off at the edges.
    range_stop_width = int(np.ceil(img_width + tile_width_pix))
    range_stop_height = int(np.ceil(img_height + tile_height_pix))

    rects = []
    for xmin, ymin in offsets:
        cols = range(int(np.floor(xmin)), range_stop_width, tile_width_pix)
        rows = range(int(np.floor(ymin)), range_stop_height, tile_height_pix)
        for x in cols:
            for y in rows:
                rect = Polygon(
                    [
                        (x, y),
                        (x + tile_width_pix, y),
                        (x + tile_width_pix, y - tile_height_pix),
                        (x, y - tile_height_pix),
                    ]
                )
                rects.append(rect)
    return rects


# 基于组织区域生成有效的瓦片集合
# img_width: 图像的宽度（像素）,img_height: 图像的高度（像素）
# tissue_mask_scaled: 组织区域的多边形表示, tile_size_pix: 瓦片大小（像素）, offsets_pix: 瓦片偏移（像素）
# def create_tissue_tiles(img_width, img_height, tissue_mask_scaled, tile_size_pix, offsets_pix=None):
def create_tissue_tiles(img_width, img_height, tile_size_pix, offsets_pix=None):
    # Use the tissue mask bounds as base offsets (+ a margin of a few tiles) to avoid wasting CPU power creating tiles that are never going
    # to be inside the tissue mask.
    # # 图像边界增加缓冲区，防止 tiles 超出边界
    # margin_pix = tile_size_pix * 2
    # minx, miny = 0 - margin_pix, 0 - margin_pix
    # maxx, maxy = img_width + margin_pix, img_height + margin_pix

    if offsets_pix is None:
        offsets_pix = [(0,0)]

    # Generate tiles covering the entire WSI
    # 这里可以考虑将offsets设置为类似offsets = [(0, 0), (64, 64)]，增加重叠，当前不重叠
    all_tiles = []
    all_tiles = generate_tiles(
        tile_size_pix,
        tile_size_pix,
        img_width,
        img_height,
        offsets=offsets_pix,
    )

    # # Retain only the tiles that sit within the tissue mask polygon
    # filtered_tiles = [rect for rect in all_tiles if tissue_mask_scaled.intersects(rect)]

    # return filtered_tiles
    return all_tiles


# 合并相邻且相似的瓦片，减少冗余
# def mergedpatch_gen(features, coords, dist_threshold=4, corr_threshold = 0.6):

#     # Get patch distance in pixels with rendered segmentation level. Note that each patch is squared and therefore same distance.
#     patch_dist = abs(coords[0,2] - coords[0,0]) 
#     print(patch_dist)
    
#     # Compute feature similarity (cosine) and nearby pacthes (L2 norm - only need the top left x,y coordinates)
#     cosine_matrix = cosine_similarity(features, features)
#     coordinate_matrix = euclidean_distances(coords[:,:2], coords[:,:2])

#     # NOTE: random selection for the first patch for patch merging might be less biased towards tissue orientation and size. 
#     indices_avail = np.arange(features.shape[0])
#     np.random.seed(0)  
#     np.random.shuffle(indices_avail)

#     # Merging together nearby patches and similar within pre-defined threshold. 
#     mergedfeatures = []
#     indices_used = []
#     for ref in indices_avail:

#         # This has been merged already
#         if ref not in indices_used:

#             # Making sure they won't be selected once more
#             if indices_used:
#                 coordinate_matrix[ref,indices_used] = [np.Inf]*len(indices_used)
#                 cosine_matrix[ref,indices_used] = [0.0]*len(indices_used)
            
#             indices_dist = np.where(coordinate_matrix[ref] < patch_dist*dist_threshold, 1 , 0)
#             indices_corr = np.where(cosine_matrix[ref] > corr_threshold, 1 , 0)
#             final_indices = indices_dist * indices_corr

#             # which includes already the ref patch
#             indices_used.extend(list(np.where(final_indices == 1)[0]))
#             mergedfeatures.append(tuple((features[final_indices==1,:], coords[final_indices==1,:])))
#         else:
#             continue
        
#     assert len(indices_used)==features.shape[0], f'Probably issue in contruscting merged features for graph {len(indices_used)}!={features.shape[0]}'

#     return mergedfeatures



In [None]:
# 使用上述函数进行预处理
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# 将 CSV 数据表转换为灰度图像
def convert_csv_to_grayscale_image(input_csv_path, skip_rows=5):
    # 读取文件并跳过指定行数
    with open(input_csv_path, 'r') as file:
        rows = file.readlines()[skip_rows:]
    
    # 仅保留数字行并转换为整数
    numeric_rows = []
    for row in rows:
        try:
            numeric_rows.append([int(value) for value in row.strip().split(',') if value.strip().isdigit()])
        except ValueError:
            continue

    # 找到最长的行长度
    max_row_length = max(len(row) for row in numeric_rows)

    # 填充每一行使其长度一致
    padded_numeric_rows = [row + [0] * (max_row_length - len(row)) for row in numeric_rows]

    # 转换为二维数组
    array = np.array(padded_numeric_rows)

    # 将数组值归一化到 0-255 之间，用于生成灰度图像
    normalized_array = (array - array.min()) / (array.max() - array.min()) * 255
    normalized_array = normalized_array.astype(np.uint8)

    # 将归一化后的数组转换为图像
    image = Image.fromarray(normalized_array, mode='L')
    return image


def process_image(image, tile_size_pix=128, offsets_pix=None):
    image = np.array(image)
    # 图像预处理：灰度图像可以直接使用，不需要颜色转换
    _, binary_mask = cv2.threshold(image, 0, 255, cv2.THRESH_OTSU)  # 使用 Otsu 阈值法生成二值图像
    img_hsv = cv2.merge([binary_mask, binary_mask, binary_mask])  # 转为伪RGB以适配现有代码逻辑

    # # 提取感兴趣区域
    # contours, hierarchy = segment_tissue(img_hsv)
    # foreground_contours, hole_contours = detect_foreground(contours, hierarchy)
    # roi_mask = construct_polygon(foreground_contours, hole_contours, min_area=1000)

    # 生成瓦片
    img_width, img_height = img_hsv.shape[1], img_hsv.shape[0]
    tiles = create_tissue_tiles(img_width, img_height, tile_size_pix, offsets_pix)

    # 可视化瓦片
    output_img = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    for tile in tiles:
        coords = np.array(tile.exterior.coords, dtype=np.int32)
        cv2.polylines(output_img, [coords], isClosed=True, color=(0, 255, 0), thickness=2)

    # 显示瓦片叠加图像
    plt.imshow(output_img, cmap="gray")
    plt.axis("off")
    plt.show()

    return tiles

# 执行流程
image_path = "ZafAt 1_Field of View.csv"
image = convert_csv_to_grayscale_image(image_path, skip_rows=5)
tiles = process_image(image, tile_size_pix=128, offsets_pix=[(0, 0),(64,64)])
