In [1]:
import os
import requests
import json
from typing import Dict, Optional, List
from collections import Counter
import re

In [2]:
class LLMClient:
    """Small HTTP client for a chat-completions endpoint.

    Requests are POSTed to ``{api_base}/chat/completions`` and every attempt is
    counted in ``call_count`` so callers can report how many API calls a
    strategy consumed.

    NOTE(review): the default ``api_key`` and ``api_base`` are hard-coded in
    source; consider supplying them from configuration or the environment.
    """

    def __init__(
        self,
        api_key: str = "cse476",
        api_base: str = "http://10.4.58.53:41701/v1",
        model: str = "bens_model"
    ):
        """Store the connection settings and start the call counter at zero."""
        self.api_key = api_key
        self.api_base = api_base
        self.model = model
        self.call_count = 0

    @staticmethod
    def _failure(status: int, error: str) -> Dict:
        """Build the result dict returned for any unsuccessful call."""
        return {
            "ok": False,
            "text": None,
            "status": status,
            "error": error,
            "raw": None
        }

    @staticmethod
    def _extract_text(data) -> str:
        """Pull the first choice's message content out of a decoded response.

        Any missing or malformed level (no ``choices``, empty list, missing
        ``message``, null ``content``) yields ``""`` instead of raising, so
        callers can always treat ``text`` of a successful call as a string.
        """
        choices = data.get("choices") if isinstance(data, dict) else None
        if not choices or not isinstance(choices, (list, tuple)):
            return ""
        first = choices[0]
        message = first.get("message") if isinstance(first, dict) else None
        content = message.get("content") if isinstance(message, dict) else None
        # A present-but-null content would otherwise leak None to callers.
        return "" if content is None else content

    def call(
        self,
        prompt: str,
        system: str = "You are a helpful assistant.",
        temperature: float = 0.0,
        max_tokens: int = 128,
        timeout: int = 60
    ) -> Dict:
        """Send one system+user exchange and return a normalized result dict.

        Args:
            prompt: Content of the user message.
            system: Content of the system message.
            temperature: Sampling temperature forwarded in the payload.
            max_tokens: Completion length limit forwarded in the payload.
            timeout: Timeout passed to ``requests.post``.

        Returns:
            A dict with keys ``ok``, ``text``, ``status``, ``error`` and
            ``raw``. On HTTP 200, ``ok`` is True, ``text`` is the first
            choice's content (``""`` if absent) and ``raw`` is the decoded
            body. On any other status, ``ok`` is False and ``error`` holds the
            decoded (or raw) error body. On a request exception or an
            undecodable 200 body, ``status`` is -1.
        """
        # Counted before the request so failed attempts are included.
        self.call_count += 1

        url = f"{self.api_base}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt}
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
            status = resp.status_code

            if status == 200:
                data = resp.json()
                return {
                    "ok": True,
                    "text": self._extract_text(data),
                    "status": status,
                    "error": None,
                    "raw": data
                }

            # Prefer a structured error body; fall back to the raw text.
            try:
                err_text = resp.json()
            except Exception:
                err_text = resp.text
            return self._failure(status, str(err_text))

        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures of a 200 body, which
            # are not a RequestException on every version of requests.
            return self._failure(-1, str(e))

    def reset_counter(self):
        """Reset the number of recorded calls to zero."""
        self.call_count = 0

    def get_call_count(self) -> int:
        """Return how many times ``call`` has been invoked since the last reset."""
        return self.call_count


def call_model_chat_completions(
    prompt: str,
    system: str = "You are a helpful assistant.",
    model: str = "bens_model",
    temperature: float = 0.0,
    timeout: int = 60
) -> Dict:
    """One-shot convenience wrapper around ``LLMClient.call``.

    A fresh ``LLMClient`` is built for ``model`` on every invocation (so its
    call counter is not shared between invocations) and the result dict of
    its ``call`` method is returned unchanged. ``max_tokens`` is left at the
    client's default.
    """
    return LLMClient(model=model).call(
        prompt,
        system,
        temperature,
        timeout=timeout,
    )

