In [1]:
# Baby Einstein 
# Version: Albert:0.0.1

# Install required packages
%pip install -r requirements.txt > /dev/null 2>&1

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Import packages
import base64
import json
import os

import requests
#import urllib3
from PIL import Image

ModuleNotFoundError: No module named 'PIL'

In [None]:
# Disable insecure request warnings
#urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
class SightClient:
    """Client that sends a resized image to a vision model via the chat completion API.

    The image is resized, base64-encoded, and embedded in the text of a single
    user message; the content of the first choice in the reply is returned.
    """

    def __init__(self, model: str, url: str = "https://typingmind.poyner.lan/v1/chat/completions"):
        """Store the model name and the endpoint URL used for every request."""
        self.url = url
        self.model = model

    def resize_image(self, image_path: str, output_path: str, max_size=(800, 800)):
        """Save a copy of ``image_path`` to ``output_path``, resized to fit ``max_size``.

        The aspect ratio is maintained by ``Image.thumbnail``.
        """
        with Image.open(image_path) as img:
            img.thumbnail(max_size)  # Resize image while maintaining aspect ratio
            img.save(output_path)  # Save the resized image

    def analyze_image(self, image_path: str) -> str:
        """Resize the image, send it to the model, and return the reply content.

        A resized copy is written next to the original as ``<name>_resized<ext>``
        and is left on disk afterwards.

        Args:
            image_path: path of the image file to analyze.

        Returns:
            The ``content`` of the first choice's message in the response.

        Raises:
            ValueError: if the response body is not JSON, or is JSON that does
                not contain ``choices[0]["message"]["content"]``.
            Exception: errors from resizing, reading the resized file, or
                sending the request propagate unchanged.
        """
        # Derive the sibling path from the actual extension. A plain
        # str.replace('.jpg', ...) returns the path unchanged for any file that
        # is not lower-case ".jpg", which made the resize step overwrite the
        # source image.
        root, ext = os.path.splitext(image_path)
        resized_image_path = f"{root}_resized{ext}"
        self.resize_image(image_path, resized_image_path)

        try:
            with open(resized_image_path, "rb") as f:
                image_data = f.read()
            encoded_image = base64.b64encode(image_data).decode("utf-8")
        except Exception as e:
            print(f"Error reading file {resized_image_path}: {e}")
            raise

        # The base64 image travels inside the text of a single user message.
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "user",
                    "content": f"Analyze this image and provide a JSON response with input_data and threshold_score. Here's the data: {encoded_image}"
                }
            ]
        }

        print("DEBUG: Sending request to:", self.url)
        print("DEBUG: Request payload structure:", json.dumps(payload, indent=2))

        # NOTE(review): verify=False disables TLS certificate verification;
        # presumably the LAN host uses a self-signed certificate -- confirm.
        response = requests.post(self.url, json=payload, verify=False)

        print("DEBUG: Response status code:", response.status_code)
        print("DEBUG: Response headers:", dict(response.headers))
        print("DEBUG: Raw response text:", response.text)

        # Parse and validate in two separate steps so that a well-formed JSON
        # body with an unexpected shape is not misreported as "not JSON".
        try:
            data = response.json()
        except ValueError as e:
            raise ValueError(f"Response is not in JSON format. Status: {response.status_code}, Text: {response.text}") from e

        try:
            return data['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError("Unexpected response format: " + str(data)) from e

# Build a SightClient for the vision model, run it on the sample image, and print the reply.
# NOTE(review): the image path is an absolute path on the author's machine.
client = SightClient(
    model="baby-einstein/vision/sight:latest",
)
result = client.analyze_image(
    image_path="/home/sean/baby-einstein/examples/sight/eagle.jpg",
)
print("Analysis Result:", result)

In [None]:
# Define the ChatClient class
# Since we are structuring these thoughts and loops in JSON format, we can use the ChatClient class to interact with the chat completion API.
# This class will ensure that the structure of the requests is correct and that the responses are parsed correctly.

class ChatClient:
    """A base client for interacting with the chat completion API.

    The default URL points at a local device (typingmind) on the author's
    network (poyner.lan) that runs the model using ollama. The repository is
    hosted on this device and models are loaded from Modelfiles there.
    """

    def __init__(self, model: str, url: str = "https://typingmind.poyner.lan/v1/chat/completions",
                 verify=False, timeout=None):
        """Store the connection settings used by every request.

        Args:
            model: model name placed in each request payload.
            url: chat completion endpoint.
            verify: forwarded to ``requests.post``. ``False`` (the default, as
                before) skips TLS certificate verification; pass ``True`` or a
                CA-bundle path to enable it.
            timeout: forwarded to ``requests.post``. ``None`` (the default, as
                before) means no timeout is applied.
        """
        self.url = url
        self.model = model
        self.verify = verify
        self.timeout = timeout

    def send_message(self, message: str) -> str:
        """Send ``message`` as a single user turn and return the first choice's content.

        Returns:
            The ``content`` of the first choice's message, or ``""`` when the
            first choice carries no message content.

        Raises:
            ValueError: if the response body is not JSON, is not a JSON
                object, or contains no choices.
        """
        payload = {
            "model": self.model,
            "messages": [
                {"role": "user", "content": message}
            ]
        }
        response = requests.post(self.url, json=payload, verify=self.verify, timeout=self.timeout)

        # Only the JSON decoding is guarded here, so a valid JSON body with no
        # choices is reported as such instead of being relabelled "not JSON".
        try:
            data = response.json()
        except ValueError as e:
            raise ValueError("Response is not in JSON format: " + response.text) from e

        # A non-object body (list, string, null) has no "choices" to read.
        choices = data.get("choices", []) if isinstance(data, dict) else []
        if not choices:
            raise ValueError("No choices available in response: " + str(data))
        return choices[0].get("message", {}).get("content", "")

In [None]:
# Define the ThalamusClient class
# The ThalamusClient class is a subclass of ChatClient that is specialized for sending analysis requests to the Thalamus model.
# The Thalamus model is the central processing unit of the Baby Einstein system, responsible for analyzing sensor data and routing it to the appropriate models for further processing.
class ThalamusClient(ChatClient):
    """ChatClient bound to the Thalamus model, used for analysis requests."""

    def __init__(self):
        """Bind this client to the "baby-einstein/thalamus:latest" model."""
        super().__init__(model="baby-einstein/thalamus:latest")

    def analyze(self, sensor: str, input_type: str, input_data: str, threshold_score: str) -> str:
        """Send one analysis request and return the model's reply.

        The four arguments are serialized into a single JSON object, which is
        sent as the message text through ``send_message``.
        """
        request_fields = {
            "sensor": sensor,
            "input_type": input_type,
            "input_data": input_data,
            "threshold_score": threshold_score,
        }
        serialized_request = json.dumps(request_fields)
        return self.send_message(serialized_request)

In [None]:
# Define the ACCClient class
# The ACCClient class is a subclass of ChatClient that is specialized for sending evaluation requests to the ACC model.
# The ACC model is responsible for evaluating the output of the Thalamus model and determining whether it meets the criteria for further processing or needs to be returned to the Thalamus.
class ACCClient(ChatClient):
    """ChatClient bound to the ACC model, used for evaluation requests."""

    def __init__(self):
        """Bind this client to the "baby-einstein/acc:latest" model."""
        super().__init__(model="baby-einstein/acc:latest")

    def evaluate(self, thalamus_output: str) -> str:
        """Forward ``thalamus_output`` unchanged as the message and return the reply."""
        acc_reply = self.send_message(thalamus_output)
        return acc_reply


In [None]:
# Sample sensor reading used as input for the Thalamus request below
sensor: str = "camera"
input_type: str = "image"
input_data: str = "object moving in front of camera"
threshold_score: str = ".88"  # a string, matching the str annotation on ThalamusClient.analyze

In [None]:
# Send the sample reading to the Thalamus model and print its reply
thalamus_client = ThalamusClient()
thalamus_response = thalamus_client.analyze(
    sensor=sensor,
    input_type=input_type,
    input_data=input_data,
    threshold_score=threshold_score,
)
print("Thalamus response:", thalamus_response, sep="\n")

In [None]:
# Pass the Thalamus reply to the ACC model for evaluation and print its reply
acc_client = ACCClient()
acc_response = acc_client.evaluate(thalamus_output=thalamus_response)
print("ACC response:", acc_response, sep="\n")