Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is it possible to transfer raw bytes within node? #75

Open
funshine opened this issue Jan 16, 2018 · 3 comments
Open

Is it possible to transfer raw bytes within node? #75

funshine opened this issue Jan 16, 2018 · 3 comments

Comments

@funshine
Copy link

First of all, Thank you for the great project.
I like the way the data is processed.

I have a device(high speed adc) produce mass data every second, almost 50MBytes/s.

when I use transfermode='plaindata', the data flow can only up to 5MBytes/s.

only one node get data from udp socket and another one write data to file(RawRecorder).
I look into the code, find out that in file 'plaindatastream.py' send function(line 33):

        # serialize
        dtype = data.dtype
        shape = data.shape
        buf, offset, strides = decompose_array(data)    # this line consume a lot time
        
        # compress
        comp = self.params['compression']
        buf = compress(buf, comp, data.itemsize)

is time consuming.

Is there a way I can transfer raw bytes other than numpy.ndarray,
then I can write raw data to disk very quickly.

@samuelgarcia
Copy link
Contributor

Thanks for teh feed back.

Yes pyacq is supposed to send raw data with no copy/transformation.
50Mbytes is a lot for playing with sockets but it should able to deal with it.

We design a data transfer that should be flexible. Able to send data without transformtion or with re-stride. decompose_array is calling_normalized_array that play with strides. Normally this should imply data copy so it should be fast.

Could you send us params you put on the output of the device node ?
What do you have for:

  • transfermode='plaindata' or 'sharedmem'
    *axisorder
  • shape
  • compression

Are you on the same machine or do you send data througth network ?

I will try to reproduce such a big data flow to test the behavior but I won't be able to do it befor february.

@funshine
Copy link
Author

Thank you for the quick reply

currently my output dev is like this

    dev = UdpFpgaAdcDac()
    dev.configure(ip='127.0.0.1', port=8080, cmd=cmd, modeADC=False, sample_interval_adc = 0.000002)
    # dev.outputs['signals'].configure(protocol='tcp', interface='127.0.0.1', transfermode='sharedmem', buffer_size=acqSize*1024*1024)
    dev.outputs['signals'].configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
    dev.initialize()

I also tried sharedmem mode.

to help to reproduce, some code here
the fake device get cmd from host, configure something,
like produce random number to mimic ADC, or Increaced Number for test.
when ACQTRIG cmd is received, the fake device send back 1 or more pack back

this is the fake device:

import socket
import time
import numpy as np

cmdType = {
    "ACQTRIG": b'\x5A\xDD\x04\x06',
    "INC": b'\x5A\xEE\x40\x01',
    "ADC": b'\x5A\xFF\x40\x01'
}

UDP_IP = "127.0.0.1"
UDP_PORT = 8080
MESSAGE = "Hello, World!"

print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:", MESSAGE)

packageCount = 0
packageSize = 712
packageTotal = 0

dataNum = 0
dataCount = 0
started = False
adc = True
start = 0

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP

sock.bind((UDP_IP, UDP_PORT + 1))
while True:
    msg, addr = sock.recvfrom(2048)
    # print("received:", msg, "from", addr)
    if not msg:
        print("client has exist")
        break
    if msg[0:2] == cmdType["ADC"][0:2]:
        # ADC
        print("ADC Started")
        adc = True
        packageCount = 0
        packageTotal = 0
        dataCount = 0
        started = True
        dataNum = int.from_bytes(msg[2:3], byteorder='big') * 2 * 1024 * 1024
    elif msg[0:2] == cmdType["INC"][0:2]:
        # INC
        print("INC Started")
        adc = False
        start = 0
        packageCount = 0
        packageTotal = 0
        dataCount = 0
        started = True
        dataNum = int.from_bytes(msg[2:3], byteorder='big') * 2 * 1024 * 1024
    elif msg[0:2] == cmdType["ACQTRIG"][0:2]:
        if not started:
            time.sleep(0.001)
            continue
        packageCount = 0
        packageSize = int.from_bytes(msg[2:3], byteorder='big') * 128
        packageFrame = int.from_bytes(msg[3:4], byteorder='big')
        if packageFrame == 0:
            packageFrame = dataNum / packageSize
        while packageCount < packageFrame:
            if adc:
                pack = np.random.randint(-10, 10, size=packageSize, dtype=np.int16)
                pack = pack.byteswap()
            else:
                pack = np.arange(start, start+packageSize, dtype=np.dtype('>i2'))
                start += packageSize
                if start >= 65536:
                    start -= 65536
            sock.sendto(pack.tobytes(), (UDP_IP, UDP_PORT))
            packageCount += 1
            packageTotal += 1
            dataCount += packageSize
            if dataCount > dataNum:
                if started:
                    started = False
                    print("Stopped, send {0} packages".format(packageTotal))
                    packageTotal = 0
                break
            time.sleep(0.000001)
    else:
        pass