In [3]:
if __name__ == "__main__":
    # Smoke test: send three short prompts through one client and print each
    # outcome together with the client's running request count.
    print("Testing LLM Client:")

    client = LLMClient()

    # Each entry: (banner line, keyword arguments for client.call, whether the
    # running total is echoed once more after the case).
    smoke_cases = [
        (
            "\nTest 1: Simple Math",
            {"prompt": "What is 5 + 3? Answer with just the number."},
            False,
        ),
        (
            "\nTest 2: Common Sense",
            {
                "prompt": "You place an ice cube in a glass of water and mark the water level. After the ice melts, does the water level rise, fall, or stay the same? ",
                "system": "Answer with exactly one of: 'rise', 'fall', 'stay the same'.",
            },
            True,
        ),
        (
            "\nTest 3: logic_race",
            {
                "prompt": "In a race, you pass the person in second place. What position are you now in? ",
                "system": "Answer with a single word like 'first', 'second', 'third'.",
            },
            True,
        ),
    ]

    for banner, call_kwargs, echo_total in smoke_cases:
        print(banner)
        result = client.call(**call_kwargs)

        if not result["ok"]:
            print(f"Failed: {result['error']}")
        else:
            print(f"Success! Answer: {result['text']}")
            print(f"   API calls made: {client.get_call_count()}")

        if echo_total:
            print(f"Total API calls: {client.get_call_count()}")

Testing LLM Client:

Test 1: Simple Math
Success! Answer: 8
   API calls made: 1

Test 2: Common Sense
Success! Answer: stay the same
   API calls made: 2
Total API calls: 2

Test 3: logic_race
Success! Answer: second
   API calls made: 3
Total API calls: 3


In [4]:
class DataLoader:
    """Reads a JSON file of problem records and offers simple lookups.

    The file is parsed once at construction; per-domain counts are computed
    at the same time and cached in ``domain_stats``.
    """

    def __init__(self, data_path: str):
        """Load ``data_path`` and tally how many records each domain has."""
        self.data_path = data_path
        self.data = self._load_data()
        self.domain_stats = self._compute_stats()

    def _load_data(self) -> List[Dict]:
        """Parse the UTF-8 JSON file at ``self.data_path`` and return its content."""
        with open(self.data_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)

    def _compute_stats(self) -> Dict:
        """Map each record's ``'domain'`` value to its number of occurrences."""
        tally = {}
        for record in self.data:
            key = record['domain']
            tally[key] = tally.get(key, 0) + 1
        return tally

    def get_all(self) -> List[Dict]:
        """Return the loaded records (the internal list itself, not a copy)."""
        return self.data

    def get_by_domain(self, domain: str) -> List[Dict]:
        """Return, in file order, the records whose ``'domain'`` equals ``domain``."""
        selected = []
        for record in self.data:
            if record['domain'] == domain:
                selected.append(record)
        return selected

    def get_item(self, index: int) -> Dict:
        """Return the record stored at position ``index``."""
        return self.data[index]

    def get_stats(self) -> Dict:
        """Return the record count (``'total'``) and per-domain counts (``'domains'``)."""
        return dict(total=len(self.data), domains=self.domain_stats)

In [5]:
class Problem:
    """A single problem record: prompt text, reference output and domain tag."""

    def __init__(self, data: Dict):
        """Copy the ``'input'``, ``'output'`` and ``'domain'`` entries of ``data``."""
        self.input, self.expected_output, self.domain = (
            data['input'],
            data['output'],
            data['domain'],
        )

    def __repr__(self):
        """Short summary showing the domain and the length of the input."""
        input_len = len(self.input)
        return f"Problem(domain='{self.domain}', input_len={input_len})"

    def get_input(self) -> str:
        """Return the problem's input text."""
        return self.input

    def get_expected_output(self) -> str:
        """Return the reference output stored with the problem."""
        return self.expected_output

    def get_domain(self) -> str:
        """Return the problem's domain tag."""
        return self.domain

In [6]:
# Read the problem file (path is relative to the working directory) and
# report how many records it contains.
loader = DataLoader('data/cse476_final_project_dev_data.json')
print(f"Loaded {len(loader.get_all())} problems")

Loaded 1000 problems


