In [1]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt
from io import BytesIO
from loguru import logger
import json

input_folder_path = r'E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-R-1_C001H001S0001'
output_label_path = './output/label'
output_feature_path = './output/feature'
output_data_path = './output/data'

In [2]:
dataset_dirs = []


def list_directories(path):
    directories = [d for d in os.listdir(path)
                   if os.path.isdir(os.path.join(path, d))]
    return directories


dataset_dirs = list_directories(input_folder_path)
len(dataset_dirs)

0

In [3]:
surface_type_keys = ['S1', 'S2', 'S3', 'S4', 'S5', 'S6']
surface_type_vals = [(i+1) / (len(surface_type_keys)+1) for i in range(len(surface_type_keys))]
surface_type_dict = dict(zip(surface_type_keys, surface_type_vals))

fall_point_type_keys = ['OG1', 'OG', 'OG2', 'OR1', 'OR', 'OR2', 'C', 'R', 'G']

fall_point_type_vals = [(i+1) / (len(fall_point_type_keys)+1) for i in range(len(fall_point_type_keys))]
fall_point_type_dict = dict(zip(fall_point_type_keys, fall_point_type_vals))

liquid_type_keys = ['W']
liquid_type_vals = [(i+1) / (len(fall_point_type_keys)+1)
                    for i in range(len(fall_point_type_keys))]
liquid_type_dict = dict(zip(liquid_type_keys, liquid_type_vals))

surface_type_dict, fall_point_type_dict, liquid_type_dict

({'S1': 0.14285714285714285,
  'S2': 0.2857142857142857,
  'S3': 0.42857142857142855,
  'S4': 0.5714285714285714,
  'S5': 0.7142857142857143,
  'S6': 0.8571428571428571},
 {'OG1': 0.1,
  'OG': 0.2,
  'OG2': 0.3,
  'OR1': 0.4,
  'OR': 0.5,
  'OR2': 0.6,
  'C': 0.7,
  'R': 0.8,
  'G': 0.9},
 {'W': 0.1})

In [4]:
frame_id_range = [0, 40]
height_range = [10, 40]
diameter_range = [18, 25]
sm1_range = [3, 7]

In [5]:
def get_smooth1_data(surface_type):
    sm1 =  6.2

    if surface_type == 'S1': sm1 = 3.914
    elif surface_type == 'S2': sm1 = 7.5
    
    return (sm1 - sm1_range[0]) / (sm1_range[1] - sm1_range[0])

def get_delta_elevation(surface_type):
    delta_elevation = 0

    if surface_type == 'S4': delta_elevation = 0.33
    elif surface_type == 'S5': delta_elevation = 0.656
    elif surface_type == 'S6': delta_elevation = 1

    return delta_elevation

In [6]:
def analyze_conditions(path, frame_id):
    label = path.split('\\')[-1]
    seg = label.split('-')
    data = {}
    if len(seg) == 6:
        data['surface_type'] = surface_type_dict[seg[0]]
        data['liquid_type'] = liquid_type_dict[seg[1]]
        data['diameter'] = (int(seg[2][:2]) - diameter_range[0]) / (diameter_range[1] - diameter_range[0])
        data['height']  = (int(seg[3][:2]) - height_range[0]) / (height_range[1] - height_range[0])
        data['fall_point_type'] = fall_point_type_dict[seg[4]]
        data['frame_id'] = (frame_id - frame_id_range[0]) / (frame_id_range[1] - frame_id_range[0])
        data['sm1'] = get_smooth1_data(seg[0])
        data['sm2'] = 0.429
        data['delta_elevation'] = get_delta_elevation(seg[0])

    return data

In [7]:
def count_jpg_files(folder_path):
    jpg_files = [f for f in os.listdir(folder_path)
                 if f.lower().endswith('.jpg')]
    return len(jpg_files)

In [8]:
def exec_one_folder(input_path, use_index_or_filename=True):
	def list_img_files():
		files = os.listdir(input_path)
		jpg_files = sorted([file for file in files if file.endswith('jpg') and file.startswith('raw')])
		return jpg_files
	img_files = list_img_files()
	logger.success(f'{input_path} listed, {len(img_files)} imgs found.')

	base_idx = count_jpg_files(output_feature_path)
	start_idx = 0
	cnt = 0
	for idx, path in enumerate(img_files):
		frame_id = int(path[-10:-4])
		if idx == 0 : start_idx = frame_id
		output_img = cv2.imread(f'{input_path}/{img_files[idx]}')
	
		data = analyze_conditions(input_path, frame_id - start_idx)
		values = list(data.values())
		pts = [[i, i] for i in range(len(values))] 
		matrix = np.zeros((len(values), len(values)))
		for value, pt in zip(values, pts):
			matrix[pt[0], pt[1]] = value
		plt.imshow(matrix, cmap='viridis', interpolation='nearest')
		plt.axis('off')
		
		buf = BytesIO()
		plt.savefig(buf, format='jpg', bbox_inches='tight', pad_inches=0, dpi=300)
		plt.close()
		buf.seek(0)
		input_img = np.frombuffer(buf.getvalue(), dtype=np.int8)
		input_img = cv2.imdecode(input_img, cv2.IMREAD_COLOR)
		height, width = output_img.shape[:2]
		input_img = cv2.resize(input_img, (width, height))

		if use_index_or_filename:
			real_idx = base_idx + cnt
			cv2.imwrite(f'{output_feature_path}/{real_idx}.jpg', input_img)
			cv2.imwrite(f'{output_label_path}/{real_idx}.jpg', output_img)
			with open(f'{output_data_path}/{real_idx}.json', 'w', encoding='utf-8') as fp:
				json.dump(data, fp, ensure_ascii=False, indent=4)
		else:
			cv2.imwrite(f'{output_feature_path}/{img_files[idx]}.jpg', input_img)
			cv2.imwrite(f'{output_label_path}/{img_files[idx]}.jpg', output_img)
			with open(f'{output_data_path}/{img_files[idx]}.json', 'w', encoding='utf-8') as fp:
				json.dump(data, fp, ensure_ascii=False, indent=4)
		
		logger.success(f'Finish handling {input_path} / {idx}.')
		cnt += 1