sock.close()

and the device node:

import numpy as np

from pyacq.core import Node, register_node_type

import ipaddress
from PyQt5.QtCore import QTimer, QThread, QMutex, QMutexLocker, pyqtSignal
from PyQt5.QtNetwork import QUdpSocket, QHostAddress, QAbstractSocket

cmdType = {
    "ACQTRIG": b'\x5A\xDD\x04\x06',
    "INC": b'\x5A\xEE\x40\x01',
    "ADC": b'\x5A\xFF\x40\x01'
}

class UdpThread(QThread):
    def __init__(self, out_stream, ip='127.0.0.1', port=8080, cmd=cmdType, timeout=2000, adc=True, parent=None):
        QThread.__init__(self, parent=parent)
        self.lock = QMutex()
        self.running = False
        self.count = 0
        self.out_stream = out_stream
        self.ip = ip
        self.port = port
        self.loopback = self.ip == '127.0.0.1'
        self.cmd = cmd
        self.timeout = timeout
        self.timeoutCount = 0
        self.modeADC = adc
        
        self.udpSocket = QUdpSocket(self)
        self.udpTimer = QTimer()
        
        self.udpTimer.timeout.connect(self.udpTimerEvent)
        self.udpTimer.setSingleShot(True)
        self.udpSocket.readyRead.connect(self.dataReceive)

    def trig(self):
        self.udpSocket.bind(self.port)
        # send ADC/INC cmd
        if self.modeADC:
            self.udpSocket.writeDatagram(self.cmd["ADC"] * 2, QHostAddress(
                self.ip), self.port + (1 if self.loopback else 0))
        else:
            self.udpSocket.writeDatagram(self.cmd["INC"] * 2, QHostAddress(
                self.ip), self.port + (1 if self.loopback else 0))

        self.msleep(10)
        # start ACQ cmd
        self.newTransfer()

    def run(self):
        with QMutexLocker(self.lock):
            self.running = True
        while True:
            with QMutexLocker(self.lock):
                if not self.running:
                    break
            if self.udpSocket is not None and self.udpSocket.state() == QAbstractSocket.ConnectedState:
                self.udpSocket.waitForReadyRead()
            else:
                self.msleep(100)

    def stop(self):
        if self.udpSocket is not None:
            self.udpSocket.close()
        self.udpTimer.stop()
        print("Total points: {0}".format(self.count))
        with QMutexLocker(self.lock):
            self.running = False

    def dataReceive(self):
        while self.udpSocket.hasPendingDatagrams():
            msg, host, port = self.udpSocket.readDatagram(2048)
            if msg:
                self.udpTimer.stop()
                self.timeoutCount = 0
                points = int(len(msg)/2)
                self.count += points
                sigs = np.frombuffer(msg, dtype=np.int16)
                sigs = sigs.reshape(points, 1)
                self.out_stream.send(sigs, index=self.count)
                self.udpTimer.start(self.timeout)
        # fire another run
        self.udpTimer.stop()
        self.newTransfer()

    def newTransfer(self):
        self.udpSocket.writeDatagram(
            self.cmd["ACQTRIG"] * 2, QHostAddress(self.ip), self.port + (1 if self.loopback else 0))
        self.udpTimer.start(self.timeout)

    def udpTimerEvent(self):
        self.timeoutCount += 1
        if self.timeoutCount > 5:
            self.timeoutCount = 0
            self.stop()
            return
        print("Timeout {0}".format(self.timeoutCount))
        self.newTransfer()


