In [None]:
## Mount Google Drive Data (If using Google Colaboratory)
try:
    from google.colab import drive
    drive.mount('/content/gdrive')
except:
    print("Mounting Failed.")

Mounted at /content/gdrive


In [None]:
import os
import cv2
import matplotlib.pyplot as plt

In [None]:
def get_subdirectories(folder_path):
    subdirectories = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
    return subdirectories

folder_path = "/content/gdrive/MyDrive/video_to_image"

name_list = get_subdirectories(folder_path)
name_list.sort()
print(name_list)

['clip_000000', 'clip_000001', 'clip_000002', 'clip_000003', 'clip_000004', 'clip_000005', 'clip_000006', 'clip_000007', 'clip_000008', 'clip_000009', 'clip_000010', 'clip_000011', 'clip_000012', 'clip_000013', 'clip_000014', 'clip_000015', 'clip_000016', 'clip_000017', 'clip_000018', 'clip_000019', 'clip_000020', 'clip_000021', 'clip_000022', 'clip_000023', 'clip_000024', 'clip_000025', 'clip_000026', 'clip_000027', 'clip_000028', 'clip_000029', 'clip_000030', 'clip_000031', 'clip_000032', 'clip_000033', 'clip_000034', 'clip_000035', 'clip_000036', 'clip_000037', 'clip_000038', 'clip_000039', 'clip_000040', 'clip_000041', 'clip_000042', 'clip_000043', 'clip_000044', 'clip_000045', 'clip_000046', 'clip_000047', 'clip_000048', 'clip_000049', 'clip_000050', 'clip_000051', 'clip_000052', 'clip_000053', 'clip_000054', 'clip_000055', 'clip_000056', 'clip_000057', 'clip_000058', 'clip_000059', 'clip_000060', 'clip_000061', 'clip_000062', 'clip_000063', 'clip_000064', 'clip_000065', 'clip_000

In [None]:
from torch import nn
import torch
from torchvision import models
import torchvision
from torch.nn import functional as F

def conv3x3(in_, out):
    return nn.Conv2d(in_, out, 3, padding=1)


class ConvRelu(nn.Module):
    def __init__(self, in_: int, out: int):
        super(ConvRelu, self).__init__()
        self.conv = conv3x3(in_, out)
        self.activation = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv(x)
        x = self.activation(x)
        return x


class DecoderBlock(nn.Module):

    def __init__(self, in_channels, middle_channels, out_channels, is_deconv=True):
        super(DecoderBlock, self).__init__()
        self.in_channels = in_channels

        if is_deconv:
            self.block = nn.Sequential(
                ConvRelu(in_channels, middle_channels),
                nn.ConvTranspose2d(middle_channels, out_channels, kernel_size=4, stride=2,
                                   padding=1),
                nn.ReLU(inplace=True)
            )
        else:
            self.block = nn.Sequential(
                nn.Upsample(scale_factor=2, mode='bilinear'),
                ConvRelu(in_channels, middle_channels),
                ConvRelu(middle_channels, out_channels),
            )

    def forward(self, x):
        return self.block(x)
class UNet16(nn.Module):
    def __init__(self, num_classes=1, num_filters=32, pretrained=False):

        super().__init__()
        self.num_classes = num_classes

        self.pool = nn.MaxPool2d(2, 2)

        self.encoder = torchvision.models.vgg16(pretrained=pretrained).features

        self.relu = nn.ReLU(inplace=True)

        self.conv1 = nn.Sequential(self.encoder[0],
                                   self.relu,
                                   self.encoder[2],
                                   self.relu)

        self.conv2 = nn.Sequential(self.encoder[5],
                                   self.relu,
                                   self.encoder[7],
                                   self.relu)

        self.conv3 = nn.Sequential(self.encoder[10],
                                   self.relu,
                                   self.encoder[12],
                                   self.relu,
                                   self.encoder[14],
                                   self.relu)

        self.conv4 = nn.Sequential(self.encoder[17],
                                   self.relu,
                                   self.encoder[19],
                                   self.relu,
                                   self.encoder[21],
                                   self.relu)

        self.conv5 = nn.Sequential(self.encoder[24],
                                   self.relu,
                                   self.encoder[26],
                                   self.relu,
                                   self.encoder[28],
                                   self.relu)

        self.center = DecoderBlock(512, num_filters * 8 * 2, num_filters * 8)

        self.dec5 = DecoderBlock(512 + num_filters * 8, num_filters * 8 * 2, num_filters * 8)
        self.dec4 = DecoderBlock(512 + num_filters * 8, num_filters * 8 * 2, num_filters * 8)
        self.dec3 = DecoderBlock(256 + num_filters * 8, num_filters * 4 * 2, num_filters * 2)
        self.dec2 = DecoderBlock(128 + num_filters * 2, num_filters * 2 * 2, num_filters)
        self.dec1 = ConvRelu(64 + num_filters, num_filters)
        self.final = nn.Conv2d(num_filters, num_classes, kernel_size=1)

    def forward(self, x):
        conv1 = self.conv1(x)
        conv2 = self.conv2(self.pool(conv1))
        conv3 = self.conv3(self.pool(conv2))
        conv4 = self.conv4(self.pool(conv3))
        conv5 = self.conv5(self.pool(conv4))

        center = self.center(self.pool(conv5))

        dec5 = self.dec5(torch.cat([center, conv5], 1))

        dec4 = self.dec4(torch.cat([dec5, conv4], 1))
        dec3 = self.dec3(torch.cat([dec4, conv3], 1))
        dec2 = self.dec2(torch.cat([dec3, conv2], 1))
        dec1 = self.dec1(torch.cat([dec2, conv1], 1))

        if self.num_classes > 1:
            x_out = F.log_softmax(self.final(dec1), dim=1)
        else:
            x_out = self.final(dec1)

        return x_out


In [None]:
model_binary = UNet16(1)
model_binary_path="/content/gdrive/MyDrive/unet16/unet16_binary_20/model_0.pt"
state = torch.load(str(model_binary_path))
state = {key.replace('module.', ''): value for key, value in state['model'].items()}
model_binary.load_state_dict(state)
model_binary.eval()
model_binary = model_binary.cuda()




In [None]:
model_parts = UNet16(4)
model_parts_path="/content/gdrive/MyDrive/unet16/unet16_parts_20/model_0.pt"
state = torch.load(str(model_parts_path))
state = {key.replace('module.', ''): value for key, value in state['model'].items()}
model_parts.load_state_dict(state)
model_parts.eval()
model_parts = model_parts.cuda()

In [None]:
from albumentations import Compose, RandomCrop, VerticalFlip, HorizontalFlip, Normalize, CenterCrop, Resize
from albumentations.pytorch.functional import img_to_tensor
def preprocess_img(p=1):
    return Compose([
        CenterCrop(height=640,width=900),
        Resize(height=960, width=1280, p=1),
    ], p=p)
def normalization_transform(p=1):
    return Compose([
        Normalize(p=1)
    ], p=p)

In [None]:
import pandas as pd
dataframe = pd.read_csv("/content/gdrive/MyDrive/labels.csv")

In [None]:
print(dataframe)

       Unnamed: 0    clip_name  \
0               0  clip_000000   
1               1  clip_000001   
2               2  clip_000002   
3               3  clip_000003   
4               4  clip_000004   
...           ...          ...   
24690       24716  clip_024716   
24691       24717  clip_024717   
24692       24718  clip_024718   
24693       24719  clip_024719   
24694       24720  clip_024720   

                                           tools_present  
0      [needle driver, nan, needle driver, cadiere fo...  
1      [needle driver, nan, needle driver, cadiere fo...  
2      [needle driver, nan, needle driver, cadiere fo...  
3      [needle driver, nan, needle driver, cadiere fo...  
4      [needle driver, nan, needle driver, cadiere fo...  
...                                                  ...  
24690  [bipolar forceps, nan, monopolar curved scisso...  
24691  [bipolar forceps, nan, monopolar curved scisso...  
24692  [bipolar forceps, nan, monopolar curved scisso...  
2

In [None]:
for name in name_list:
  name = name.replace("clip_","")
  name = int(name)
  print(name,dataframe.iloc[name,2])

0 [needle driver, nan, needle driver, cadiere forceps]
1 [needle driver, nan, needle driver, cadiere forceps]
2 [needle driver, nan, needle driver, cadiere forceps]
3 [needle driver, nan, needle driver, cadiere forceps]
4 [needle driver, nan, needle driver, cadiere forceps]
5 [needle driver, nan, needle driver, cadiere forceps]
6 [needle driver, nan, needle driver, cadiere forceps]
7 [needle driver, nan, needle driver, cadiere forceps]
8 [needle driver, nan, needle driver, cadiere forceps]
9 [needle driver, nan, needle driver, cadiere forceps]
10 [needle driver, nan, needle driver, cadiere forceps]
11 [needle driver, nan, needle driver, cadiere forceps]
12 [needle driver, nan, needle driver, cadiere forceps]
13 [needle driver, nan, needle driver, cadiere forceps]
14 [needle driver, nan, needle driver, cadiere forceps]
15 [needle driver, nan, needle driver, cadiere forceps]
16 [needle driver, nan, needle driver, cadiere forceps]
17 [needle driver, nan, needle driver, cadiere forceps]
18

In [None]:
mapping = {
    'bipolar dissector': 1,
    'bipolar forceps': 2,
    'cadiere forceps': 3,
    'clip applier': 4,
    'force bipolar': 5,
    'grasping retractor': 6,
    'monopolar curved scissors': 7,
    'needle driver': 8,
    'permanent cautery hook/spatula': 9,
    'prograsp forceps': 10,
    'stapler': 11,
    'suction irrigator': 12,
    'tip-up fenestrated grasper': 13,
    'vessel sealer': 14
}

In [None]:
special_list=['grasping retractor',
    'monopolar curved scissors',
    'stapler',
    'suction irrigator',
    'tip-up fenestrated grasper']

In [None]:
import numpy as np
import math
import ast
import re
def reassemble_label(file_name, dbscan_labels, non_zero_coords,instruments_list=None):
  if instruments_list==None:
    number = int(file_name.replace("clip_",""))
    instruments = dataframe.iloc[number,2]
    desired_elements = re.findall(r'\b(\w+(\s\w+)*)\b', instruments)
    instruments_list = [item[0].strip() for item in desired_elements if item[0].lower() != 'nan']
  combined_data = list(zip(non_zero_coords, dbscan_labels))
  grouped_data = {}

  for entry, label in combined_data:
      if label not in grouped_data:
          grouped_data[label] = [entry]
      else:
          grouped_data[label].append(entry)

  min_y_values = {}
  for label, group in grouped_data.items():
      sorted_group = sorted(group, key=lambda x: x[1])
      min_y_values[label] = sorted_group[0][1]
  if -1 in min_y_values.keys():
    del min_y_values[-1]

  sorted_class_info = sorted(min_y_values.items(), key=lambda x: x[1])

  position_list={}
  for position,(label,_) in enumerate(sorted_class_info):
    position_list[label] = position
  a,b = 960,1280
  dbscan_segmented_image = np.zeros([3,a,b])
  for i, label in enumerate(dbscan_labels):
        x, y = non_zero_coords[i]
        if label != -1:
          tmp = position_list.get(label)
          name = instruments_list[int(tmp)]
          pixel_number = int(mapping.get(name))/14*255
          dbscan_segmented_image[tmp, x, y] = int(pixel_number)
  return dbscan_segmented_image,instruments_list

In [None]:
def load_image(path):
    img = cv2.imread(str(path))
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)


In [None]:
import os
folder_path = "/content/gdrive/MyDrive/video_to_image"
target_path = "/content/gdrive/MyDrive/training_data"
os.makedirs(target_path,exist_ok=True)

In [None]:
new_name_list = name_list[983:]
print(new_name_list)

['clip_000983', 'clip_000984', 'clip_000985', 'clip_000986', 'clip_000987', 'clip_000988', 'clip_000989', 'clip_000990', 'clip_000991', 'clip_000992', 'clip_000993', 'clip_000994', 'clip_000995', 'clip_000996', 'clip_000997', 'clip_000998', 'clip_000999', 'clip_001000', 'clip_001001', 'clip_001002', 'clip_001003', 'clip_001004', 'clip_001005', 'clip_001006', 'clip_001007', 'clip_001008', 'clip_001009', 'clip_001010', 'clip_001011', 'clip_001012', 'clip_001013', 'clip_001014', 'clip_001015', 'clip_001016', 'clip_001017', 'clip_001018', 'clip_001019', 'clip_001020', 'clip_001021', 'clip_001022', 'clip_001023', 'clip_001024', 'clip_001025', 'clip_001026', 'clip_001027', 'clip_001028', 'clip_001029', 'clip_001030', 'clip_001031', 'clip_001032', 'clip_001033', 'clip_001034', 'clip_001035', 'clip_001036', 'clip_001037', 'clip_001038', 'clip_001039', 'clip_001040', 'clip_001041', 'clip_001042', 'clip_001043', 'clip_001044', 'clip_001045', 'clip_001046', 'clip_001047', 'clip_001048', 'clip_001

In [None]:
from sklearn.cluster import DBSCAN
import numpy as np
import cv2

for name in new_name_list:
  img_path = os.path.join(folder_path, name)
  target_path_name = os.path.join(target_path, name)
  os.makedirs(target_path,exist_ok=True)
  index = 0
  print("Processing image: ", img_path)
  for i in range(11):
    img_path_i = os.path.join(img_path, "frame_"+str(i)+".png")

    try:
      image = load_image(img_path_i)
    except:
      print()
      print("Error: ", img_path_i)
      continue
    image = preprocess_img(p=1)(image=image)['image']

    dbscan = DBSCAN(eps=5, min_samples=50)

    with torch.no_grad():
      input_image = torch.unsqueeze(img_to_tensor(normalization_transform(p=1)(image=image)['image']).cuda(), dim=0)
    binary_mask = model_binary(input_image)
    binary_mask_array = binary_mask.data[0].cpu().numpy()[0]
    binary_gray_feature_map=(binary_mask_array > 0).astype(int)

    non_zero_coords = np.column_stack(np.where(binary_gray_feature_map!= 0))
    if len(non_zero_coords)==0:
      continue
    dbscan_labels = dbscan.fit_predict(non_zero_coords)

    num_instruments = np.max(dbscan_labels)+1

    if num_instruments != 3:
      continue

    dbscan_segmented_image,instruments_list = reassemble_label(name, dbscan_labels, non_zero_coords)
    parts_image = model_parts(input_image).squeeze()
    parts_image = parts_image.data.cpu().numpy()
    parts_image = (np.argmax(parts_image[0:], axis=0))
    ROI = np.zeros([960,1280])
    for i in range(3):
      instrument = instruments_list[i]
      if instrument not in special_list:
        tmp = (parts_image==2).astype(int)
      else:
        tmp = ((parts_image==2) | (parts_image==3)).astype(int)
      ROI += tmp*dbscan_segmented_image[i]

    parts_image = parts_image/3*255

    target_path_i = os.path.join(target_path_name, str(index))
    os.makedirs(target_path_i,exist_ok=True)
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    cv2.imwrite(os.path.join(target_path_i, 'original_image.png'), image)
    cv2.imwrite(os.path.join(target_path_i, 'instruments_image.png'), dbscan_segmented_image.sum(axis=0))
    cv2.imwrite(os.path.join(target_path_i, 'parts_image.png'), parts_image)
    cv2.imwrite(os.path.join(target_path_i, 'ROI_image.png'), ROI)
    index += 1

[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
Error:  /content/gdrive/MyDrive/video_to_image/clip_001784/frame_6.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001784/frame_7.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001784/frame_8.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001784/frame_9.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001784/frame_10.png
Processing image:  /content/gdrive/MyDrive/video_to_image/clip_001785

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_0.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_1.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_2.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_3.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_4.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_5.png

Error:  /content/gdrive/MyDrive/video_to_image/clip_001785/frame_6.png

Error:  /content/gdrive/

In [None]:
from sklearn.cluster import DBSCAN
def restore_img(name,i,instruments_list,index):
  img_path = os.path.join(folder_path, name)
  target_path_name = os.path.join(target_path, name)
  img_path_i = os.path.join(img_path, "frame_"+str(i)+".png")
  try:
    image = load_image(img_path_i)
  except:
    print()
    print("Error: ", img_path_i)
    return
  image = preprocess_img(p=1)(image=image)['image']

  dbscan = DBSCAN(eps=5, min_samples=50)

  with torch.no_grad():
    input_image = torch.unsqueeze(img_to_tensor(normalization_transform(p=1)(image=image)['image']).cuda(), dim=0)
  binary_mask = model_binary(input_image)
  binary_mask_array = binary_mask.data[0].cpu().numpy()[0]
  binary_gray_feature_map=(binary_mask_array > 0).astype(int)

  non_zero_coords = np.column_stack(np.where(binary_gray_feature_map!= 0))
  if len(non_zero_coords)==0:
    return
  dbscan_labels = dbscan.fit_predict(non_zero_coords)

  num_instruments = np.max(dbscan_labels)+1

  if num_instruments != 3:
    return

  dbscan_segmented_image,instruments_list = reassemble_label(name, dbscan_labels, non_zero_coords,instruments_list=instruments_list)
  parts_image = model_parts(input_image).squeeze()
  parts_image = parts_image.data.cpu().numpy()
  parts_image = (np.argmax(parts_image, axis=0))
  ROI = np.zeros([960,1280])
  for i in range(3):
    instrument = instruments_list[i]
    if instrument not in special_list:
      tmp = (parts_image==2).astype(int)
    else:
      tmp = ((parts_image==2) | (parts_image==3)).astype(int)
    ROI += tmp*dbscan_segmented_image[i]

  parts_image = parts_image/3*255

  target_path_i = os.path.join(target_path_name, str(index))
  os.makedirs(target_path_i,exist_ok=True)
  image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
  cv2.imwrite(os.path.join(target_path_i, 'original_image.png'), image)
  cv2.imwrite(os.path.join(target_path_i, 'instruments_image.png'), dbscan_segmented_image.sum(axis=0))
  cv2.imwrite(os.path.join(target_path_i, 'parts_image.png'), parts_image)
  cv2.imwrite(os.path.join(target_path_i, 'ROI_image.png'), ROI)
  return

In [None]:
restore_img("clip_000468",10,["bipolar forceps","cadiere forceps","monopolar curved scissors"],7)


Error:  /content/gdrive/MyDrive/video_to_image/clip_000468/frame_10.png


In [None]:
from sklearn.cluster import DBSCAN
def do_img(name,i,instruments_list,index):
  img_path = os.path.join(folder_path, name)
  target_path_name = os.path.join(target_path, name)
  img_path_i = os.path.join(img_path, "frame_"+str(i)+".png")
  try:
    image = load_image(img_path_i)
  except:
    print()
    print("Error: ", img_path_i)
    return
  image = preprocess_img(p=1)(image=image)['image']

  dbscan = DBSCAN(eps=5, min_samples=50)

  with torch.no_grad():
    input_image = torch.unsqueeze(img_to_tensor(normalization_transform(p=1)(image=image)['image']).cuda(), dim=0)
  binary_mask = model_binary(input_image)
  binary_mask_array = binary_mask.data[0].cpu().numpy()[0]
  binary_gray_feature_map=(binary_mask_array > 0).astype(int)

  non_zero_coords = np.column_stack(np.where(binary_gray_feature_map!= 0))
  if len(non_zero_coords)==0:
    return
  dbscan_labels = dbscan.fit_predict(non_zero_coords)

  num_instruments = np.max(dbscan_labels)+1

  if num_instruments != 3:
    return

  dbscan_segmented_image,instruments_list = reassemble_label(name, dbscan_labels, non_zero_coords,instruments_list=instruments_list)
  parts_image = model_parts(input_image).squeeze()
  parts_image = parts_image.data.cpu().numpy()
  parts_image = (np.argmax(parts_image, axis=0))
  ROI = np.zeros([960,1280])
  for i in range(3):
    instrument = instruments_list[i]
    if instrument not in special_list:
      tmp = (parts_image==2).astype(int)
    else:
      tmp = ((parts_image==2) | (parts_image==3)).astype(int)
    ROI += tmp*dbscan_segmented_image[i]

  parts_image = parts_image/3*255

  target_path_i = os.path.join(target_path_name, str(index))
  os.makedirs(target_path_i,exist_ok=True)
  image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
  cv2.imwrite(os.path.join(target_path_i, 'original_image.png'), image)
  cv2.imwrite(os.path.join(target_path_i, 'instruments_image.png'), dbscan_segmented_image.sum(axis=0))
  cv2.imwrite(os.path.join(target_path_i, 'parts_image.png'), parts_image)
  cv2.imwrite(os.path.join(target_path_i, 'ROI_image.png'), ROI)
  return