In [1]:
# [1] Start Sources
# [2] Start Node Manager
# [3] Start Cluster Manager

# This API is described at https://git.psi.ch/sf_daq/ch.psi.daq.dispatcherrest

import requests
import json

In [2]:
# Base URL of the dispatcher REST API; every request below is built from it.
# For a dispatcher listening on this machine, use this instead:
# base_url = 'http://localhost:8080'
base_url = 'http://dispatcher-api/sf'

In [3]:
# Configure a source
# POST the stream address to /sources.
config = {"sources": [{"stream": "tcp://localhost:9999"}]}
# `json=` makes requests serialise the payload and set the JSON content type,
# replacing the manual json.dumps(...) + hand-written header pair.
response = requests.post(base_url + '/sources', json=config)
print(response)
print(response.text)

<Response [200]>



In [4]:
# Quick success check on the previous request (True in the recorded output for the 200 reply)
response.ok

True

In [6]:
# Get currently configured sources
response = requests.get(base_url + '/sources')
# Status line first, then the raw response body on the next line
print(response, response.text, sep='\n')

<Response [200]>
[{"stream":"tcp://SINDI01-CVME-LLRF1:20000"},{"stream":"tcp://SINDI01-CVME-ILK:9999"},{"stream":"tcp://S30-CVME-DBPM339:9999"},{"stream":"tcp://S10CB01-CVME-ILK:9999"},{"stream":"tcp://SARUN-CVME-DBPM541:9999"},{"stream":"tcp://SIN-CVME-LAS0041:9999"},{"stream":"tcp://SIN-CVME-LAS0042:9999"},{"stream":"tcp://SINXB01-CVME-ILK:9999"},{"stream":"tcp://SINSB04-CVME-ILK:9999"},{"stream":"tcp://SINSB04-CVME-LLRF1:20000"},{"stream":"tcp://SARMA01-CVME-DICT482:9999"},{"stream":"tcp://SARBD01-CVME-DICT599:9999"},{"stream":"tcp://SINEG01-CVME-LLRF2:20000"},{"stream":"tcp://SINEG01-CVME-LLRF1:20000"},{"stream":"tcp://S10CB01-CVME-LLRF2:20000"},{"stream":"tcp://SINSB01-CVME-LLRF2:20000"},{"stream":"tcp://SINXB01-CVME-LLRF2:20000"},{"stream":"tcp://SINDI01-CVME-LLRF2:20000"},{"stream":"tcp://SINSB01-CVME-LLRF1:20000"},{"stream":"tcp://SINEG01-CVME-ILK:9999"},{"stream":"tcp://SINXB01-CVME-LLRF1:20000"},{"stream":"tcp://SINSB02-CVME-LLRF1:20000"},{"stream":"tcp://SINSB04-CVME-LLRF2:2

In [9]:
# Get the channels that are currently coming in (live channels)
response = requests.get(base_url+'/channels/live')

In [28]:
# Flatten the per-backend channel lists of the reply into a single list
channel_list = [
    channel
    for backend in response.json()
    for channel in backend['channels']
]
channel_list

[{'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:Q1',
  'offset': 0,
  'shape': [1],
  'source': 'tcp://S10-CVME-DBPM143:9999',
  'type': 'float64'},
 {'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:Q1-VALID',
  'offset': 0,
  'shape': [1],
  'source': 'tcp://S10-CVME-DBPM143:9999',
  'type': 'uint16'},
 {'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:X1',
  'offset': 0,
  'shape': [1],
  'source': 'tcp://S10-CVME-DBPM143:9999',
  'type': 'float64'},
 {'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:X1-VALID',
  'offset': 0,
  'shape': [1],
  'source': 'tcp://S10-CVME-DBPM143:9999',
  'type': 'uint16'},
 {'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:Y1',
  'offset': 0,
  'shape': [1],
  'source': 'tcp://S10-CVME-DBPM143:9999',
  'type': 'float64'},
 {'backend': 'sf-databuffer',
  'modulo': 1,
  'name': 'S10BC01-DBPM010:Y1-VALID',
  'offset': 0,
  'shape': [1],
  'source':

In [6]:
# Delete source
# Send a DELETE to /sources with the stream address to remove.
config = {"sources": [{"stream": "tcp://localhost:9999"}]}
# `json=` makes requests serialise the payload and set the JSON content type,
# replacing the manual json.dumps(...) + hand-written header pair.
response = requests.delete(base_url + '/sources', json=config)
print(response)
print(response.text)

<Response [200]>



In [24]:
# Request stream
# POST the wanted channels plus stream options to /stream; the recorded reply
# carries the address of the new stream under the "stream" key.
config = {
    "channels": [{"name": "Int16Waveform"}, {"name": "UInt16Waveform"}],
    "streamType": "push_pull",
    "compression": "none",
}
# `json=` makes requests serialise the payload and set the JSON content type,
# replacing the manual json.dumps(...) + hand-written header pair.
response = requests.post(base_url + '/stream', json=config)
print(response)
print(response.text)

<Response [200]>
{"stream":"tcp://pineapple.psi.ch:51923","configuration":{"compression":"none","streamType":"push_pull","channels":[{"name":"Int16Waveform","backend":"sf-databuffer"},{"name":"UInt16Waveform","backend":"sf-databuffer"}]}}


In [30]:
# Build a stream-request payload and show the JSON text json.dumps produces for it
config = {
    "channels": [
        {"name": "Int16Waveform"},
        {"name": "UInt16Waveform"},
    ],
    "streamType": "push_pull",
    "compression": "none",
}
json.dumps(config)

'{"streamType": "push_pull", "channels": [{"name": "Int16Waveform"}, {"name": "UInt16Waveform"}], "compression": "none"}'

In [7]:
# Get streams currently requested
response = requests.get(base_url + '/streams')
# Status line first, then the raw response body on the next line
print(response, response.text, sep='\n')

<Response [200]>
[]


In [4]:
# Delete stream
# The request body is the stream address itself, sent as a JSON string.
config = "tcp://pineapple.psi.ch:53645"
# `json=` serialises the bare string to a quoted JSON string (the same text
# json.dumps(config) produced) and sets the JSON content type.
response = requests.delete(base_url + '/stream', json=config)

# Alternative kept for reference: send the raw address as plain text instead.
# config = "tcp://pineapple.psi.ch:50727"
# headers = {'content-type': 'text/plain'}
# response = requests.delete(base_url+'/stream', data=config, headers=headers)
print(response)
print(response.text)

<Response [200]>



In [103]:
# Receiver Stream
import bsread
import zmq
import sys

def receive(source, n_messages=10):
    """Connect to a bsread stream and print the messages received from it.

    Parameters
    ----------
    source : str
        Address of the stream to connect to, e.g. ``'tcp://host:port'``.
    n_messages : int, optional
        How many messages to receive before returning. Defaults to 10, the
        value that used to be hard-coded.

    Returns
    -------
    None
        Every message is printed to stdout; nothing is returned.
    """
    receiver = bsread.Bsread(mode=zmq.PULL)
    receiver.connect(address=source, conn_type="connect")

    for _ in range(n_messages):
        message_data = receiver.receive()
        # On pulse ids divisible by 10, write the ANSI "clear screen" and
        # "cursor home" escape sequences to stderr.
        if message_data['header']['pulse_id'] % 10 == 0:
            sys.stderr.write("\x1b[2J\x1b[H")
        # Print the data header only when the message includes one.
        if "data_header" in message_data:
            print("Data Header: ", message_data['data_header'])
        print(message_data['data'],  message_data['timestamp'], message_data['header'])

ImportError: No module named 'bsread'

In [96]:
# NOTE: `response` must hold the reply to the "Request stream" POST; its JSON
# body carries the address to connect to under the 'stream' key. Re-run that
# cell first if `response` has since been overwritten by another request.
source_ = response.json()['stream']  # e.g. 'tcp://gfa-lc6-64:9999'
receive(source_)

NameError: name 'receive' is not defined

In [19]:
import bsread.dispatcher

ImportError: No module named 'bsread'