In [7]:
class BaselineSolver:
    """Direct-answer baseline: one API call that asks for the answer only."""

    # Ordered (pattern, replacement) rewrites applied to the lower-cased
    # answer. Order matters: "remains" is first reduced to "remain" so that
    # the following rule can turn "remain the same" into "stay the same".
    _FORMAT_REWRITES = (
        (r'\bstays\b', 'stay'),
        (r'\brises\b', 'rise'),
        (r'\bfalls\b', 'fall'),
        (r'\bremains\b', 'remain'),
        (r'\bremain(?:s)?\s+the\s+same\b', 'stay the same'),
        (r'\s+place$', ''),
        (r'\s+position$', ''),
        (r'^the\s+', ''),
        (r'^a\s+', ''),
    )

    def __init__(self, client: LLMClient):
        """Keep the client used for requests and set the method label."""
        self.client = client
        self.name = "Baseline (Direct)"

    def solve(self, problem: Problem) -> Dict:
        """Ask the model for the final answer to ``problem`` in a single call.

        Returns:
            On success: ``answer`` (normalized text), ``success=True``,
            ``api_calls=1`` and ``method``. On failure: ``answer=None``,
            ``success=False``, ``api_calls=1``, ``method`` and the client's
            ``error`` message.
        """
        question = problem.get_input()

        system_message = (
            "You are a helpful assistant. "
            "Reply with only the final answer - no explanation, no reasoning."
        )

        result = self.client.call(
            prompt=question,
            system=system_message,
            temperature=0.0,
            max_tokens=128
        )

        if not result["ok"]:
            return {
                "answer": None,
                "success": False,
                "api_calls": 1,
                "method": self.name,
                "error": result["error"]
            }

        # A successful response can still carry a null content; treat it as
        # an empty answer instead of crashing on None.strip().
        answer = (result["text"] or "").strip()
        answer = self._normalize_answer_format(answer)

        return {
            "answer": answer,
            "success": True,
            "api_calls": 1,
            "method": self.name
        }

    def _normalize_answer_format(self, answer: str) -> str:
        """Lower-case ``answer`` and reduce it to a canonical short form.

        Text following "water level" is kept in place of the whole answer,
        verb forms are unified (e.g. "stays" -> "stay", "remains the same" ->
        "stay the same"), a trailing "place"/"position" is dropped and a
        leading "the"/"a" is removed. Empty input is returned unchanged.
        """
        if not answer:
            return answer

        answer_lower = answer.lower().strip()

        if 'water level' in answer_lower:
            match = re.search(r'water\s+level\s+(.+)', answer_lower)
            if match:
                answer_lower = match.group(1).strip()

        for pattern, replacement in self._FORMAT_REWRITES:
            answer_lower = re.sub(pattern, replacement, answer_lower)

        return answer_lower.strip()

