diff --git a/depthai-core b/depthai-core index 41de567ea..038d455e6 160000 --- a/depthai-core +++ b/depthai-core @@ -1 +1 @@ -Subproject commit 41de567ea52443368224145b9e54a4f858abaeb8 +Subproject commit 038d455e6caf2d4e7e96545b061a8e2d2f34e0f9 diff --git a/docs/source/components/nodes/image_manip.rst b/docs/source/components/nodes/image_manip.rst index 20c756464..3d3b69a6b 100644 --- a/docs/source/components/nodes/image_manip.rst +++ b/docs/source/components/nodes/image_manip.rst @@ -3,6 +3,8 @@ ImageManip ImageManip node can be used to crop, rotate rectangle area or perform various image transforms: rotate, mirror, flip, perspective transform. +For downscaling, ImageManip uses the bilinear/bicubic interpolation. + How to place it ############### diff --git a/utilities/device_manager.py b/utilities/device_manager.py index 0362a6bd9..80f200bf7 100644 --- a/utilities/device_manager.py +++ b/utilities/device_manager.py @@ -5,6 +5,7 @@ import tempfile import PySimpleGUI as sg import sys +from typing import Dict import platform import os @@ -21,8 +22,6 @@ CONF_INPUT_USB = ['usbTimeout', 'usbSpeed'] USB_SPEEDS = ["UNKNOWN", "LOW", "FULL", "HIGH", "SUPER", "SUPER_PLUS"] -devices = dict() - def PrintException(): exc_type, exc_obj, tb = sys.exc_info() f = tb.tb_frame @@ -60,7 +59,7 @@ def __init__(self): [sg.Text('Flashing progress: 0.0%', key='txt')], [sg.ProgressBar(1.0, orientation='h', size=(20,20), key='progress')], ] - self.window = sg.Window("Progress", layout, modal=True, finalize=True) + self.self.window = sg.Window("Progress", layout, modal=True, finalize=True) def update(self, val): self.window['progress'].update(val) self.window['txt'].update(f'Flashing progress: {val*100:.1f}%') @@ -159,110 +158,7 @@ def wait(self) -> dai.DeviceInfo: self.window.close() return deviceSelected -def unlockConfig(window, devType): - if devType == "POE": - for el in CONF_INPUT_POE: - window[el].update(disabled=False) - for el in CONF_TEXT_POE: - window[el].update(text_color="black") - else: - for el in CONF_INPUT_USB: - window[el].update(disabled=False) - for el in CONF_TEXT_USB: - window[el].update(text_color="black") - - window['Flash newest Bootloader'].update(disabled=False) - window['Flash configuration'].update(disabled=False) - window['Factory reset'].update(disabled=False) - # window['Clear flash'].update(disabled=False) - window['Flash DAP'].update(disabled=False) - window['recoveryMode'].update(disabled=False) - -def lockConfig(window): - for conf in [CONF_INPUT_POE, CONF_INPUT_USB]: - for el in conf: - window[el].update(disabled=True) - window[el].update("") - for conf in [CONF_TEXT_POE, CONF_TEXT_USB]: - for el in conf: - window[el].update(text_color="gray") - - window['Flash newest Bootloader'].update(disabled=True) - window['Flash configuration'].update(disabled=True) - window['Factory reset'].update(disabled=True) - window['Clear flash'].update(disabled=True) - window['Flash DAP'].update(disabled=True) - window['recoveryMode'].update(disabled=True) - - window.Element('devName').update("-name-") - window.Element('devNameConf').update("") - window.Element('newBoot').update("-version-") - window.Element('currBoot').update("-version-") - window.Element('version').update("-version-") - window.Element('commit').update("-version-") - window.Element('devState').update("-state-") - -def getDevices(window): - try: - listedDevices = [] - devices.clear() - deviceInfos = dai.XLinkConnection.getAllConnectedDevices() - if not deviceInfos: - window.Element('devices').update("No devices") - sg.Popup("No devices found.") - else: - for deviceInfo in deviceInfos: - deviceTxt = deviceInfo.getMxId() - listedDevices.append(deviceTxt) - devices[deviceTxt] = deviceInfo - window.Element('devices').update("Select device", values=listedDevices) - except Exception as ex: - PrintException() - sg.Popup(f'{ex}') - -def getConfigs(window, bl: dai.DeviceBootloader, devType, device: dai.DeviceInfo): - try: - conf = bl.readConfig() - except: - conf = dai.DeviceBootloader.Config() - - try: - if devType == "POE": - window.Element('ip').update(conf.getIPv4()) - window.Element('mask').update(conf.getIPv4Mask()) - window.Element('gateway').update(conf.getIPv4Gateway()) - window.Element('dns').update(conf.getDnsIPv4()) - window.Element('dnsAlt').update(conf.getDnsAltIPv4()) - window.Element('networkTimeout').update(int(conf.getNetworkTimeout().total_seconds() * 1000)) - window.Element('mac').update(conf.getMacAddress()) - for el in CONF_INPUT_USB: - window[el].update("") - else: - for el in CONF_INPUT_POE: - window[el].update("") - window.Element('usbTimeout').update(int(conf.getUsbTimeout().total_seconds() * 1000)) - window.Element('usbSpeed').update(str(conf.getUsbMaxSpeed()).split('.')[1]) - - window.Element('devName').update(device.name) - window.Element('devNameConf').update(device.getMxId()) - window.Element('newBoot').update(dai.DeviceBootloader.getEmbeddedBootloaderVersion()) - - # The "isEmbeddedVersion" tells you whether BL had to be booted, - # or we connected to an already flashed Bootloader. - if bl.isEmbeddedVersion(): - window.Element('currBoot').update('Not Flashed') - else: - window.Element('currBoot').update(bl.getVersion()) - - window.Element('version').update(dai.__version__) - window.Element('commit').update(dai.__commit__) - window.Element('devState').update(deviceStateTxt(device.state)) - except Exception as ex: - PrintException() - sg.Popup(f'{ex}') - -def flashBootloader(window, device: dai.DeviceInfo): - # FIXME - to flash bootloader, boot the same device again (from saved device info) but with allowFlashingBootloader = True +def flashBootloader(device: dai.DeviceInfo): try: sel = SelectBootloader(['AUTO', 'USB', 'NETWORK'], "Select bootloader type to flash.") ok, type = sel.wait() @@ -277,52 +173,12 @@ def flashBootloader(window, device: dai.DeviceInfo): if type == dai.DeviceBootloader.Type.AUTO: type = bl.getType() bl.flashBootloader(memory=dai.DeviceBootloader.Memory.FLASH, type=type, progressCallback=progress) - window.Element('currBoot').update(bl.getVersion()) pr.finish("Flashed newest bootloader version.") except Exception as ex: PrintException() sg.Popup(f'{ex}') -def flashConfig(values, device: dai.DeviceInfo, devType: str, staticIp: bool): - try: - bl = dai.DeviceBootloader(device, True) - conf = dai.DeviceBootloader.Config() - if devType == "POE": - if staticIp: - if check_ip(values['ip']) and check_ip(values['mask']) and check_ip(values['gateway']): - conf.setStaticIPv4(values['ip'], values['mask'], values['gateway']) - else: - if check_ip(values['ip']) and check_ip(values['mask']) and check_ip(values['gateway']): - conf.setDynamicIPv4(values['ip'], values['mask'], values['gateway']) - if values['dns'] != "" and values['dnsAlt'] != "": - conf.setDnsIPv4(values['dns'], values['dnsAlt']) - if values['networkTimeout'] != "": - if int(values['networkTimeout']) >= 0: - conf.setNetworkTimeout(timedelta(seconds=int(values['networkTimeout']) / 1000)) - else: - sg.Popup("Values can not be negative!") - if values['mac'] != "": - if check_mac(values['mac']): - conf.setMacAddress(values['mac']) - else: - if values['usbTimeout'] != "": - if int(values['usbTimeout']) >= 0: - conf.setUsbTimeout(timedelta(seconds=int(values['usbTimeout']) / 1000)) - else: - sg.Popup("Values can not be negative!") - if values['usbSpeed'] != "": - conf.setUsbMaxSpeed(getattr(dai.UsbSpeed, values['usbSpeed'])) - - success, error = bl.flashConfig(conf) - if not success: - sg.Popup(f"Flashing failed: {error}") - else: - sg.Popup("Flashing successful.") - except Exception as ex: - PrintException() - sg.Popup(f'{ex}') - -def factoryReset(device: dai.DeviceInfo): +def factoryReset(bl: dai.DeviceBootloader): sel = SelectBootloader(['USB', 'NETWORK'], "Select bootloader type used for factory reset.") ok, type = sel.wait() if not ok: @@ -334,7 +190,6 @@ def factoryReset(device: dai.DeviceInfo): # Clear 1 MiB for USB BL and 8 MiB for NETWORK BL mib = 1 if type == dai.DeviceBootloader.Type.USB else 8 blBinary = blBinary + ([0xFF] * ((mib * 1024 * 1024 + 512) - len(blBinary))) - bl = dai.DeviceBootloader(device, True) tmpBlFw = tempfile.NamedTemporaryFile(delete=False) tmpBlFw.write(bytes(blBinary)) @@ -348,19 +203,8 @@ def factoryReset(device: dai.DeviceInfo): PrintException() sg.Popup(f'{ex}') -def getDeviceType(bl: dai.DeviceBootloader) -> str: +def flashFromFile(file, bl: dai.DeviceBootloader): try: - if bl.getType() == dai.DeviceBootloader.Type.NETWORK: - return "POE" - else: - return "USB" - except Exception as ex: - PrintException() - sg.Popup(f'{ex}') - -def flashFromFile(file, device: dai.DeviceInfo): - try: - bl = dai.DeviceBootloader(device, True) if str(file)[-3:] == "dap": bl.flashDepthaiApplicationPackage(file) else: @@ -369,9 +213,8 @@ def flashFromFile(file, device: dai.DeviceInfo): PrintException() sg.Popup(f'{ex}') -def flashFromUsb(device: dai.DeviceInfo): +def recoveryMode(bl: dai.DeviceBootloader): try: - bl = dai.DeviceBootloader(device, True) bl.bootUsbRomBootloader() except Exception as ex: PrintException() @@ -517,93 +360,280 @@ def deviceStateTxt(state: dai.XLinkDeviceState) -> str: ] ] -devType = "" -bl = None - -window = sg.Window(title="Device Manager", - layout=layout, - size=(645, 380), - finalize=True # So we can do First search for devices - ) - -# First device search -getDevices(window) - -while True: - event, values = window.read() - if event == sg.WIN_CLOSED: - break - dev = values['devices'] - - if event == "devices": - if dev != "Select device": - # "allow flashing bootloader" boots latest bootloader first - # which makes the information of current bootloader, etc.. not correct (can be checked by "isEmbeddedVersion") - # So leave it to false, uncomment the isEmbeddedVersion below and only boot into latest bootlaoder upon the request to flash new bootloader - # bl = dai.DeviceBootloader(devices[values['devices']], False) - device = devices[values['devices']] - if deviceStateTxt(device.state) == "BOOTED": - # device is already booted somewhere else - sg.Popup("Device is already booted somewhere else!") + +class DeviceManager: + devices: Dict[str, dai.DeviceInfo] = dict() + values: Dict = None + + def __init__(self) -> None: + self.window = sg.Window(title="Device Manager", + icon="assets/icon.png", + layout=layout, + size=(645, 380), + finalize=True # So we can do First search for devices + ) + + self.bl: dai.DeviceBootloader = None + + # First device search + self.getDevices() + + def isPoE(self) -> bool: + try: + return self.bl.getType() == dai.DeviceBootloader.Type.NETWORK + except Exception as ex: + PrintException() + sg.Popup(f'{ex}') + + def isUsb(self) -> bool: + return not self.isPoE() + + def run(self) -> None: + while True: + event, self.values = self.window.read() + if event == sg.WIN_CLOSED: + break + dev = self.values['devices'] + + if event == "devices": + if dev != "Select device": + # "allow flashing bootloader" boots latest bootloader first + # which makes the information of current bootloader, etc.. not correct (can be checked by "isEmbeddedVersion") + # So leave it to false, uncomment the isEmbeddedVersion below and only boot into latest bootlaoder upon the request to flash new bootloader + # bl = dai.DeviceBootloader(devices[values['devices']], False) + device = self.device + if deviceStateTxt(device.state) == "BOOTED": + # device is already booted somewhere else + sg.Popup("Device is already booted somewhere else!") + else: + self.resetGui() + self.bl = connectToDevice(device) + if self.bl is None: continue + self.getConfigs() + self.unlockConfig() + else: + self.window.Element('progress').update("No device selected.") + elif event == "Search": + self.getDevices() # Re-search devices for dropdown + selDev = SearchDevice() + di = selDev.wait() + if di is not None: + self.resetGui() + self.addDeviceInfo(di) # Add device info to devices, if it's not there yet + self.window.Element('devices').update(di.getMxId()) + _, self.values = self.window.read(1) + self.bl = connectToDevice(di) + print('after connecting to device,', self.bl) + if self.bl is None: continue + self.getConfigs() + self.unlockConfig() + elif event == "Specify IP": + select = SelectIP() + ok, ip = select.wait() + if ok: + self.resetGui() + di = dai.DeviceInfo(ip) + di.state = dai.XLinkDeviceState.X_LINK_BOOTLOADER + di.protocol = dai.XLinkProtocol.X_LINK_TCP_IP + self.devices[ip] = di # Add to devices dict + self.window.Element('devices').update(ip) # Show to user + _, self.values = self.window.read(1) + self.bl = connectToDevice(di) + if self.bl is None: continue + self.getConfigs() + self.unlockConfig() + elif event == "Flash newest Bootloader": + self.closeDevice() # We will reconnect, as we need to set allowFlashingBootloader to True + flashBootloader(self.device) + self.window.Element('currBoot').update(self.bl.getVersion()) + elif event == "Flash configuration": + self.flashConfig() + self.getConfigs() + self.resetGui() + if self.isUsb(): + self.unlockConfig() + else: + self.devices.clear() + self.window.Element('devices').update("Search for devices", values=[]) + elif event == "Factory reset": + factoryReset(self.bl) + elif event == "Flash DAP": + file = sg.popup_get_file("Select .dap file", file_types=(('DepthAI Application Package', '*.dap'), ('All Files', '*.* *'))) + flashFromFile(file, self.bl) + elif event == "configReal": + self.window['-COL1-'].update(visible=False) + self.window['-COL2-'].update(visible=True) + elif event == "aboutReal": + self.window['-COL2-'].update(visible=False) + self.window['-COL1-'].update(visible=True) + elif event == "recoveryMode": + recoveryMode(self.bl) + self.window.close() + + @property + def device(self) -> dai.DeviceInfo: + """ + Get selected device + """ + return self.devices[self.values['devices']] + + def addDeviceInfo(self, deviceInfo: dai.DeviceInfo): + if deviceInfo.getMxId() not in self.devices: + # Add the new deviceInfo + self.devices[deviceInfo.getMxId()] = deviceInfo + + def getConfigs(self): + device = self.device + try: + conf = self.bl.readConfig() + except: + conf = dai.DeviceBootloader.Config() + + try: + if self.isPoE(): + self.window.Element('ip').update(conf.getIPv4()) + self.window.Element('mask').update(conf.getIPv4Mask()) + self.window.Element('gateway').update(conf.getIPv4Gateway()) + self.window.Element('dns').update(conf.getDnsIPv4()) + self.window.Element('dnsAlt').update(conf.getDnsAltIPv4()) + self.window.Element('networkTimeout').update(int(conf.getNetworkTimeout().total_seconds() * 1000)) + self.window.Element('mac').update(conf.getMacAddress()) + for el in CONF_INPUT_USB: + self.window[el].update("") else: - bl = connectToDevice(device) - devType = getDeviceType(bl) - getConfigs(window, bl, devType, device) - unlockConfig(window, devType) - else: - window.Element('progress').update("No device selected.") - elif event == "Search": - getDevices(window) # Re-search devices for dropdown - lockConfig(window) - selDev = SearchDevice() - di = selDev.wait() - if di is not None: - window.Element('devices').update(di.getMxId()) - bl = connectToDevice(di) - devType = getDeviceType(bl) - getConfigs(window, bl, devType, di) - unlockConfig(window, devType) - elif event == "Specify IP": - select = SelectIP() - ok, ip = select.wait() - if ok: - di = dai.DeviceInfo(ip) - di.state = dai.XLinkDeviceState.X_LINK_BOOTLOADER - di.protocol = dai.XLinkProtocol.X_LINK_TCP_IP - devices[ip] = di # Add to devices dict - window.Element('devices').update(ip) # Show to user - bl = connectToDevice(di) - devType = getDeviceType(bl) - getConfigs(window, bl, devType, di) - unlockConfig(window, devType) - elif event == "Flash newest Bootloader": - bl.close() - flashBootloader(window, devices[values['devices']]) - elif event == "Flash configuration": - bl.close() - flashConfig(values, devices[values['devices']], devType, values['staticBut']) - bl = connectToDevice(devices[values['devices']]) - getConfigs(window, bl, devType, devices[values['devices']]) - lockConfig(window) - if devType != "POE": - unlockConfig(window, devType) + for el in CONF_INPUT_POE: + self.window[el].update("") + self.window.Element('usbTimeout').update(int(conf.getUsbTimeout().total_seconds() * 1000)) + self.window.Element('usbSpeed').update(str(conf.getUsbMaxSpeed()).split('.')[1]) + + self.window.Element('devName').update(device.name) + self.window.Element('devNameConf').update(device.getMxId()) + self.window.Element('newBoot').update(dai.DeviceBootloader.getEmbeddedBootloaderVersion()) + + # The "isEmbeddedVersion" tells you whether BL had to be booted, + # or we connected to an already flashed Bootloader. + if self.bl.isEmbeddedVersion(): + self.window.Element('currBoot').update('Not Flashed') + else: + self.window.Element('currBoot').update(self.bl.getVersion()) + + self.window.Element('version').update(dai.__version__) + self.window.Element('commit').update(dai.__commit__) + self.window.Element('devState').update(deviceStateTxt(self.device.state)) + except Exception as ex: + PrintException() + sg.Popup(f'{ex}') + + def unlockConfig(self): + if self.bl is None: return + + if self.isPoE(): + for el in CONF_INPUT_POE: + self.window[el].update(disabled=False) + for el in CONF_TEXT_POE: + self.window[el].update(text_color="black") else: - devices.clear() - window.Element('devices').update("Search for devices", values=[]) - elif event == "Factory reset": - bl.close() - factoryReset(devices[values['devices']]) - elif event == "Flash DAP": - file = sg.popup_get_file("Select .dap file", file_types=(('DepthAI Application Package', '*.dap'), ('All Files', '*.* *'))) - bl = None - flashFromFile(file, devices[values['devices']]) - elif event == "configReal": - window['-COL1-'].update(visible=False) - window['-COL2-'].update(visible=True) - elif event == "aboutReal": - window['-COL2-'].update(visible=False) - window['-COL1-'].update(visible=True) - elif event == "recoveryMode": - bl = None - flashFromUsb(devices[values['devices']]) -window.close() + for el in CONF_INPUT_USB: + self.window[el].update(disabled=False) + for el in CONF_TEXT_USB: + self.window[el].update(text_color="black") + + self.window['Flash newest Bootloader'].update(disabled=False) + self.window['Flash configuration'].update(disabled=False) + self.window['Factory reset'].update(disabled=False) + # self.window['Clear flash'].update(disabled=False) + self.window['Flash DAP'].update(disabled=False) + self.window['recoveryMode'].update(disabled=False) + + def resetGui(self): + """ + Reset GUI to its original state + """ + for conf in [CONF_INPUT_POE, CONF_INPUT_USB]: + for el in conf: + self.window[el].update(disabled=True) + self.window[el].update("") + for conf in [CONF_TEXT_POE, CONF_TEXT_USB]: + for el in conf: + self.window[el].update(text_color="gray") + + self.window['Flash newest Bootloader'].update(disabled=True) + self.window['Flash configuration'].update(disabled=True) + self.window['Factory reset'].update(disabled=True) + self.window['Clear flash'].update(disabled=True) + self.window['Flash DAP'].update(disabled=True) + self.window['recoveryMode'].update(disabled=True) + + self.window.Element('devName').update("-name-") + self.window.Element('devNameConf').update("") + self.window.Element('newBoot').update("-version-") + self.window.Element('currBoot').update("-version-") + self.window.Element('version').update("-version-") + self.window.Element('commit').update("-version-") + self.window.Element('devState').update("-state-") + + def closeDevice(self): + if self.bl is not None: + self.bl.close() + self.bl = None + + def getDevices(self): + self.closeDevice() # If we are searching for new devices, first disconnect from the current one + try: + listedDevices = [] + self.devices.clear() + deviceInfos = dai.XLinkConnection.getAllConnectedDevices() + if not deviceInfos: + self.window.Element('devices').update("No devices") + sg.Popup("No devices found.") + else: + for deviceInfo in deviceInfos: + deviceTxt = deviceInfo.getMxId() + listedDevices.append(deviceTxt) + self.devices[deviceTxt] = deviceInfo + self.window.Element('devices').update("Select device", values=listedDevices) + except Exception as ex: + PrintException() + sg.Popup(f'{ex}') + + def flashConfig(self): + values = self.values + try: + conf = dai.DeviceBootloader.Config() + if self.isPoE: + if self.values['staticBut']: + if check_ip(values['ip']) and check_ip(values['mask']) and check_ip(values['gateway']): + conf.setStaticIPv4(values['ip'], values['mask'], values['gateway']) + else: + if check_ip(values['ip']) and check_ip(values['mask']) and check_ip(values['gateway']): + conf.setDynamicIPv4(values['ip'], values['mask'], values['gateway']) + if values['dns'] != "" and values['dnsAlt'] != "": + conf.setDnsIPv4(values['dns'], values['dnsAlt']) + if values['networkTimeout'] != "": + if int(values['networkTimeout']) >= 0: + conf.setNetworkTimeout(timedelta(seconds=int(values['networkTimeout']) / 1000)) + else: + sg.Popup("Values can not be negative!") + if values['mac'] != "": + if check_mac(values['mac']): + conf.setMacAddress(values['mac']) + else: + if values['usbTimeout'] != "": + if int(values['usbTimeout']) >= 0: + conf.setUsbTimeout(timedelta(seconds=int(values['usbTimeout']) / 1000)) + else: + sg.Popup("Values can not be negative!") + if values['usbSpeed'] != "": + conf.setUsbMaxSpeed(getattr(dai.UsbSpeed, values['usbSpeed'])) + + success, error = self.bl.flashConfig(conf) + if not success: + sg.Popup(f"Flashing failed: {error}") + else: + sg.Popup("Flashing successful.") + except Exception as ex: + PrintException() + sg.Popup(f'{ex}') + +app = DeviceManager() +app.run() diff --git a/utilities/requirements.txt b/utilities/requirements.txt index ef2dc7bf6..cbcfc70f7 100644 --- a/utilities/requirements.txt +++ b/utilities/requirements.txt @@ -1 +1 @@ -PySimpleGUI==4.57.0 \ No newline at end of file +PySimpleGUI==4.60.3 \ No newline at end of file