In [1]:
import requests
import json
from dotenv import load_dotenv
import os

In [2]:
load_dotenv()

True

In [3]:
WEBUI_URL = os.environ['OPEN_WEB_UI_URL']
API_KEY = os.environ['OPEN_WEB_API_KEY']

# Text Generation

In [4]:
def chat_with_agent(prompt, model="gemma3:12b-it-qat"):
    # The standard OpenAI-compatible endpoint in Open WebUI
    endpoint = f"{WEBUI_URL}/api/chat/completions"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a helpful API agent."},
            {"role": "user", "content": prompt}
        ],
        "stream": False  # Set to True if you want to handle streaming responses
    }

    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
        response.raise_for_status()  # Raises error for 4xx/5xx codes
        
        # Parse the JSON response
        data = response.json()
        return data['choices'][0]['message']['content']
        
    except requests.exceptions.RequestException as e:
        return f"Error: {e}"

# 2. EXECUTION
if __name__ == "__main__":
    print("Sending request...")
    result = chat_with_agent("What is the best build to play in oblivion remastered")
    print("\n--- Agent Response ---\n")
    print(result)

Sending request...


KeyboardInterrupt: 

# Image Generation

In [21]:
from PIL import Image
import io

In [19]:
def generate_and_download_image(prompt, save_path="output_image.png"):
    # Endpoint for generation
    gen_url = f"{WEBUI_URL}/api/v1/images/generations"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "prompt": prompt,
        "model": "comfyui", 
        "n": 1,
        "size": "1024x1024"
    }

    print(f"1. Requesting generation for: '{prompt}'...")
    
    try:
        # Step 1: Trigger Generation
        response = requests.post(gen_url, headers=headers, json=payload, timeout=120)
        response.raise_for_status()
        
        # The API returns a list of dictionaries, e.g., [{'url': '/api/v1/files/...'}]
        data = response.json()
        
        # Extract the relative URL from the first item in the list
        # Check if 'data' wraps the list or if it is the list itself
        image_list = data.get('data', []) if isinstance(data, dict) else data
            
        if not image_list:
            print("Error: No image data returned.")
            return

        relative_path = image_list[0]['url']
        
        # Step 2: Construct the Full Download URL
        # We strip the leading slash from relative_path to avoid double slashes if needed, 
        # but usually requests handles it. Safer to join manually.
        download_url = f"{WEBUI_URL}{relative_path}"
        
        print(f"2. Image generated! Downloading from: {download_url}")

        # Step 3: Download the Image Content
        # IMPORTANT: We must pass the headers again to authenticate the file download
        file_response = requests.get(download_url, headers=headers)
        file_response.raise_for_status()
        print("Opening Image ...")
        bytes_io = io.BytesIO(file_response.content)
        img = Image.open(bytes_io)
        img.show()

        print(f"Saving image to : {os.path.abspath(save_path)}")
        # Step 4: Save to Disk
        with open(save_path, "wb") as f:
            f.write(file_response.content)
            
        print(f"Success! Image saved to: {os.path.abspath(save_path)}")

    except requests.exceptions.RequestException as e:
        print(f"Network Error: {e}")
    except Exception as e:
        print(f"Script Error: {e}")

In [22]:
# Run it
generate_and_download_image("A cyberpunk city with neon lights, realistic, 8k. The city should be modelled on Tempe Arizona in the ASU campus",
                            save_path="output2.png"
                           )

1. Requesting generation for: 'A cyberpunk city with neon lights, realistic, 8k. The city should be modelled on Tempe Arizona in the ASU campus'...
2. Image generated! Downloading from: http://ai-lab.tail8befb3.ts.net:8080/api/v1/files/d7480bc6-cb29-4845-ae8e-670833a8febe/content
Opening Image ...
Saving image to : /home/vinayak/git/paperbanana_local/output2.png
Success! Image saved to: /home/vinayak/git/paperbanana_local/output2.png


# Image VLM

In [27]:
import base64
from PIL import Image
import io

In [28]:
# IMPORTANT: You must use a VLM model name that you have pulled in Ollama/Open WebUI
MODEL_NAME = "gemma3:12b-it-qat" 