In [8]:
class ChainOfThoughtSolver:
    """Single-call solver that requests step-by-step reasoning and then
    extracts a short final answer from the end of the response."""

    # Ordered (pattern, replacement) rewrites applied to the lower-cased
    # answer. Order matters: "remains" is first reduced to "remain" so that
    # the following rule can turn "remain the same" into "stay the same".
    _FORMAT_REWRITES = (
        (r'\bstays\b', 'stay'),
        (r'\brises\b', 'rise'),
        (r'\bfalls\b', 'fall'),
        (r'\bremains\b', 'remain'),
        (r'\bremain(?:s)?\s+the\s+same\b', 'stay the same'),
        (r'\s+place$', ''),
        (r'\s+position$', ''),
        (r'^the\s+', ''),
        (r'^a\s+', ''),
    )

    def __init__(self, client: LLMClient):
        """Keep the client used for requests and set the method label."""
        self.client = client
        self.name = "Chain-of-Thought"

    def solve(self, problem: Problem) -> Dict:
        """Request a reasoned solution for ``problem`` and extract its answer.

        Returns:
            On success: ``answer`` (normalized text), ``reasoning`` (the full
            stripped response), ``success=True``, ``api_calls=1`` and
            ``method``. On failure: ``answer`` and ``reasoning`` are None,
            ``success=False`` and ``error`` carries the client's message.
        """
        question = problem.get_input()

        cot_prompt = f"{question} Think step by step to solve this problem. Show your reasoning, then provide the final answer."

        system_message = (
            "You are a helpful assistant that solves problems step-by-step. "
            "Show your reasoning process clearly, then provide the final answer."
        )

        result = self.client.call(
            prompt=cot_prompt,
            system=system_message,
            temperature=0.0,
            max_tokens=512
        )

        if not result["ok"]:
            return {
                "answer": None,
                "reasoning": None,
                "success": False,
                "api_calls": 1,
                "method": self.name,
                "error": result["error"]
            }

        # A successful response can still carry a null content; treat it as
        # empty text instead of crashing on None.strip().
        full_response = (result["text"] or "").strip()
        final_answer = self._extract_answer(full_response)
        final_answer = self._normalize_answer_format(final_answer)

        return {
            "answer": final_answer,
            "reasoning": full_response,
            "success": True,
            "api_calls": 1,
            "method": self.name
        }

    def _normalize_answer_format(self, answer: str) -> str:
        """Lower-case ``answer`` and reduce it to a canonical short form.

        Text following "water level" is kept in place of the whole answer,
        verb forms are unified (e.g. "stays" -> "stay", "remains the same" ->
        "stay the same"), a trailing "place"/"position" is dropped and a
        leading "the"/"a" is removed. Empty input is returned unchanged.
        """
        if not answer:
            return answer

        answer_lower = answer.lower().strip()

        if 'water level' in answer_lower:
            match = re.search(r'water\s+level\s+(.+)', answer_lower)
            if match:
                answer_lower = match.group(1).strip()

        for pattern, replacement in self._FORMAT_REWRITES:
            answer_lower = re.sub(pattern, replacement, answer_lower)

        return answer_lower.strip()

    def _extract_answer(self, reasoning: str) -> str:
        """Heuristically pull the final answer out of a reasoning transcript.

        Only the last 400 characters are inspected. Strategies are tried in
        order and the first one that yields an acceptable candidate wins:

        1. text after "final answer" (optional colon, same or next line)
        2. text after "the answer is"
        3. content of ``\\boxed{...}``
        4. the last ``$$...$$`` block
        5. the shortest ``**bold**`` span
        6. the last number that is not followed by a word
        7. the last ordinal word ("first" .. "sixth")
        8. the last sentence, then the last line, then the raw tail
        """
        last_part = reasoning[-400:]

        # A period ends the captured answer only when it is not a decimal
        # point ("3.14" stays whole); MULTILINE lets "$" end the capture at
        # the end of the answer's own line when more text follows.
        terminator = r'(?:\.(?!\d)|$)'
        marker_flags = re.IGNORECASE | re.MULTILINE

        final_answer = re.search(
            r'final\s+answer\s*:?\s*(.+?)' + terminator,
            last_part,
            marker_flags
        )
        if final_answer:
            answer = self._clean_answer(final_answer.group(1).strip())
            if 1 <= len(answer) <= 100 and answer.lower() not in ('the', 'a'):
                return answer

        answer_is = re.search(
            r'the\s+answer\s+is\s*:?\s*(.+?)' + terminator,
            last_part,
            marker_flags
        )
        if answer_is:
            answer = self._clean_answer(answer_is.group(1).strip())
            if 1 <= len(answer) <= 100:
                return answer

        boxed = re.search(r'\\boxed\{([^}]+)\}', last_part)
        if boxed:
            return self._clean_answer(boxed.group(1))

        latex_double = re.findall(r'\$\$(.+?)\$\$', last_part, re.DOTALL)
        if latex_double:
            answer = self._clean_answer(latex_double[-1])
            if 1 <= len(answer) < 50:
                return answer

        bold = re.findall(r'\*\*([^\*]+)\*\*', last_part)
        if bold:
            # Shortest emphasized span first: short spans are more likely to
            # be a bare answer than an emphasized phrase.
            for b in sorted(bold, key=len):
                cleaned = self._clean_answer(b)
                if 1 <= len(cleaned) <= 30:
                    return cleaned

        # Numbers not followed by a word (i.e. not "3 apples"). The guards
        # around the digits keep a decimal from being split into its parts,
        # and the last match is used because results follow their operands
        # ("17 + 28 = 45").
        standalone_numbers = re.findall(
            r'(?<![\w.])(\d+(?:\.\d+)?)(?!\.\d)\b(?!\s*\w)',
            last_part
        )
        if standalone_numbers:
            return standalone_numbers[-1]

        position_words = ['first', 'second', 'third', 'fourth', 'fifth', 'sixth']
        words_in_text = re.findall(r'\b([a-zA-Z]+)\b', last_part.lower())
        for word in reversed(words_in_text):
            if word in position_words:
                return word

        sentences = [s.strip() for s in re.split(r'[.!?]+', last_part) if s.strip()]
        if sentences:
            cleaned = self._clean_answer(sentences[-1])
            if 1 <= len(cleaned) <= 50:
                return cleaned

        lines = [line.strip() for line in last_part.split('\n') if line.strip()]
        if lines:
            cleaned = self._clean_answer(lines[-1])
            if 1 <= len(cleaned) <= 100:
                return cleaned

        return self._clean_answer(last_part[-100:])

    def _clean_answer(self, answer: str) -> str:
        """Strip markup and filler words from an extracted answer candidate.

        Removes ``*`` and ``$`` runs, unwraps ``\\boxed{...}``, drops LaTeX
        commands and all brackets/braces/parentheses, trims leading and
        trailing punctuation, collapses whitespace and removes a leading
        "you are now in the"-style prefix (any subset, in that order) as well
        as one leading "the"/"is"/"be"/"it". Empty input yields ``""``.
        """
        if not answer:
            return ""

        answer = re.sub(r'\*+', '', answer)
        answer = re.sub(r'\$+', '', answer)
        answer = re.sub(r'\\boxed\{([^}]+)\}', r'\1', answer)
        answer = re.sub(r'\\[a-zA-Z]+', '', answer)
        answer = re.sub(r'[{}()\[\]]', '', answer)
        answer = re.sub(r'[.!?;:,]+$', '', answer)
        answer = re.sub(r'^[.!?;:,\s]+', '', answer)
        answer = re.sub(r'\s+', ' ', answer)
        answer = re.sub(r'^(?:you\s+)?(?:are\s+)?(?:now\s+)?(?:in\s+)?(?:the\s+)?', '', answer, flags=re.IGNORECASE)
        answer = re.sub(r'^(?:the|is|be|it)\s+', '', answer, flags=re.IGNORECASE)

        return answer.strip()

