# **Intelligent Robot:**
Basic Client-Server Simulation in Python (using roslibpy)

This example doesn't require any ROS setup, but it demonstrates the use of function calls that you’d typically use in a real ROS environment

**Key Points:**

    Publisher: The first script simulates a ROS publisher that sends messages to a topic (like /chatter).

    Subscriber: The second script simulates a ROS subscriber that listens for messages from that topic.

I have integrtead AI vision to this ROS simulation as an example

In [1]:
!pip install roslibpy



In [2]:
# Simulated message class like std_msgs/String
class StringMessage:
    def __init__(self, data):
        self.data = data

# Simulated ROS Topic
class Topic:
    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def publish(self, message):
        print(f"[Publisher] Publishing to {self.name}: {message.data}")
        for callback in self.subscribers:
            callback(message)

    def subscribe(self, callback):
        self.subscribers.append(callback)
        print(f"[Subscriber] Subscribed to {self.name}")

# Simulated use
def message_callback(message):
    print(f"[Subscriber] Received: {message.data}")

# Create a mock topic (like /chatter)
chatter_topic = Topic('/chatter')

# Subscriber subscribes
chatter_topic.subscribe(message_callback)

# Publisher publishes a message
msg = StringMessage("Hello from the simulated ROS node!")
chatter_topic.publish(msg)


[Subscriber] Subscribed to /chatter
[Publisher] Publishing to /chatter: Hello from the simulated ROS node!
[Subscriber] Received: Hello from the simulated ROS node!


In [3]:
# Simulated request/response messages
class AddTwoIntsRequest:
    def __init__(self, a, b):
        self.a = a
        self.b = b

class AddTwoIntsResponse:
    def __init__(self, sum):
        self.sum = sum

# Simulated ROS Service
class Service:
    def __init__(self, name, callback):
        self.name = name
        self.callback = callback
        print(f"[Service] '{self.name}' ready.")

    def handle_request(self, request):
        return self.callback(request)

# Simulated ROS Service Client
class ServiceClient:
    def __init__(self, service):
        self.service = service

    def call(self, request):
        print(f"[Client] Sending request to '{self.service.name}': {request.a} + {request.b}")
        response = self.service.handle_request(request)
        return response

# Service-side callback function
def handle_add_two_ints(request):
    result = request.a + request.b
    print(f"[Service] Processing request: {request.a} + {request.b} = {result}")
    return AddTwoIntsResponse(result)

# === Run simulated service and client ===

# 1. Create the service
add_service = Service('/add_two_ints', handle_add_two_ints)

# 2. Create a client for the service
client = ServiceClient(add_service)

# 3. Send a request from the client
req = AddTwoIntsRequest(5, 7)
res = client.call(req)

# 4. Output the result
print(f"[Client] Received response: sum = {res.sum}")


[Service] '/add_two_ints' ready.
[Client] Sending request to '/add_two_ints': 5 + 7
[Service] Processing request: 5 + 7 = 12
[Client] Received response: sum = 12


In [4]:
import time
import threading

# Simulated Goal message
class MoveBaseGoal:
    def __init__(self, x, y):
        self.target_x = x
        self.target_y = y

# Simulated Feedback message
class MoveBaseFeedback:
    def __init__(self, current_x, current_y):
        self.current_x = current_x
        self.current_y = current_y

# Simulated Result message
class MoveBaseResult:
    def __init__(self, success, final_x, final_y):
        self.success = success
        self.final_x = final_x
        self.final_y = final_y

# Simulated Action Server for robot navigation
class NavigationActionServer:
    def __init__(self):
        self.current_x = 0
        self.current_y = 0
        print("[ActionServer] Ready to receive navigation goals.")

    def execute_goal(self, goal, feedback_cb=None, done_cb=None):
        print(f"[ActionServer] Received goal: Move to ({goal.target_x}, {goal.target_y})")

        def run():
            while self.current_x != goal.target_x or self.current_y != goal.target_y:
                time.sleep(0.5)  # Simulate motion delay

                # Move one step toward target
                if self.current_x < goal.target_x:
                    self.current_x += 1
                elif self.current_x > goal.target_x:
                    self.current_x -= 1

                if self.current_y < goal.target_y:
                    self.current_y += 1
                elif self.current_y > goal.target_y:
                    self.current_y -= 1

                # Send feedback
                if feedback_cb:
                    feedback_cb(MoveBaseFeedback(self.current_x, self.current_y))

            # Goal reached
            result = MoveBaseResult(True, self.current_x, self.current_y)
            if done_cb:
                done_cb(result)

        threading.Thread(target=run).start()

# === Simulate the robot client sending a navigation goal ===

def feedback_callback(feedback):
    print(f"[Client] Feedback: Robot at ({feedback.current_x}, {feedback.current_y})")

def done_callback(result):
    print(f"[Client] Goal reached at ({result.final_x}, {result.final_y}) — Success: {result.success}")

# Create action server
nav_server = NavigationActionServer()

# Send goal from client to server
goal = MoveBaseGoal(3, 2)
nav_server.execute_goal(goal, feedback_cb=feedback_callback, done_cb=done_callback)


[ActionServer] Ready to receive navigation goals.
[ActionServer] Received goal: Move to (3, 2)


In [5]:
!pip install -q tensorflow


In [9]:


import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input, decode_predictions
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.datasets import cifar10
import numpy as np

# --- Simulated ROS Topic ---
class Topic:
    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def publish(self, message):
        print(f"[Publisher] Published to {self.name}: {message}")
        for callback in self.subscribers:
            callback(message)

    def subscribe(self, callback):
        self.subscribers.append(callback)
        print(f"[Subscriber] Subscribed to {self.name}")

# --- AI Inference Node (like a publisher) ---
class AINode:
    def __init__(self, topic):
        self.model = MobileNetV2(weights="imagenet")
        self.topic = topic

    def classify_and_publish(self, image):
        image = tf.image.resize(image, (224, 224))
        image = img_to_array(image).copy()  # Ensure writable
        image = preprocess_input(image)
        image = np.expand_dims(image, axis=0)
        preds = self.model.predict(image)
        label = decode_predictions(preds, top=1)[0][0][1]
        self.topic.publish(label)

# --- Subscriber Node ---
def robot_decision_callback(label):
    print(f"[Robot] Detected object: {label}")
    if label in ['person', 'dog', 'cat']:
        print("[Robot] Action: Follow!")
    else:
        print("[Robot] Action: Ignore.")

# --- Set up the communication ---
image_topic = Topic('/detected_object')
image_topic.subscribe(robot_decision_callback)

# --- Load a test image (CIFAR-10) ---
(_, _), (x_test, _) = cifar10.load_data()
sample_img = x_test[1] / 255.0  # Normalize

# --- Run the AI node ---
ai_node = AINode(image_topic)
ai_node.classify_and_publish(sample_img)


[Subscriber] Subscribed to /detected_object
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/imagenet_class_index.json
[1m35363/35363[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
[Publisher] Published to /detected_object: spotlight
[Robot] Detected object: spotlight
[Robot] Action: Ignore.


Idea and execution by Bhadale IT, code generated by ChatGPT