In [29]:
def encode_image(image_path):
    """
    Resizes image to max 1024px dimension and returns base64 JPEG string.
    """
    try:
        with Image.open(image_path) as img:
            # Convert to RGB to ensure compatibility (handles PNG transparency)
            if img.mode != 'RGB':
                img = img.convert('RGB')
            
            # Smart Resize: Ensures no dimension exceeds 1024 while keeping aspect ratio
            img.thumbnail((1024, 1024))
            
            # Save the image to a memory buffer (simulating a file)
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG", quality=85) # Quality 85 saves huge bandwidth
            
            # Get the byte data from the buffer and encode
            return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        print(f"Error processing image: {e}")
        return None

def analyze_image(image_path, prompt):
    endpoint = f"{WEBUI_URL}/api/chat/completions"
    
    # 1. Prepare the image data
    if not os.path.exists(image_path):
        return f"Error: File not found at {image_path}"
        
    base64_image = encode_image(image_path)
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    # 2. Construct the Payload (OpenAI Vision Format)
    payload = {
        "model": MODEL_NAME,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            # The prefix is required for the API to recognize it as data
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "stream": False
    }

    print(f"Sending image ({os.path.getsize(image_path)/1024:.1f} KB) to {MODEL_NAME}...")

    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        
        result = response.json()
        return result['choices'][0]['message']['content']

    except requests.exceptions.RequestException as e:
        return f"Network Error: {e}"


In [11]:
# --- RUN IT ---
# Make sure you have a real image file named 'test_image.jpg' in the folder
response_text = analyze_image("output_image.png", "Describe what you see in this image in detail.")
print("\n--- VLM Response ---\n")
print(response_text)

Error processing image: name 'Image' is not defined


NameError: name 'MODEL_NAME' is not defined

# Image Editing

This fails when trying to query open-web-ui, going through comfy directly works better

## My try

In [None]:
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
import ssl

In [135]:
WORKFLOW_FILE = "workflows/flux2_klein_image_edit_base64.json"
SERVER_ADDRESS = "ai-lab.tail8befb3.ts.net:8188"
client_id = str(uuid.uuid4())

Edit workflow parameters (example: prompt and seed)

* ["107", "inputs", "text"], "Remake this into a fairly style image")
* ["116", "inputs", "image_base64"], base64_image)

In [None]:
def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode('utf-8')
    req =  urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response:
        return response.read()

def get_history(prompt_id):
    with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response:
        return json.loads(response.read())

In [137]:
with open(WORKFLOW_FILE, 'r', encoding='utf-8') as f:
    workflow = json.load(f)

In [138]:
base64_image = encode_image("output_image.png")

workflow["116"]["inputs"]["image_base64"] = base64_image
print(f"Updated Node 116 with base64 image")

Updated Node 116 with base64 image


In [139]:
prompt = "Remake this into a fairly style image"

In [140]:
workflow["107"]["inputs"]["text"] = prompt
print(f"Updated Text Prompt (Node 107) with {prompt}")

Updated Text Prompt (Node 107) with Remake this into a fairly style image


In [146]:
ws = websocket.WebSocket()
ws_url = "wss://{}/ws?clientId={}".format(SERVER_ADDRESS, CLIENT_ID)
ws.connect(
            ws_url, 
            sslopt={"cert_reqs": ssl.CERT_NONE},
            header={"Origin": f"https://{SERVER_ADDRESS}"} 
        )

# 5. Queue the prompt
print("Queuing workflow...")
prompt_id = queue_prompt(workflow)['prompt_id']
print(f"Workflow started with ID: {prompt_id}")
# 6. Wait for completion
while True:
    out = ws.recv()
    if isinstance(out, str):
        message = json.loads(out)
        
        # Print current step
        if message['type'] == 'executing':
            data = message['data']
            if data['node'] is None and data['prompt_id'] == prompt_id:
                print("Execution complete!")
                break # Execution is done
            elif data['prompt_id'] == prompt_id:
                print(f"Executing Node: {data['node']}")

# 7. Retrieve and save results
history = get_history(prompt_id)[prompt_id]
outputs = history.get('outputs', {})

Queuing workflow...
Workflow started with ID: 2f2c36f4-8547-4d5e-ad2f-e6adb28ac921


KeyboardInterrupt: 

### Gemini Stuff

In [114]:
# --- Configuration ---
SERVER_ADDRESS = "https://ai-lab.tail8befb3.ts.net:8188"
CLIENT_ID = str(uuid.uuid4())
WORKFLOW_FILE = "image_edit_workflow.json"
INPUT_IMAGE_PATH = "output_image.png"  # The local image you want to edit
OUTPUT_FOLDER = "output_images"

In [83]:
# --- Helper Functions ---
def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": CLIENT_ID}
    data = json.dumps(p).encode('utf-8')
    req = urllib.request.Request("http://{}/prompt".format(SERVER_ADDRESS), data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen("http://{}/view?{}".format(SERVER_ADDRESS, url_values)) as response:
        return response.read()

def get_history(prompt_id):
    with urllib.request.urlopen("http://{}/history/{}".format(SERVER_ADDRESS, prompt_id)) as response:
        return json.loads(response.read())

In [75]:
def run_workflow(prompt):
    # 1. Load the workflow JSON
    try:
        with open(WORKFLOW_FILE, 'r', encoding='utf-8') as f:
            workflow = json.load(f)
    except FileNotFoundError:
        print(f"Error: Could not find {WORKFLOW_FILE}. Please save your JSON to this file.")
        return

    # 2. Convert image to base64
    base64_image = encode_image("output_image.png")

    # 3.1 Update the Workflow with the uploaded image
    # Node "76" is the LoadImage node in your specific JSON
    if "76" in workflow:
        workflow["76"]["inputs"]["image"] = server_filename
        print(f"Updated Node 76 to use image: {server_filename}")
    else:
        print("Warning: Node 76 (LoadImage) not found in workflow. Check node IDs.")

    # Update the prompt text programmatically (Node 107)
    workflow["107"]["inputs"]["text"] = prompt
    print(f"Updated Text Prompt (Node 107) with {prompt}")
    
    # 4. Connect to WebSocket to track progress
    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(SERVER_ADDRESS, CLIENT_ID))
    
    # 5. Queue the prompt
    print("Queuing workflow...")
    prompt_id = queue_prompt(workflow)['prompt_id']
    print(f"Workflow started with ID: {prompt_id}")

    # 6. Wait for completion
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            
            # Print current step
            if message['type'] == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    print("Execution complete!")
                    break # Execution is done
                elif data['prompt_id'] == prompt_id:
                    print(f"Executing Node: {data['node']}")

    # 7. Retrieve and save results
    history = get_history(prompt_id)[prompt_id]
    outputs = history.get('outputs', {})

    if not os.path.exists(OUTPUT_FOLDER):
        os.makedirs(OUTPUT_FOLDER)

    for node_id, node_output in outputs.items():
        if 'images' in node_output:
            for image in node_output['images']:
                image_data = get_image(image['filename'], image['subfolder'], image['type'])
                file_name = f"{OUTPUT_FOLDER}/{image['filename']}"
                with open(file_name, 'wb') as f:
                    f.write(image_data)
                print(f"Saved output to: {file_name}")

In [76]:
run_workflow()

Uploading output_image.png...
Error uploading image: Expecting value: line 1 column 1 (char 0)


In [79]:
def upload_image(input_path, name, server_address, image_type="input", overwrite=False):
    with open(input_path, 'rb') as file:
        files = {"image": (name, file)} # explicit filename in tuple is safer
        data = {"overwrite": str(overwrite).lower(), "type": image_type}
        
        print(f"Sending request to: http://{server_address}/upload/image")
        response = requests.post("http://{}/upload/image".format(server_address), files=files, data=data)
        
        # DEBUGGING BLOCK
        if response.status_code != 200:
            print(f"Server Error {response.status_code}:")
            print(response.text) # Print the raw error message from the server
            return None # Prevent crash
            
    return response.json()

In [88]:
upload_resp = upload_image(INPUT_IMAGE_PATH, os.path.basename(INPUT_IMAGE_PATH), SERVER_ADDRESS)

ConnectionError: HTTPConnectionPool(host='https', port=80): Max retries exceeded with url: /ai-lab.tail8befb3.ts.net:8188/upload/image (Caused by NameResolutionError("HTTPConnection(host='https', port=80): Failed to resolve 'https' ([Errno -3] Temporary failure in name resolution)"))