In [9]:
class SelfConsistencySolver:
    """Samples several step-by-step solutions and returns the majority answer."""

    # Ordered (pattern, replacement) rewrites applied to the lower-cased
    # answer. Order matters: "remains" is first reduced to "remain" so that
    # the following rule can turn "remain the same" into "stay the same".
    _FORMAT_REWRITES = (
        (r'\bstays\b', 'stay'),
        (r'\brises\b', 'rise'),
        (r'\bfalls\b', 'fall'),
        (r'\bremains\b', 'remain'),
        (r'\bremain(?:s)?\s+the\s+same\b', 'stay the same'),
        (r'\s+place$', ''),
        (r'\s+position$', ''),
        (r'^the\s+', ''),
        (r'^a\s+', ''),
    )

    def __init__(
        self,
        client: LLMClient,
        num_samples: int = 5,
        temperature: float = 0.1,
        max_tokens: int = 512
    ):
        """Configure the sampler.

        Args:
            client: Client used for every request.
            num_samples: Number of responses requested per problem.
            temperature: Sampling temperature sent with each request.
            max_tokens: Completion length limit sent with each request.
        """
        self.client = client
        self.num_samples = num_samples
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.name = "Self-Consistency"

    def solve(self, problem: Problem) -> Dict:
        """Sample ``num_samples`` responses and vote on the extracted answers.

        Returns:
            A dict with ``answer`` (majority answer, or None if every call
            failed), ``all_answers``, ``vote_counts``, ``confidence`` (share
            of successful samples that agree with the majority; 0.0 on total
            failure), ``all_reasoning``, ``success``, ``api_calls`` and
            ``method``. When every call fails, ``error`` is also set and
            ``all_answers``/``all_reasoning`` hold one None per attempt.
        """
        question = problem.get_input()

        cot_prompt = f"{question} Think step by step to solve this problem. Show your reasoning, then provide the final answer."

        system_message = (
            "You are a helpful assistant that solves problems step-by-step. "
            "Show your reasoning process clearly, then provide the final answer."
        )

        all_answers = []
        all_reasoning = []
        successful_calls = 0

        for _ in range(self.num_samples):
            result = self.client.call(
                prompt=cot_prompt,
                system=system_message,
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )

            if not result["ok"]:
                all_answers.append(None)
                all_reasoning.append(None)
                continue

            # A successful response can still carry a null content; treat it
            # as empty text instead of crashing on None.strip().
            full_response = (result["text"] or "").strip()
            answer = self._normalize_answer_format(self._extract_answer(full_response))

            all_answers.append(answer)
            all_reasoning.append(full_response)
            successful_calls += 1

        if successful_calls == 0:
            return {
                "answer": None,
                "all_answers": all_answers,
                "vote_counts": {},
                # Present on both outcomes so callers can always read it.
                "confidence": 0.0,
                "all_reasoning": all_reasoning,
                "success": False,
                "api_calls": self.num_samples,
                "method": self.name,
                "error": "All API calls failed"
            }

        # At least one call succeeded, so there is at least one vote.
        valid_answers = [a for a in all_answers if a is not None]
        vote_counts = Counter(self._normalize_answer(a) for a in valid_answers)
        majority_answer, votes = vote_counts.most_common(1)[0]
        confidence = votes / len(valid_answers)

        return {
            "answer": majority_answer,
            "all_answers": valid_answers,
            "vote_counts": dict(vote_counts),
            "confidence": confidence,
            "all_reasoning": [r for r in all_reasoning if r is not None],
            "success": True,
            "api_calls": self.num_samples,
            "method": self.name
        }

    def _extract_answer(self, reasoning: str) -> str:
        """Heuristically pull the final answer out of a reasoning transcript.

        Only the last 400 characters are inspected. Strategies are tried in
        order and the first one that yields an acceptable candidate wins:

        1. text after "final answer" (optional colon, same or next line)
        2. text after "the answer is"
        3. content of ``\\boxed{...}``
        4. the last ``$$...$$`` block
        5. the shortest ``**bold**`` span
        6. the last number that is not followed by a word
        7. the last ordinal word ("first" .. "sixth")
        8. the last sentence, then the last line, then the raw tail

        Explicit answer markers are consulted before the ordinal-word scan so
        that an enumerating "First, ..." in the reasoning cannot override a
        stated answer.
        """
        last_part = reasoning[-400:]

        # A period ends the captured answer only when it is not a decimal
        # point ("3.14" stays whole); MULTILINE lets "$" end the capture at
        # the end of the answer's own line when more text follows.
        terminator = r'(?:\.(?!\d)|$)'
        marker_flags = re.IGNORECASE | re.MULTILINE

        final_answer = re.search(
            r'final\s+answer\s*:?\s*(.+?)' + terminator,
            last_part,
            marker_flags
        )
        if final_answer:
            answer = self._clean_answer(final_answer.group(1).strip())
            if 1 <= len(answer) <= 100 and answer.lower() not in ('the', 'a'):
                return answer

        answer_is = re.search(
            r'the\s+answer\s+is\s*:?\s*(.+?)' + terminator,
            last_part,
            marker_flags
        )
        if answer_is:
            answer = self._clean_answer(answer_is.group(1).strip())
            if 1 <= len(answer) <= 100:
                return answer

        boxed = re.search(r'\\boxed\{([^}]+)\}', last_part)
        if boxed:
            return self._clean_answer(boxed.group(1))

        latex_double = re.findall(r'\$\$(.+?)\$\$', last_part, re.DOTALL)
        if latex_double:
            answer = self._clean_answer(latex_double[-1])
            if 1 <= len(answer) < 50:
                return answer

        bold = re.findall(r'\*\*([^\*]+)\*\*', last_part)
        if bold:
            # Shortest emphasized span first: short spans are more likely to
            # be a bare answer than an emphasized phrase.
            for b in sorted(bold, key=len):
                cleaned = self._clean_answer(b)
                if 1 <= len(cleaned) <= 30:
                    return cleaned

        # Numbers not followed by a word (i.e. not "3 apples"). The guards
        # around the digits keep a decimal from being split into its parts,
        # and the last match is used because results follow their operands
        # ("17 + 28 = 45").
        standalone_numbers = re.findall(
            r'(?<![\w.])(\d+(?:\.\d+)?)(?!\.\d)\b(?!\s*\w)',
            last_part
        )
        if standalone_numbers:
            return standalone_numbers[-1]

        # Last ordinal mentioned wins, regardless of its rank.
        position_words = ['first', 'second', 'third', 'fourth', 'fifth', 'sixth']
        words_in_text = re.findall(r'\b([a-zA-Z]+)\b', last_part.lower())
        for word in reversed(words_in_text):
            if word in position_words:
                return word

        sentences = [s.strip() for s in re.split(r'[.!?]+', last_part) if s.strip()]
        if sentences:
            cleaned = self._clean_answer(sentences[-1])
            if 1 <= len(cleaned) <= 50:
                return cleaned

        lines = [line.strip() for line in last_part.split('\n') if line.strip()]
        if lines:
            cleaned = self._clean_answer(lines[-1])
            if 1 <= len(cleaned) <= 100:
                return cleaned

        return self._clean_answer(last_part[-100:])

    def _clean_answer(self, answer: str) -> str:
        """Strip markup and filler words from an extracted answer candidate.

        Removes ``*`` and ``$`` runs, unwraps ``\\boxed{...}``, drops LaTeX
        commands and all brackets/braces/parentheses, trims leading and
        trailing punctuation, collapses whitespace and removes a leading
        "you are now in the"-style prefix (any subset, in that order) as well
        as one leading "the"/"is"/"be"/"it". Empty input yields ``""``.
        """
        if not answer:
            return ""

        answer = re.sub(r'\*+', '', answer)
        answer = re.sub(r'\$+', '', answer)
        answer = re.sub(r'\\boxed\{([^}]+)\}', r'\1', answer)
        answer = re.sub(r'\\[a-zA-Z]+', '', answer)
        answer = re.sub(r'[{}()\[\]]', '', answer)
        answer = re.sub(r'[.!?;:,]+$', '', answer)
        answer = re.sub(r'^[.!?;:,\s]+', '', answer)
        answer = re.sub(r'\s+', ' ', answer)
        answer = re.sub(r'^(?:you\s+)?(?:are\s+)?(?:now\s+)?(?:in\s+)?(?:the\s+)?', '', answer, flags=re.IGNORECASE)
        answer = re.sub(r'^(?:the|is|be|it)\s+', '', answer, flags=re.IGNORECASE)

        return answer.strip()

    def _normalize_answer_format(self, answer: str) -> str:
        """Lower-case ``answer`` and reduce it to a canonical short form.

        Text following "water level" is kept in place of the whole answer,
        verb forms are unified (e.g. "stays" -> "stay", "remains the same" ->
        "stay the same"), a trailing "place"/"position" is dropped and a
        leading "the"/"a" is removed. Empty input is returned unchanged.
        """
        if not answer:
            return answer

        answer_lower = answer.lower().strip()

        if 'water level' in answer_lower:
            match = re.search(r'water\s+level\s+(.+)', answer_lower)
            if match:
                answer_lower = match.group(1).strip()

        for pattern, replacement in self._FORMAT_REWRITES:
            answer_lower = re.sub(pattern, replacement, answer_lower)

        return answer_lower.strip()

    def _normalize_answer(self, answer: str) -> str:
        """Produce the key under which an answer is counted when voting.

        Lower-cases the text, removes ``**``, ``$``, backslashes and braces
        (unwrapping ``\\boxed{...}`` first), drops trailing ``.``/``!``/``?``
        and collapses whitespace. Empty input yields ``""``.
        """
        if not answer:
            return ""

        normalized = answer.lower().strip()
        normalized = re.sub(r'\*\*', '', normalized)
        normalized = re.sub(r'\$', '', normalized)
        normalized = re.sub(r'\\boxed\{([^}]+)\}', r'\1', normalized)
        normalized = re.sub(r'\\', '', normalized)
        normalized = re.sub(r'[{}]', '', normalized)
        normalized = re.sub(r'[.!?]+$', '', normalized)
        normalized = re.sub(r'\s+', ' ', normalized)

        return normalized.strip()