In [9]:
exec_one_folder(input_path=input_folder_path, use_index_or_filename=False)

[32m2025-05-05 22:00:59.702[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m7[0m - [32m[1mE:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-R-1_C001H001S0001 listed, 22 imgs found.[0m
[32m2025-05-05 22:00:59.827[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-R-1_C001H001S0001 / 0.[0m
[32m2025-05-05 22:00:59.930[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-R-1_C001H001S0001 / 1.[0m
[32m2025-05-05 22:01:00.029[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfac

In [13]:
for dataset_dir in dataset_dirs:
	input_path = os.path.join(input_folder_path, dataset_dir)
	exec_one_folder(input_path=input_path)
	logger.success(f"Finish handling {input_path}")

[32m2025-05-05 21:32:46.265[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m7[0m - [32m[1mE:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-C-1_C001H001S0001 listed, 17 imgs found.[0m
[32m2025-05-05 21:32:46.648[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-C-1_C001H001S0001 / 0.[0m
[32m2025-05-05 21:32:46.778[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfaces_1\result\S1-W-18G-30cm-C-1_C001H001S0001 / 1.[0m
[32m2025-05-05 21:32:46.900[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mexec_one_folder[0m:[36m47[0m - [32m[1mFinish handling E:\record\Drop Impact on Rough Surfaces_1\Drop Impact on Rough Surfac

In [10]:
# split train/valid/test
import os
import random
import shutil

# 定义源文件夹路径和目标文件夹路径
source_folder = './output/feature'
train_folder = './output/feature/train'
val_folder = './output/feature/valid'
test_folder = './output/feature/test'

# 创建目标文件夹（如果不存在）
os.makedirs(train_folder, exist_ok=True)
os.makedirs(val_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

images = [f for f in os.listdir(source_folder) if os.path.isfile(os.path.join(source_folder, f))]

random.shuffle(images)

# 定义训练集，验证集，测试集比例
train_ratio = 0.0
val_ratio = 0.0

# 80% train + 10% valid + 10% test
train_size = int(train_ratio * len(images))
val_size = int(val_ratio * len(images))
test_size = len(images) - train_size - val_size

train_set = images[:train_size]
val_set = images[train_size:train_size + val_size]
test_set = images[train_size + val_size:]

def move_files(file_list, destination_folder):
    for file_name in file_list:
        src_path = os.path.join(source_folder, file_name)
        dst_path = os.path.join(destination_folder, file_name)
        shutil.move(src_path, dst_path)

move_files(train_set, train_folder)
move_files(val_set, val_folder)
move_files(test_set, test_folder)

print(f"train set: {len(train_set)} imgs")
print(f"valid set: {len(val_set)} imgs")
print(f"test set: {len(test_set)} imgs")

train set: 0 imgs
valid set: 0 imgs
test set: 22 imgs


In [11]:
# split train/valid/test
import os
import random
import shutil

# 定义源文件夹路径和目标文件夹路径
source_folder = './output/label'
train_folder = './output/label/train'
val_folder = './output/label/valid'
test_folder = './output/label/test'

# 创建目标文件夹（如果不存在）
os.makedirs(train_folder, exist_ok=True)
os.makedirs(val_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

images = [f for f in os.listdir(source_folder) if os.path.isfile(os.path.join(source_folder, f))]

def move_files(file_list, destination_folder):
    for file_name in file_list:
        src_path = os.path.join(source_folder, file_name)
        dst_path = os.path.join(destination_folder, file_name)
        shutil.move(src_path, dst_path)

move_files(train_set, train_folder)
move_files(val_set, val_folder)
move_files(test_set, test_folder)

print(f"train set: {len(train_set)} imgs")
print(f"valid set: {len(val_set)} imgs")
print(f"test set: {len(test_set)} imgs")

train set: 0 imgs
valid set: 0 imgs
test set: 22 imgs
