In [1]:

import grpc
import acquisition_pb2
import acquisition_pb2_grpc
import io
from PIL import Image
import json
import time
import os


In [2]:

# Function to convert PIL Image to bytes
def image_to_bytes(img: Image.Image, format='PNG'):
    """Serialize a PIL image into an in-memory bytes object.

    Args:
        img: The image to encode.
        format: Encoder name forwarded to ``img.save`` (``'PNG'`` by default).

    Returns:
        The bytes that ``img.save`` wrote into the in-memory buffer.
    """
    with io.BytesIO() as stream:
        img.save(stream, format=format)
        # Read the buffer before the ``with`` block closes it.
        return stream.getvalue()

# Function to create a dummy image for testing
def create_dummy_image(width=100, height=100, color=(255, 0, 0)):
    """Build a solid-colour RGB PIL image for testing.

    With the default arguments this is a 100x100 red square.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        color: Fill colour passed to ``Image.new`` (an RGB tuple by default).

    Returns:
        The newly created PIL image.
    """
    size = (width, height)
    return Image.new('RGB', size, color)


In [3]:

def run(target='Mac.Home:8061'):
    """Run one acquire -> display round trip against the acquisition server.

    Opens an insecure gRPC channel to ``target``, calls ``acquire`` once and
    then sends the acquired label and image back through ``display``. The
    channel is closed when the function exits, including when an unexpected
    exception is raised.

    Args:
        target: ``host:port`` of the server. Defaults to the address the
            notebook was written against; replace it with your server's
            address (e.g. ``'localhost:8061'``) when running elsewhere.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    # Allow messages of up to 512 MiB in both directions.
    max_message_bytes = 512 * 1024 * 1024
    channel_options = [
        ('grpc.max_send_message_length', max_message_bytes),
        ('grpc.max_receive_message_length', max_message_bytes),
    ]

    # The context manager closes the channel even if a step below raises.
    with grpc.insecure_channel(target, options=channel_options) as channel:
        stub = acquisition_pb2_grpc.AcquisitionServiceStub(channel)
        acquire_response = _run_acquire_step(stub)
        _run_display_step(stub, acquire_response)


def _run_acquire_step(stub):
    """Call ``acquire`` once; return the response, or ``None`` if the RPC failed."""
    print("--- Testing Acquire Method ---")
    try:
        # The server's 'acquire' method is expected to answer from its
        # 'data_acq_queue'. Unless something has filled that queue beforehand
        # (for example an image uploaded through the server's Gradio UI, or
        # dummy data the server pre-populates for testing), only the
        # "no data available" path can be exercised from here.
        print("Attempting to acquire data...")
        acquire_response = stub.acquire(acquisition_pb2.AcquireRequest())
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.NOT_FOUND:
            # details is a method; it must be called to get the message text.
            print(f"Acquire failed: {e.details()}")
        else:
            print(f"Acquire failed with unexpected error: {e}")
        return None

    if acquire_response.label and acquire_response.image:
        print("Acquire successful!")
        print(f"Received label: {acquire_response.label}")

        # Save the acquired image; a failure here is reported but not fatal.
        acquired_image_path = "acquired_image_from_grpc.png"
        try:
            with open(acquired_image_path, "wb") as f:
                f.write(acquire_response.image)
            print(f"Acquired image saved to {acquired_image_path}")
            # Optional: Open the image to verify
            # Image.open(io.BytesIO(acquire_response.image)).show()
        except OSError as e:
            print(f"Error saving acquired image: {e}")
    else:
        print("Acquire returned an empty response. This might mean no data was available on the server.")

    return acquire_response


def _run_display_step(stub, acquire_response):
    """Send the acquired label and image to ``display``; skip if nothing was acquired."""
    print("\n--- Testing Display Method ---")
    if acquire_response is None:
        # The acquire RPC failed, so there is no label or image to forward.
        print("Skipping display: the acquire step did not return a response.")
        return

    try:
        print("Sending image and label for display...")
        display_request = acquisition_pb2.DisplayRequest(
            label=acquire_response.label,
            image=acquire_response.image,
        )
        stub.display(display_request)
        print("Display request sent. Server should process and potentially display this.")
        print("Display method returned an empty response as expected.")

        # In a real application, you might have another mechanism to verify
        # if the display action on the server was successful (e.g., logging,
        # a callback, or checking a visual output).
    except grpc.RpcError as e:
        print(f"Display failed: {e}")


In [None]:
# Repeat the acquire/display round trip nine times, echoing the pass number
# (1 through 9) after each run.
for i in range(1, 10):
    run()
    print(i)

New setup: the display and acquisition services now run on separate ports (display on 8161, acquisition on 8061).

In [9]:
import grpc
import acquisition_pb2
import acquisition_pb2_grpc
from PIL import Image
import io
import json
import time

# --- Helper Functions ---
def bytes_to_image(image_bytes: bytes):
    """Open encoded image bytes as a PIL image.

    Args:
        image_bytes: The encoded image data.

    Returns:
        The object returned by ``Image.open`` for an in-memory stream
        wrapping ``image_bytes``.
    """
    stream = io.BytesIO(image_bytes)
    return Image.open(stream)

def image_to_bytes(img: Image.Image, format='PNG'):
    """Encode ``img`` in the requested format and return the encoded bytes.

    Args:
        img: The PIL image to encode.
        format: Encoder name forwarded to ``img.save`` (``'PNG'`` by default).

    Returns:
        The contents of the in-memory buffer after ``img.save`` has written to it.
    """
    encoded = io.BytesIO()
    img.save(encoded, format=format)
    payload = encoded.getvalue()
    return payload


In [None]:

# --- Combined Test: acquire from AcquisitionService and send to DisplayService ---

print("\n[Client] Connecting to AcquisitionService at port 8061...")

# Open an insecure channel to the acquisition server and build its stub.
channel = grpc.insecure_channel('localhost:8061')
acq_stub = acquisition_pb2_grpc.AcquisitionServiceStub(channel)

# Request one sample; `acquire_response` is read again by the display cell.
acquire_response = acq_stub.acquire(acquisition_pb2.AcquireRequest())

In [12]:
# Forward the acquired sample to DisplayService on port 8161.
# This cell reads `acquire_response`, so the acquisition cell must have run
# successfully first; otherwise it fails with a NameError.
disp_channel = grpc.insecure_channel('localhost:8161')
disp_stub = acquisition_pb2_grpc.DisplayServiceStub(disp_channel)

display_request = acquisition_pb2.DisplayRequest(
    label=acquire_response.label,
    image=acquire_response.image,
)
display_response = disp_stub.display(display_request)

NameError: name 'acquire_response' is not defined

In [None]:


# Acquire one sample from AcquisitionService (port 8061) and forward it to
# DisplayService (port 8161). Both channels are closed by their `with` blocks.
with grpc.insecure_channel('localhost:8061') as acq_channel:
    acq_stub = acquisition_pb2_grpc.AcquisitionServiceStub(acq_channel)

    try:
        acquire_response = acq_stub.acquire(acquisition_pb2.AcquireRequest())
        print(f"[Client] Acquired image with label: {acquire_response.label[:80]}...")

        # Decode and show the image locally (optional visual check).
        image = bytes_to_image(acquire_response.image)
        image.show(title="Acquired Image")

        # Hand the same label and image bytes over to the display service.
        print("[Client] Forwarding image to DisplayService at port 8161...")
        with grpc.insecure_channel('localhost:8161') as disp_channel:
            disp_stub = acquisition_pb2_grpc.DisplayServiceStub(disp_channel)

            display_request = acquisition_pb2.DisplayRequest(
                label=acquire_response.label,
                image=acquire_response.image,
            )

            display_response = disp_stub.display(display_request)
            print("[Client] DisplayService accepted the image.")

    # A gRPC failure from either service ends up here.
    except grpc.RpcError as rpc_error:
        print(f"[Client] gRPC error: {rpc_error.code()} - {rpc_error.details()}")



[Client] gRPC error: StatusCode.UNAVAILABLE - failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:8061: Failed to connect to remote host: connect: Connection refused (111)


In [7]:
# Echo the channel object so the notebook displays its repr.
channel

<grpc._channel.Channel at 0xffffaf528640>

In [None]:

# --- Main ---
# NOTE(review): no cell visible in this notebook defines
# test_acquire_then_display; confirm it is defined before running this cell,
# otherwise the call below raises NameError.
if __name__ == "__main__":
    test_acquire_then_display()