In [10]:
# One shared client: all three solvers below send their requests through it,
# so its call counter reflects their combined usage.
client = LLMClient()
print("LLM Client created")

# Read the problem file (path is relative to the working directory).
loader = DataLoader('data/cse476_final_project_dev_data.json')
print(f"Loaded {len(loader.get_all())} problems")

# The three strategies under comparison: direct answer, single
# chain-of-thought call, and majority vote over five sampled responses.
baseline = BaselineSolver(client)
cot = ChainOfThoughtSolver(client)
sc = SelfConsistencySolver(client, num_samples=5)

print("All solvers ready!")

# Summarize the dataset: total record count and records per domain.
stats = loader.get_stats()
print(f"\nDataset: {stats['total']} problems")
for domain, count in stats['domains'].items():
    print(f"  {domain}: {count}")

LLM Client created
Loaded 1000 problems
All solvers ready!

Dataset: 1000 problems
  math: 300
  coding: 100
  future_prediction: 100
  planning: 100
  common_sense: 400


In [11]:
# Three hand-written problems with known answers, used to sanity-check each
# solver end to end against the live model.
test_problems = [
    {
        "input": "What is 17 + 28?",
        "output": "45",
        "domain": "math"
    },
    {
        "input": "In a race, you pass the person in second place. What position are you now in?",
        "output": "second",
        "domain": "logic"
    },
    {
        "input": "You place an ice cube in a glass of water. After the ice melts, does the water level rise, fall, or stay the same?",
        "output": "stay the same",
        "domain": "common_sense"
    }
]


