In [None]:
import ctypes
import time
import struct
import time
import datetime

In [None]:

# Load the CH375 DLL by name.
ch375dll = ctypes.WinDLL("CH375DLL64.dll")

# Constants
DEVICE_INDEX = 0  # index passed as the first argument of every CH375 call below
ENDPOINT_IN = 1  # endpoint number used for reads
BUF_SIZE = 8*1024*1024  # 8 MiB (8 * 1024 * 1024 bytes)

# ctypes prototypes for the DLL functions used in this notebook
ch375dll.CH375OpenDevice.argtypes = [ctypes.c_ulong]
# With restype c_void_p, ctypes returns a Python int, or None for a NULL pointer.
ch375dll.CH375OpenDevice.restype = ctypes.c_void_p

ch375dll.CH375CloseDevice.argtypes = [ctypes.c_ulong]

# Arguments: (device index, endpoint number, destination buffer, pointer to an
# in/out byte count: requested size on entry, read back by the caller afterwards).
ch375dll.CH375ReadEndP.argtypes = [ctypes.c_ulong, ctypes.c_ulong, ctypes.c_void_p, ctypes.POINTER(ctypes.c_ulong)]
# NOTE(review): the vendor header presumably declares the return type as Win32
# BOOL (a 4-byte int), whereas c_bool reads a single byte — confirm.
ch375dll.CH375ReadEndP.restype = ctypes.c_bool

def open_device():
    """Open the CH375 device at ``DEVICE_INDEX``.

    Prints a one-line status message in both outcomes.

    Returns:
        The handle value returned by ``CH375OpenDevice`` on success, or
        ``None`` when the call reports failure.
    """
    handle = ch375dll.CH375OpenDevice(DEVICE_INDEX)
    # ctypes converts a NULL ``c_void_p`` result to None (never 0), so test for
    # any falsy value in addition to the all-ones pointer value
    # (Win32 INVALID_HANDLE_VALUE, i.e. -1 reinterpreted as a pointer).
    invalid_handle = ctypes.c_void_p(-1).value
    if not handle or handle == invalid_handle:
        print("CH375OpenDevice failed.")
        return None
    print("CH375OpenDevice success.")
    return handle

def close_device():
    """Close the CH375 device at ``DEVICE_INDEX``; the DLL's return value is discarded."""
    release = ch375dll.CH375CloseDevice
    release(DEVICE_INDEX)

def read_endpoint(ep: int, buffer_len: int = BUF_SIZE):
    """Issue one ``CH375ReadEndP`` call on endpoint ``ep``.

    Args:
        ep: Endpoint number passed to the DLL.
        buffer_len: Size in bytes of the receive buffer; also the byte count
            requested from the DLL.

    Returns:
        A ``(success, data, length)`` tuple: the DLL's return flag, the
        received bytes (at most ``buffer_len`` of them), and the byte count
        the DLL wrote back.
    """
    read_buffer = (ctypes.c_ubyte * buffer_len)()
    length = ctypes.c_ulong(buffer_len)
    success = ch375dll.CH375ReadEndP(DEVICE_INDEX, ep, read_buffer, ctypes.byref(length))
    count = length.value
    # Copy through a memoryview: slicing the ctypes array directly first builds
    # a Python list with one int object per byte, which is very slow and
    # memory-hungry for multi-megabyte buffers. The slice still clamps to the
    # buffer size, so the resulting bytes are identical.
    data = bytes(memoryview(read_buffer)[:count])
    return success, data, count


In [3]:

