Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,4 @@ output_*.json
*.jsonl
.scienv
synapse_data*

.synapse_deploy_cache.json
3 changes: 2 additions & 1 deletion synapse/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

from importlib import metadata
from synapse.cli import discover, rpc, streaming, offline_plot, files, deploy
from synapse.cli import discover, rpc, streaming, offline_plot, files, deploy, taps
from rich.logging import RichHandler
from rich.console import Console
from synapse.utils.discover import find_device_by_name
Expand Down Expand Up @@ -64,6 +64,7 @@ def main():
streaming.add_commands(subparsers)
offline_plot.add_commands(subparsers)
files.add_commands(subparsers)
taps.add_commands(subparsers)
deploy.add_commands(subparsers)
args = parser.parse_args()

Expand Down
56 changes: 56 additions & 0 deletions synapse/cli/taps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from synapse.client.taps import Tap

from rich.console import Console
from rich.pretty import pprint
from rich.table import Table


def add_commands(subparsers):
tap_parser = subparsers.add_parser("taps", help="Interact with taps on the network")

tap_subparsers = tap_parser.add_subparsers(title="Tap Commands")

# Now add the list parser to the tap_subparsers
list_parser = tap_subparsers.add_parser("list", help="list the taps for a device")
list_parser.set_defaults(func=list_taps)

stream_parser = tap_subparsers.add_parser("stream", help="Stream a tap")
stream_parser.add_argument("tap_name", type=str)
stream_parser.set_defaults(func=stream_taps)


def list_taps(args):
tap = Tap(args.uri, args.verbose)

console = Console()

taps = tap.list_taps()
table = Table(title="Available Taps", show_lines=True)
table.add_column("Name", style="cyan")
table.add_column("Message Type", style="green")
table.add_column("Endpoint", style="green")

for tap in taps:
table.add_row(tap.name, tap.message_type, tap.endpoint)

console.print(table)


def stream_taps(args):
tap = Tap(args.uri, args.verbose)
taps = tap.list_taps()

if args.tap_name not in [tap.name for tap in taps]:
print(f"Tap {args.tap_name} not found")
return

tap.connect(args.tap_name)

console = Console()
console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]")

for message in tap.stream():
message_size = len(str(message))
console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]")
pprint(message, expand_all=False)
console.print("---")
172 changes: 172 additions & 0 deletions synapse/client/taps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import logging
import zmq
from typing import Optional, Generator

from synapse.api.query_pb2 import QueryRequest
from synapse.api.status_pb2 import StatusCode


class Tap(object):
def __init__(self, uri, verbose=False):
"""Initialize a Tap client to connect to the Synapse device.

Args:
uri (str): The URI of the Synapse device.
verbose (bool, optional): Whether to enable verbose logging. Defaults to False.
"""
self.uri = uri
self.verbose = verbose
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG if verbose else logging.INFO)

# ZMQ context (will be initialized upon connection)
self.zmq_context = None
self.zmq_socket = None
self.connected_tap = None

def list_taps(self):
"""List all available taps on the device.

Returns:
list: List of TapConnection objects.
"""
from synapse.client.device import Device

device = Device(self.uri, self.verbose)

request = QueryRequest()
request.query_type = QueryRequest.kListTaps
request.list_taps_query.SetInParent()

response = device.query(request)

if not response or response.status.code != StatusCode.kOk:
self.logger.error(
f"Failed to list taps: {response.status.message if response else 'No response'}"
)
return []

return response.list_taps_response.taps

def connect(self, name: str) -> bool:
"""Connect to a specific tap by name.

Args:
name (str): The name of the tap to connect to.

Returns:
bool: True if connected successfully, False otherwise.
"""
taps = self.list_taps()

# Find the tap with the specified name
selected_tap = None
for tap in taps:
if tap.name == name:
selected_tap = tap
break

if not selected_tap:
self.logger.error(f"Tap '{name}' not found")
return False

# Store the connected tap
self.connected_tap = selected_tap

# Initialize ZMQ context and socket
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.SUB)

# Replace the endpoint with our device URI if needed
endpoint = selected_tap.endpoint
if "://" in endpoint:
# Extract the protocol and port
protocol, address = endpoint.split("://")
_, port = address.split(":")

# Use the device URI with the same port
endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}"

try:
print(f"Connecting to tap '{name}' at {endpoint}")
self.zmq_socket.connect(endpoint)
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages
return True
except zmq.ZMQError as e:
self.logger.error(f"Failed to connect to tap: {e}")
self._cleanup()
return False

def read(self, timeout_ms: int = 1000) -> Optional[bytes]:
"""Read raw data from the tap with timeout.

Args:
timeout_ms (int, optional): Timeout in milliseconds. Defaults to 1000.

Returns:
Optional[bytes]: Raw message data or None if timeout/error.
"""
if not self.zmq_socket:
self.logger.error("Not connected to any tap")
return None

try:
# Set socket timeout
self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)

# Receive data (will timeout if no data available)
return self.zmq_socket.recv()
except zmq.Again:
# Timeout occurred
return None
except zmq.ZMQError as e:
self.logger.error(f"Error receiving message: {e}")
return None

def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]:
"""Stream raw data from the tap.

Args:
timeout_ms (int, optional): Timeout between messages in milliseconds. Defaults to 1000.

Yields:
Generator[bytes, None, None]: Stream of raw message data.
"""
if not self.zmq_socket:
self.logger.error("Not connected to any tap")
return

# Set socket timeout
self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)

try:
while True:
try:
data = self.zmq_socket.recv()
yield data
except zmq.Again:
# Timeout occurred, continue to next iteration
continue
except KeyboardInterrupt:
self.logger.info("Stream interrupted")
except zmq.ZMQError as e:
self.logger.error(f"Error streaming messages: {e}")
finally:
# Don't close the socket here, let the user call disconnect()
pass

def disconnect(self):
"""Disconnect from the tap."""
self._cleanup()

def _cleanup(self):
"""Clean up ZMQ resources."""
if self.zmq_socket:
self.zmq_socket.close()
self.zmq_socket = None

if self.zmq_context:
self.zmq_context.term()
self.zmq_context = None

self.connected_tap = None