def _print_case(prob_data: dict, result: dict, show_votes: bool = False) -> None:
    """Print the expected and produced answer for one sanity-check problem.

    With ``show_votes`` the vote confidence and the individual sampled
    answers of a self-consistency result are printed as well.
    """
    print(f"\n {prob_data['domain'].upper()}")
    print(f"   Expected: '{prob_data['output']}'")
    print(f"   Got: '{result['answer']}'")
    if show_votes:
        # A result whose API calls all failed may carry no confidence value;
        # report 0% instead of raising KeyError.
        print(f"   Confidence: {result.get('confidence', 0.0):.0%}")
        print(f"   All answers: {result.get('all_answers', [])}")


print("="*70)
print("TESTING BASELINE SOLVER")
print("="*70)

# Start counting from zero; the totals printed after each section below are
# cumulative across the three solvers.
client.reset_counter()

for prob_data in test_problems:
    problem = Problem(prob_data)
    result = baseline.solve(problem)
    _print_case(prob_data, result)

print(f"\nTotal API calls: {client.get_call_count()}")

print("="*70)
print("Testing CoT on all problem types:")
print("="*70)

for prob_data in test_problems:
    problem = Problem(prob_data)
    result = cot.solve(problem)
    _print_case(prob_data, result)

print(f"\nTotal API calls: {client.get_call_count()}")

