In [2]:
from typing import Any
import usbtmc
import numpy as np
import time

class Oscilloscope:
    def __init__(self, vendor_id, product_id, timeout=2):
        """
        Initialize the oscilloscope instrument.

        :param vendor_id: The vendor ID of the oscilloscope.
        :param product_id: The product ID of the oscilloscope.
        :param timeout: Communication timeout in seconds.
        """
        self.state=False
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        pass
        # super().__exit__(exc_type, exc_value, traceback)
    
    def __getattr__(self, name: str) -> Any:
        # specific procedures to do before returning variable
        return name
    

    def fetch_x_data(self):
        """
        Fetch X-axis data (time data) from the oscilloscope.

        :return: Numpy array of X-axis data (time points).
        """
        return 'x_data'

    def fetch_y_data(self, channel=1):
        """
        Fetch Y-axis data (voltage data) from the oscilloscope for a specified channel.

        :param channel: The channel number to fetch data from (default is 1).
        :return: Numpy array of Y-axis data (voltage points).
        """
        return 'y_data'

    def triggered(self):
        return self.state

# Example usage:
# osc = Oscilloscope(vendor_id=0x1234, product_id=0x5678)
# x_data = osc.fetch_x_data()
# y_data = osc.fetch_y_data(channel=1)
# osc.close()


In [36]:
from multiprocess import Queue, Process, Event


class OscilloscopeProcessManager(Process):
    def __init__(self, vendor_id=0x0957, product_id=0x900d) -> None:
        super().__init__()
        self.task_queue = Queue()
        self.data_queue = Queue()

        # TODO: Create a pipe for __getattr__ method passthrough
        self.parent_connection, self.child_connection 

        self.vendor_id  = vendor_id
        self.product_id = product_id

        self.pause_event = Event()
        self.pause_event.set()
        self.stop_event  = Event()

        self.oscilloscope=None
    
    def __getattr__(self, name: str) -> Any:
        return getattr(self.oscilloscope,name)
    
    def get_state(self):
        self.task_queue.put(lambda:self.oscilloscope.state)

    def run(self):
        self.oscilloscope=Oscilloscope(self.vendor_id, self.product_id)
        while not self.stop_event.is_set():
            if not self.task_queue.empty():
                task = self.task_queue.get()
                task()
            else:
                if not self.pause_event.is_set():
                    if self.oscilloscope.triggered():
                        y=self.oscilloscope.fetch_y_data()
                        self.data_queue.put(y)
                        del y
                else:
                    self.task_queue.put(lambda:time.sleep(1.5))
    def pause(self):
        self.pause_event.set()

    def stop(self):
        self.stop_event.set()

In [37]:
p=OscilloscopeProcessManager()
p.start()

None
None
None
None
None
None
None
None


In [42]:
type(p.get_state())

NoneType

In [43]:
p.stop()

In [12]:
type(lambda:time.sleep(10))

function