def clear_endpoint_buffer(ep: int = ENDPOINT_IN, max_clear_bytes: int = 2 * 1024 * 1024):
    """Read and discard pending data from endpoint ``ep``.

    Opens the device, reads in chunks of at most 1 MiB until a read fails,
    a read returns zero bytes, or ``max_clear_bytes`` bytes have been
    discarded, then closes the device again.

    Args:
        ep: Endpoint number to drain.
        max_clear_bytes: Upper bound on the number of bytes to discard.

    Returns:
        None. Progress is reported with ``print``.
    """
    handle = open_device()
    if handle is None:
        return

    try:
        print("Clearing endpoint buffer...")
        cleared = 0
        while cleared < max_clear_bytes:
            chunk_size = min(1024 * 1024, max_clear_bytes - cleared)
            # The payload itself is thrown away; only the count matters here.
            success, _, length = read_endpoint(ep, chunk_size)
            if not success or length == 0:
                break
            cleared += length
            print(f"Cleared {length} bytes, total: {cleared}/{max_clear_bytes}")
        print(f"Done clearing. Total cleared: {cleared} bytes.")
    finally:
        # Pair every successful open with a close, even if a read raises or
        # the cell is interrupted, so the device is not left open.
        close_device()


def read_and_save_exact_8mb(filename: str, total_size: int = BUF_SIZE):
    """Read exactly ``total_size`` bytes from ``ENDPOINT_IN`` and save them.

    The file is written only when the full amount was received; a short
    capture is reported and nothing is written. The device is always closed
    before returning, including when an exception or interrupt occurs.

    Args:
        filename: Destination path; missing parent directories are created.
        total_size: Number of bytes to capture.

    Returns:
        None. Progress and the outcome are reported with ``print``.
    """
    import os  # function-scope import: the notebook's import cell does not provide os

    handle = open_device()
    if handle is None:
        return

    try:
        print(f"Start reading exactly {total_size} bytes...")
        received = 0
        all_data = bytearray()

        while received < total_size:
            # Request everything still missing in a single call (no per-read
            # cap is applied; add one here if the device needs smaller reads).
            remaining = total_size - received
            success, data, length = read_endpoint(ENDPOINT_IN, remaining)
            if not success:
                print("CH375ReadEndP error.")
                break
            if length == 0:
                # A successful zero-byte read makes no progress; stop instead
                # of spinning forever.
                print("CH375ReadEndP returned no data.")
                break

            all_data.extend(data[:length])
            received += length
            print(f"Received {length} bytes, total: {received}/{total_size}")

        if received == total_size:
            # Create the output directory first so a completed capture is not
            # lost to a missing folder.
            out_dir = os.path.dirname(filename)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            with open(filename, "wb") as f:
                f.write(all_data)
            print(f"✅ Saved {received} bytes to {filename}")
        else:
            print(f"❌ Failed to receive all {total_size} bytes, only got {received} bytes")
    finally:
        close_device()

# input("press enter to clear")
# Read and discard whatever is pending on the endpoint, then wait for the
# operator before starting the capture.
clear_endpoint_buffer()
input("press enter to read")

# Name the capture after the current local time so repeated runs get distinct files.
timestamp = f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}"
filename = f"output_{timestamp}.bin"
# For a fixed name instead, use: filename = "output.bin"
read_and_save_exact_8mb(
    f"./data/{filename}",
    total_size=8 * 1024 * 1024,
)
print("finish !")

KeyboardInterrupt: 

In [None]:

# import struct
# file_path = './data/output.bin'

# with open(file_path, 'rb') as f:
#     data = f.read()
# print(len(data) , type(data))
# def parse_bytes_to_uint_list(data_bytes):
#     result = []
#     for i in range(0, len(data_bytes), 4):
#         chunk = data_bytes[i:i+4]
#         if len(chunk) < 4:
#             break  # skip a trailing chunk shorter than 4 bytes
#         value = struct.unpack('<I', chunk)[0]
#         result.append(value)
#     return result




# # data_bytes = b'\x01\x00\x00\x00\x02\x00\x00\x00' * 2
# list = parse_bytes_to_uint_list(data)
# list

# err = 0
# for i in range(len(list)-1):
#     if i!=0 and list[i]+1 != list[i+1]:
#         err = err +1
#         print(f"err i: {i}, {i+1} {list[i]:08x} {list[i+1]:08x}")
        
        
# print(f"err {err}")