print("\n" + "="*70)
print("TESTING SELF-CONSISTENCY SOLVER")
print("="*70)

for prob_data in test_problems:
    problem = Problem(prob_data)
    result = sc.solve(problem)
    _print_case(prob_data, result, show_votes=True)

print(f"\nTotal API calls: {client.get_call_count()}")

TESTING BASELINE SOLVER

 MATH
   Expected: '45'
   Got: '45'

 LOGIC
   Expected: 'second'
   Got: 'second'

 COMMON_SENSE
   Expected: 'stay the same'
   Got: 'stay the same'

Total API calls: 3
Testing CoT on all problem types:

 MATH
   Expected: '45'
   Got: '45'

 LOGIC
   Expected: 'second'
   Got: 'second'

 COMMON_SENSE
   Expected: 'stay the same'
   Got: 'stay the same'

Total API calls: 6

TESTING SELF-CONSISTENCY SOLVER

 MATH
   Expected: '45'
   Got: '45'
   Confidence: 100%
   All answers: ['45', '45', '45', '45', '45']

 LOGIC
   Expected: 'second'
   Got: 'second'
   Confidence: 100%
   All answers: ['second', 'second', 'second', 'second', 'second']

 COMMON_SENSE
   Expected: 'stay the same'
   Got: 'stay the same'
   Confidence: 60%
   All answers: ['volume of water = 100 g1.0 g/cm^3 = 100 cm^3', 'stay the same', 'volume', 'stay the same', 'stay the same']

Total API calls: 21
