In [1]:
import numpy as np
import cv2
from pypylon import pylon
import time
import threading
from pymodbus.client import ModbusTcpClient

In [2]:
# 创建并打开相机
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
camera.Open()

# 曝光时间（微秒）
camera.ExposureTime.Value = 12490 

# 设置帧率
camera.AcquisitionFrameRate.SetValue(1)

# 增益（dB)
camera.Gain.SetValue(10)

# 分辨率
# camera.Width.SetValue(1080)  # 设置图像宽度
# camera.Height.SetValue(720)  # 设置图像高度

# 触发模式
# camera.TriggerMode.SetValue("On")  # 启用触发模式
# camera.TriggerSource.SetValue("Line1")  # 设置触发源为 Line1

In [3]:
# hyperparameters setting
conveyer_len = 500 # 近PLC边界距离，单位mm
conveyer_rate = 50 # mm/s
calib_time = 0 # s, 校准时间
blowing_duration = 0.5 # 持续吹气时间

plc_ip = '192.168.0.100'
plc_port = 502  # PLC的Modbus端口
output_address = 0 # 吹嘴输出地址

In [4]:
def start_blowing(client, output_address, delay_time):
    """
    :param client: Modbus TCP客户端
    :param output_address: PLC中控制吹嘴的输出地址
    :param delay_time: 吹气延迟时间，单位：秒
    :param blowing_duration: 吹气持续时间，单位：秒
    """
    time.sleep(delay_time)  # 等待延迟时间
    print(f"开始吹气，延迟了{delay_time}秒，持续{blowing_duration}秒")
    
    # 启动吹气
    # client.write_coil(output_address, True)
    time.sleep(blowing_duration)  # 持续吹气指定时间
    
    # 停止吹气
    # client.write_coil(output_address, False)
    print("吹气完成，停止吹气")

import random
def defect_diagnose(image):
    """
    defection alg.
    : return is_fault: boolean, True是有故障
    : return coord_list: list, 故障胶囊中心轴距离近PLC端FOV边缘距离，mm
    """
    x = [random.randint(300,500) for _ in range(5)]
    time.sleep(5)
    return True, x

In [5]:
# 连接到PLC
# client = ModbusTcpClient(plc_ip, port=plc_port)
# if not client.connect():
#     print("无法连接到PLC")
# return

# 保持主线程运行，等待子线程执行完毕
# while True:
#     time.sleep(1)

camera.StopGrabbing()
# 启动图像采集
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly) # 只抓取最新的一帧（未处理完的时候不采集其他图片）
# camera.StartGrabbing(pylon.GrabStrategy_OneByOne) # 处理完当前图片才再次抓取

count = 0 # 处理图像数量统计
# 获取和处理图像
while camera.IsGrabbing():
    # 获取图像帧
    grab_result = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
    
    if grab_result.GrabSucceeded():
        image = grab_result.Array
        grab_time = time.time()

        is_fault,fault_coord = defect_diagnose(image) # 判断函数 is_fault为true代表有问题，fault_coord：list，record坐标
        process_time = time.time()

        # while is_fault:
        #     for coord in fault_coord:
        #         delay_time = (conveyer_len+coord)/conveyer_rate - (process_time-grab_time) + calib_time
        #         threading.Thread(target=start_blowing, args=(client, output_address, delay_time), daemon=True).start()
        # cv2.imshow("Processed Image", image)

        print(count)
        count += 1
        
        # 检测按键，按'ESC'键退出
        key = cv2.waitKey(1)
        if key == 27:  # ESC键
            break
    grab_result.Release()

# 关闭相机并释放资源
camera.StopGrabbing()
camera.Close()
cv2.destroyAllWindows()

2276941329
0
2276901693
1
2276907710
2
2276941196
3
2276839990
4
2276831095
5
2276823112
6
2276648336
7
2278765962
8
2278767983
9
2278768031
10
2278768651
11
2278767859
12
2278772698
13
2276843480
14


KeyboardInterrupt: 