class UdpFpgaAdcDac(Node):
    """
    FPGA device with ADC and DAC, comunicate via a ethernet port using the UDP.
    """

    # _input_specs = {'signals': dict(streamtype='analogsignal',dtype='int16',
    #                                             shape=(-1, 16), compression ='', timeaxis=0,
    #                                             sample_rate =100000000.
    #                                             )}

    _output_specs = {'signals': dict(streamtype='analogsignal',dtype='int16',
                                                shape=(-1, 16), compression ='', timeaxis=0,
                                                sample_rate =50000000.
                                                )}
    def __init__(self, **kargs):
        Node.__init__(self, **kargs)
    

    def _configure(self, ip='127.0.0.1', port=8080, cmd=cmdType, timeout=2000, modeADC=True, nb_channel_adc=1, sample_interval_adc=2e-8, chunksize=256):
        self.ip = ip
        self.port = port
        self.cmd = cmd.copy()
        self.timeout = timeout
        self.modeADC = modeADC
        self.nb_channel_adc = nb_channel_adc
        self.sample_interval_adc = sample_interval_adc
        self.chunksize = chunksize
        
        self.output.spec['shape'] = (-1, nb_channel_adc)
        self.output.spec['sample_rate'] = 1./sample_interval_adc
        
        self.output.spec['dtype'] = 'int16'
    
    def _initialize(self):
        pass
        
    def _start(self):
        self._thread = UdpThread(self.output, self.ip, self.port, self.cmd, self.timeout, self.modeADC)
        self._thread.start()
        self._thread.trig()

    def _stop(self):
        self._thread.stop()
        self._thread.wait()
    
    def _close(self):
        pass

register_node_type(UdpFpgaAdcDac)

and the test:

from pyacq import create_manager
from pyacq.viewers import QOscilloscope
from pyacq.rec import RawRecorder
from devices import UdpFpgaAdcDac
import numpy as np

from PyQt5 import QtCore, QtGui

import os
import shutil
import datetime

cmdType = {
    "ACQTRIG": b'\x5A\xDD\x04\x06',
    "INC": b'\x5A\xEE\x40\x01',
    "ADC": b'\x5A\xFF\x40\x01'
}

def test_DataRecorder():
    # man = create_manager('local', auto_close_at_exit=True)
    # ng = man.create_nodegroup()
    # in main App
    app = QtGui.QApplication([])

    cmd = cmdType.copy()
    acqSize = 128
    packageInterval = 7
    packageSize = 4
    packageNum = 8

    cmd["ADC"] = cmd["ADC"][0:2] + (acqSize).to_bytes(
        1, byteorder='big') + (packageInterval).to_bytes(1, byteorder='big')
    cmd["INC"] = cmd["INC"][0:2] + (acqSize).to_bytes(
        1, byteorder='big') + (packageInterval).to_bytes(1, byteorder='big')
    cmd["ACQTRIG"] = cmd["ACQTRIG"][0:2] + (packageSize).to_bytes(
        1, byteorder='big') + (packageNum).to_bytes(1, byteorder='big')
    
    dev = UdpFpgaAdcDac()
    dev.configure(ip='127.0.0.1', port=8080, cmd=cmd, modeADC=False, sample_interval_adc = 0.000002)
    # dev.outputs['signals'].configure(protocol='tcp', interface='127.0.0.1', transfermode='sharedmem', buffer_size=acqSize*1024*1024)
    dev.outputs['signals'].configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
    dev.initialize()
    
    viewer = QOscilloscope()
    # viewer = ng.create_node("QOscilloscope", name="QViewer")
    viewer.configure(with_user_dialog = True)

    viewer.configure()
    viewer.input.connect(dev.output)
    viewer.initialize()
    viewer.show()
    viewer.params['decimation_method'] = 'min_max'

    dirname = './test_rec'
    if os.path.exists(dirname):
        shutil.rmtree(dirname)
    
    rec = RawRecorder()
    # rec = ng1.create_node('RawRecorder')
    rec.configure(streams=[dev.output], autoconnect=True, dirname=dirname)
    rec.initialize()
    
    # rec.add_annotations(yep='abc', yop=12.5, yip=1)

    def terminate():
        dev.stop()
        dev.close()
        viewer.stop()
        viewer.close()
        rec.stop()
        rec.close()
        app.quit()
    

    rec.start()
    viewer.start()
    dev.start()
    
    # start for a while
    # timer = QtCore.QTimer(singleShot=True, interval=60000)
    # timer.timeout.connect(terminate)
    # timer.start()
    
    app.exec_()

    # man.close()


if __name__ == '__main__':
    test_DataRecorder()

@funshine
Copy link
Author

by the way, the RawRecorder is from https://github.com/samuelgarcia/pyacq/tree/exp_intra
which is not merged to here right now, thank you @samuelgarcia

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants