In [1]:

import sys
import os
from pathlib import Path
import instructor
from pydantic import BaseModel
sys.path.insert(0, '../libs')
# Import SimpleLLMAgent from libs directory
from llm_factory_openai import SimpleLLMAgent
from pydantic import BaseModel, Field
import json
import io
# Import our general LLM factory
from llm_factory_general import (
    GeneralLLMFactory,
    create_openai_factory,
    create_google_gemini_factory,
    create_anthropic_factory,
    create_openai_compatible_factory
)
from utils import download_hf_model
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv('../.env')
api_key = os.getenv("huggingface_token")
if not api_key:
    raise ValueError("huggingface_token not found in environment variables. Please check your .env file.")


#### Test call closed source model api

In [2]:

print("=== General LLM Factory Examples ===")
openai_factory = create_openai_factory(model_name='gpt-4.1',
                                        temperature=0.0,
                                        max_tokens=8000,
                                        )
result = openai_factory.test_connection()
print(f"OpenAI test result: {result}")
messages = [{"role": "user", "content": 'hi'}]
response = openai_factory.get_response_content(messages,response_model=str)
print(response)


=== General LLM Factory Examples ===
OpenAI test result: Connection successful
Hello! How can I help you today?


Test local SGLang API Server

In [3]:
## download models
model_name_list = ['Qwen/Qwen3-4B','Qwen/Qwen3-8B']
for model_name in model_name_list:
    # Create the target directory path
    target_dir = '/ephemeral/home/xiong/data/hf_cache/' + model_name
    # Check if model already exists before downloading
    if os.path.exists(target_dir) and os.listdir(target_dir):
        print(f"Model {model_name} already exists at {target_dir}, skipping download")
    else:
        print(f"Downloading model {model_name}...")
        download_hf_model(model_name, target_dir, hf_token=os.getenv('huggingface_token'))

Model Qwen/Qwen3-4B already exists at /ephemeral/home/xiong/data/hf_cache/Qwen/Qwen3-4B, skipping download
Model Qwen/Qwen3-8B already exists at /ephemeral/home/xiong/data/hf_cache/Qwen/Qwen3-8B, skipping download


### SGLang offling inference 
- https://github.com/sgl-project/sglang/blob/main/examples/runtime/engine/offline_batch_inference.py

In [4]:
# launch the offline engine
import asyncio
import io
import os

from PIL import Image
import requests
import sglang as sgl

from sglang.srt.conversation import chat_templates
from sglang.test.test_utils import is_in_ci
from sglang.utils import async_stream_and_merge, stream_and_merge
from transformers import AutoTokenizer

if is_in_ci():
    import patch
else:
    import nest_asyncio

    nest_asyncio.apply()

In [5]:
model_name = 'Qwen/Qwen3-8B'
model_path = os.path.join('/ephemeral/home/xiong/data/hf_cache','Qwen/Qwen3-8B')
#tokenizer = AutoTokenizer.from_pretrained(model_name)

In [6]:
# Arguments for sgl.Engine corresponding to the launch_server CLI options

In [7]:
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model path {model_path} does not exist. Please download the model first.")
engine_args = {
    "model_path": model_path,                # --model-path
    #"port": 8100,                            # --port
    "dtype": "bfloat16",                     # --dtype
    #"api_key": "abc",                        # --api-key
    "context_length": 8192,                  # --context-length
    #"served_model_name": "Qwen/Qwen3-8B",    # --served-model-name
    #"allow_auto_truncate": True,             # --allow-auto-truncate
    "constrained_json_whitespace_pattern": r"[\n\t ]*",  # --constrained-json-whitespace-pattern
    "mem_fraction_static": 0.9,              # --mem-fraction-static
    "dp_size": 4,                            # --dp_size
    "grammar_backend":"xgrammar",
    # "reasoning_parser": "qwen3"              # --reasoning-parser # thes works very strangely 
}

# Instantiate the engine with these arguments
llm = sgl.Engine(**engine_args)

Loading safetensors checkpoint shards:   0% Completed | 0/5 [00:00<?, ?it/s]
Loading safetensors checkpoint shards:   0% Completed | 0/5 [00:00<?, ?it/s]
Loading safetensors checkpoint shards:  20% Completed | 1/5 [00:00<00:02,  1.83it/s]
Loading safetensors checkpoint shards:  40% Completed | 2/5 [00:00<00:01,  2.33it/s]
Loading safetensors checkpoint shards:  20% Completed | 1/5 [00:00<00:03,  1.30it/s]
Loading safetensors checkpoint shards:  60% Completed | 3/5 [00:01<00:01,  1.95it/s]
Loading safetensors checkpoint shards:  40% Completed | 2/5 [00:01<00:01,  1.79it/s]
Loading safetensors checkpoint shards:  80% Completed | 4/5 [00:02<00:00,  1.79it/s]
Loading safetensors checkpoint shards:  60% Completed | 3/5 [00:01<00:01,  1.62it/s]
Loading safetensors checkpoint shards: 100% Completed | 5/5 [00:02<00:00,  1.85it/s]
Loading safetensors checkpoint shards: 100% Completed | 5/5 [00:02<00:00,  1.89it/s]

Loading safetensors checkpoint shards:   0% Completed | 0/5 [00:00<?, ?it/s]
Loa

#### Simple prompts

In [8]:
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
sampling_params = {"temperature": 0.01, "top_p": 0.95}

outputs = llm.generate(prompts, sampling_params)
for prompt, output in zip(prompts, outputs):
    print(f"Prompt: {prompt}\nGenerated text: {output['text']}")

Prompt: Hello, my name is
Generated text:  Alex. I am a 23-year-old student who is currently studying for a degree in computer science. I am interested in learning more about the field of artificial intelligence and its applications. I have a basic understanding of programming, but I would like to improve my skills in this area. I am also interested in exploring the ethical implications of AI and how it can be used responsibly. I am looking for a mentor who can guide me in my studies and help me understand the concepts better. I am open to learning from someone with experience in AI, machine learning, or related fields. I am available for mentorship sessions on weekends or evenings
Prompt: The president of the United States is
Generated text:  the head of state and head of government of the United States, and the leader of the executive branch of the federal government. The president is also the commander-in-chief of the United States Armed Forces. The president is elected to a four-ye

#### Prompts with structured outputs 

In [9]:
prompts = [
    "What is the capital of China?",
    "What is the capital of Japan?",
]

class CapitalInfo(BaseModel):
    name: str = Field(..., pattern=r"^\w+$", description="Name of the capital city")
    opulation: int = Field(..., description="Population of the capital city")

sampling_params = {"temperature": 0.1,"top_p": 0.95,"json_schema": json.dumps(CapitalInfo.model_json_schema())}

outputs = llm.generate(prompts, sampling_params)
for prompt, output in zip(prompts, outputs):
    print("===============================")
    print(f"Prompt: {prompt}")  # validate the output by the pydantic model
    print(output["text"])
    # capital_info = CapitalInfo.model_validate_json(output["text"])
    # print(f"Validated output: {capital_info.model_dump_json()}")

Prompt: What is the capital of China?
{ "name": "Beijing", "opulation": 2154 }
Prompt: What is the capital of Japan?
{ "name": "Tokyo", "opulation": 37400068 }


#### Apply Proper Chat template

In [10]:
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Make API request
messages = [
    [{"role": "system",
      "content":"You are a helpful assistant."},
    {
        "role": "user",
        "content": "Here is the information of the capital of France.\n",
    }],
    [{"role": "system",
      "content":"You are a helpful assistant."},
     {
        "role": "user",
        "content": "Here is the information of the capital of China .\n",
    }],
]

prompts = tokenizer.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
outputs = llm.generate(prompts, sampling_params)
for prompt, output in zip(prompts, outputs):
    print("===============================")
    print(f"Prompt: {prompt}")  # validate the output by the pydantic model
    print(output["text"])
    # capital_info = CapitalInfo.model_validate_json(output["text"])
    # print(f"Validated output: {capital_info.model_dump_json()}")

Prompt: <|im_start|>system
You are a helpful assistant.<|im_end|>
<|im_start|>user
Here is the information of the capital of France.
<|im_end|>
<|im_start|>assistant

{
  "name": "Paris",
  "opulation": 2148274
  		
	
}
Prompt: <|im_start|>system
You are a helpful assistant.<|im_end|>
<|im_start|>user
Here is the information of the capital of China .
<|im_end|>
<|im_start|>assistant

{
  "name": "Beijing",
  "opulation": 21540000
  
  
}


#### Use Openai to call local llm with sglang

In [2]:
from llm_factory_openai import SimpleLLMAgent

local_model_args = {"model":"Qwen/Qwen3-8B",
                    "base_url":"http://localhost:8101/v1",
                    "temperature":0.1,
                    "api_key":"abc"
                    }
openai_agent = SimpleLLMAgent(**local_model_args)

result = openai_agent.test_connection()
print(f"OpenAI test result: {result}")



OpenAI test result: Hello! 😊 How can I assist you today? Whether you have questions, need help with something, or just want to chat, I'm here for you! What's on your mind?


In [3]:
## send messages to sglang server; try to response in json format
messages = [{"role": "user", "content": 'hi'}]
response = openai_agent.get_response_content(messages)
print(response)

Hello! How can I assist you today? 😊


In [4]:
# Define a Pydantic model for structured output
class CountryInfo(BaseModel):
    country: str = Field(..., description="Country name")
    capital: str = Field(..., description="Capital city")
    population_millions: float = Field(..., description="Population in millions")

# Prepare a prompt asking for structured information
messages = [
    {
        "role": "system",
        "content": "You are a helpful assistant that provides information in JSON format."
    },
    {
        "role": "user",
        "content": (
            "Please provide the following information about Japan in JSON format: "
            "country name, capital city, and population in millions."
        )
    }
]

# Get structured response using the Pydantic model
structured_result = openai_agent.get_response_content(messages, response_format=CountryInfo)
print("Structured output:", structured_result.model_dump_json(indent=2))


Structured output: {
  "country": "Japan",
  "capital": "Tokyo",
  "population_millions": 126.0
}


#### Test using Openai competable batch job with SGLang

In [None]:
from openai.lib._parsing._completions import type_to_response_format_param

In [None]:
type_to_response_format_param(CountryInfo)

In [None]:
def create_batch_tasks(batch_messages, output_format_type,model_name,task_id):
    
    output_json_schema = type_to_response_format_param(output_format_type)
    
    tasks = []
    for messages in batch_messages:
        task = {
            "custom_id": f"task-{task_id}",
            "method": "POST",
            "url": "/chat/completions",
            "body": {
                "model": model_name,
                "messages": messages,
                "response_format": output_json_schema, #{"type": "json_object"}, #output_json_schema, output format affects output speed
                "temperature": 0.1
            }
        }
        tasks.append(task)
    
    return tasks

In [None]:
batch_file_path = "/ephemeral/home/xiong/data/Fund/Factiva_News/temp/batch.jsonl"
batch_messages = [messages] *100
tasks = create_batch_tasks(batch_messages, output_format_type=CountryInfo,model_name='Qwen/Qwen3-8B',task_id='test')
jsonl_data = '\n'.join([json.dumps(t) for t in tasks])
open(batch_file_path, 'w').write(jsonl_data)
print(f"Created {len(tasks)} batch tasks")

- batch file process, currently not supported : https://github.com/sgl-project/sglang/issues/7427
