In [6]:
import requests
from bs4 import BeautifulSoup
import bm25s
import re
import sys
from io import StringIO
import platform
import os
import ollama
from typing import Dict, Generator

resource module not available on Windows


### Make a Search Engine with RAG abilities:

In [8]:
class SearchEngine:
    """Web search plus BM25 re-ranking, used to build context for an LLM.

    Pipeline (see ``execute``):
      1. ask the Google Custom Search JSON API for the top links,
      2. download each page and keep the text of its ``<p>`` tags,
      3. split that text into sentences and rank them against the query
         with BM25, returning the best matches as a numbered list.
    """

    # Seconds to wait on any single HTTP request before giving up, so one
    # unresponsive host cannot hang the whole notebook.
    REQUEST_TIMEOUT = 10

    def __init__(self) -> None:
        self.base_url = "https://www.googleapis.com/customsearch/v1"
        # Credentials are read from the environment when available; the
        # literals are only placeholders kept for backward compatibility.
        self.key = os.environ.get("GOOGLE_API_KEY", "APIKEY")
        self.cx = os.environ.get("GOOGLE_CSE_ID", 'CXKEY')
        # Last context produced by ``execute`` (empty until it is called).
        self.context = ""

    def _search_google_snippets(self, query, num_results=3):
        """Return the result links Google reports for ``query``.

        Args:
            query: Search string sent as the ``q`` parameter.
            num_results: Number of results requested (``num`` parameter).

        Returns:
            A list of URL strings; an empty list when the request fails
            or the response contains no items.
        """
        params = {
            'q': query,
            'key': self.key,
            'cx': self.cx,
            'num': num_results
        }

        try:
            response = requests.get(
                self.base_url, params=params, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            results = response.json().get('items', [])
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"An error occurred: {e}")
            return []

        # Skip malformed items instead of raising KeyError on a missing link.
        return [item['link'] for item in results if item.get('link')]

    def _extract_paragraph_text(self, urls):
        """Download ``urls`` and concatenate the text of their ``<p>`` tags.

        Args:
            urls: A single URL string or a sequence of URL strings.

        Returns:
            ``None`` when ``urls`` is empty or ``None``; otherwise one string
            holding the paragraph text of every page that could be fetched,
            each page preceded by a newline. Pages that fail to download are
            reported and skipped.
        """
        if not urls:
            return None

        if isinstance(urls, str):
            urls = [urls]

        all_paras = ' '
        for url in urls:
            try:
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
            except requests.RequestException as e:
                # One bad link should not discard the other pages.
                print(f"Skipping {url}: {e}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            paragraph_text = ' '.join(
                p.get_text(strip=True) for p in soup.find_all('p')
            )

            all_paras += '\n'
            all_paras += paragraph_text

        return all_paras

    def _perform_rag_bm25(self, query, full_data, k=5):
        """Rank the sentences of ``full_data`` against ``query`` with BM25.

        Args:
            query: The user query.
            full_data: Raw text to search; may be ``None`` or empty.
            k: Maximum number of sentences to return.

        Returns:
            The top sentences as numbered lines (``"1: ...\\n2: ...\\n"``),
            or an empty string when there is no text to rank.
        """
        if not full_data or not full_data.strip():
            return ""

        # Create the corpus with a simple regex split on sentence endings,
        # dropping the empty fragments the split can leave behind.
        corpus = [
            sentence.strip()
            for sentence in re.split(r'(?<=[.!?])\s+', full_data)
            if sentence.strip()
        ]
        if not corpus:
            return ""

        # Tokenize and index the corpus.
        corpus_tokens = bm25s.tokenize(corpus)
        retriever = bm25s.BM25()
        retriever.index(corpus_tokens)

        # Tokenize the query.
        query_tokens = bm25s.tokenize(query)

        # Never ask for more results than there are sentences.
        top_k = max(1, min(k, len(corpus)))
        results, _scores = retriever.retrieve(query_tokens, corpus, k=top_k)

        # Render the hits of the (single) query as an enumerated plaintext list.
        context = ""
        for i in range(results.shape[1]):
            doc = results[0, i]
            context += f"{i+1}: {doc}\n"

        return context

    def execute(self, query):
        """Run the full search -> scrape -> rank pipeline for ``query``.

        Returns:
            The ranked context string (also stored on ``self.context``);
            empty when nothing could be retrieved.
        """
        # Step 1: send the query to Google to fetch the best-matching links.
        links = self._search_google_snippets(query)

        # Step 2: extract text from the <p> tags of those links.
        full_data = self._extract_paragraph_text(links)

        # Step 3: using that text as the corpus, re-run the query against it.
        context = self._perform_rag_bm25(query, full_data)

        self.context = context

        return context

### Self Fixing Coding Agent

1. Run an LLM instance
2. Extract code with regex parsing
3. Send code to a subprocess to execute it
4. Fetch the output/error and return it to the LLM (VIA SYSTEM PROMPT)
5. Repeat steps 2-4 in a loop until the desired output has been achieved.
6. Return final code

Additional tasks:
1. Give it a filepath and make it change the code base within it.
2. Create a critic that asks for required paths for any given task as inputs.

In [10]:
def extract_code_blocks(text):
    """Return the contents of every triple-backtick fenced block in ``text``.

    An optional language tag on the opening fence (``python``, ``c++``, ...)
    is dropped, including when it is followed by trailing spaces or a
    Windows ``\\r\\n`` line ending. Each block is returned stripped of
    surrounding whitespace, in order of appearance.

    Args:
        text: Markdown-ish text, typically an LLM reply.

    Returns:
        A list of code strings; empty when ``text`` has no fenced block.
    """
    # Tag characters are limited to word chars and "+#.-" so that a block
    # whose first line is real code (e.g. "print(1)") is not mistaken for a tag.
    pattern = r'```(?:[\w+#.-]*[ \t]*\r?\n)?(.+?)```'
    return [block.strip() for block in re.findall(pattern, text, re.DOTALL)]

def execute_code(code_string: str):
    """Execute ``code_string`` and capture what it prints.

    The code runs in a fresh namespace (with ``__name__ == "__main__"``) so
    that functions and classes it defines can see each other and so it does
    not read or clobber the caller's variables.

    SECURITY: this runs arbitrary code in the current process via ``exec``.
    Only use it on code you are willing to run with your own privileges.

    Args:
        code_string: Python source to execute.

    Returns:
        A tuple ``(output, error)``. ``output`` is everything written to
        stdout (including output produced before a failure). ``error`` is
        the captured stderr text, followed by ``"ExcType: message"`` when
        the code raised; it is an empty string when nothing went wrong.
    """
    stdout_buffer, stderr_buffer = StringIO(), StringIO()
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = stdout_buffer, stderr_buffer

    failure = None
    try:
        # A single dict serves as both globals and locals; with separate
        # mappings, top-level names are invisible inside nested functions.
        exec(code_string, {"__name__": "__main__"})
    except SystemExit as exc:
        # exit()/sys.exit() in the snippet must not unwind the caller.
        if exc.code not in (None, 0):
            failure = f"SystemExit: {exc.code}"
    except Exception as exc:
        # Include the type so message-less exceptions are still truthy.
        failure = f"{type(exc).__name__}: {exc}"
    finally:
        # Restore stdout and stderr no matter what happened.
        sys.stdout, sys.stderr = saved_stdout, saved_stderr

    output = stdout_buffer.getvalue()
    error = stderr_buffer.getvalue()
    if failure is not None:
        error = f"{error.rstrip()}\n{failure}" if error.strip() else failure

    return output, error

In [19]:
# System prompt for the coding agent. It is an f-string, so the platform name
# and the current working directory are interpolated once, when this cell
# runs; re-run the cell after changing directory to refresh them.

SYSTEM_MESSAGE = f"""
You are a fast, efficient AI assistant specialized in analyzing terminal history and providing solutions. Your task is to:

1. Scan the provided message history.
2. Identify the most recent error or issue or request.
3. Take a deep breath, and thoughtfully, carefully determine the most likely solution or debugging step.
4. Respond with a VERY brief explanation followed by a markdown code block containing a PYTHON CODE to solve the issue.
5. Make sure to call the functions or classes you write in the Python Code as well, in the same code.

Rules:
- DO NOT WRITE PYTHON at the TOP when returning PYTHON CODE
- Keep any explanatory text extremely brief and concise.
- Place explanatory text before the code block.
- NEVER USE COMMENTS IN YOUR CODE.
- Ensure that your code follows given CWD and System
- If previously given code attempted to fix the issue and failed, learn from them by proposing a DIFFERENT code.
- Focus on the most recent error, ignoring earlier unrelated code.
- The error may be as simple as a spelling error, or as complex as requiring tests to be run, or code to be find-and-replaced.
- Prioritize speed and conciseness in your response. Don't use markdown headings. Don't say more than a sentence or two. Be incredibly concise.

User's System: {platform.system()}
CWD: {os.getcwd()}
"""

In [24]:
# Conversation state shared by every chat_and_execute() call in this notebook.
messages = []
print("Created Session State ...")

# Model tag to use when it is installed locally; otherwise the first
# available model is used. Selecting by name instead of by list position
# keeps the choice stable when models are added or removed.
PREFERRED_MODEL = "phi3:latest"

models = [entry["name"] for entry in ollama.list()["models"]]
if not models:
    raise RuntimeError(
        "No Ollama models are installed; pull one first (e.g. `ollama pull phi3`)."
    )
model = PREFERRED_MODEL if PREFERRED_MODEL in models else models[0]

print("Select Model:", model)

# The system prompt must be the first message of the conversation.
messages.append({"role": "system", "content": SYSTEM_MESSAGE})

print("System Message Initialized ...")

Created Session State ...
Select Model: phi3:latest
System Message Initialized ...


In [27]:
def chat_and_execute(query, messages, model, max_attempts=5):
    """Ask the model for code, run it, and feed errors back until it works.

    The query and every model reply are appended to ``messages`` in place,
    so the conversation persists across calls. Progress is printed; nothing
    is returned.

    Args:
        query: The user request to send to the model.
        messages: Chat history (list of ``{"role", "content"}`` dicts);
            mutated in place.
        model: Name of the Ollama model to chat with.
        max_attempts: Maximum number of code executions before giving up,
            so a model that never produces working code cannot loop forever.
    """

    def ask_model():
        # Send the current history, record the reply in it, and echo it.
        response = ollama.chat(model, messages)
        reply = response['message']['content']
        messages.append({"role": "assistant", "content": reply})
        print("==== ASSISTANT ================")
        print(reply)
        return reply

    # Get user input
    print("query:\n\n", query, "\n\n")
    messages.append({"role": "user", "content": query})

    # Chat with LLM
    assistant_response = ask_model()

    for attempt in range(1, max_attempts + 1):
        # Parse code from response; a reply may contain no fenced block.
        blocks = extract_code_blocks(assistant_response)
        code = blocks[0] if blocks else None
        if not code:
            print("No code block found in the assistant response.")
            break

        # Execute code
        result, error = execute_code(code)
        if not error:
            print("Code executed successfully.")
            print("Result:", result)
            break

        # Code execution failed, ask LLM to refine
        print('Error', error)
        if attempt == max_attempts:
            print(f"Giving up after {max_attempts} failed attempts.")
            break

        outstr = f"""
                The code execution failed with the following error: {error}.
                Please rewrite the code to fix this issue."""
        messages.append({"role": "user", "content": outstr})
        assistant_response = ask_model()

    print("Assistant:", assistant_response)

# Follow-up turn: it reuses the shared `messages` history, and the wording
# ("You are still wrong") presumably refers to earlier turns that are not part
# of this cell -- on a fresh kernel the model will not have that context.
chat_and_execute("You are still wrong, do it by making a class that writes an algorithm to get a random number instead of using the random library", messages, model)

query:

 You are still wrong, do it by making a class that writes an algorithm to get a random number instead of using the random library 


 ```python
class RandomNumberGenerator:
    def __init__(self):
        self.seed = int((datetime.datetime.now() - datetime.datetime(1970, 1, 1)).total_seconds()) % (2**32) # Seed calculation from current time
    
    @staticmethod
    def _getRandomIndex():
        return random.randint(0, self.seed) if not hasattr(self,'random') else getattr(random, 'randrange')(1, self.seed + 2 - len(str(self.seed))) # Secure pseudo-random index generation with seed modification for reproducibility
        
    def generateRandomNumber(self):
        if not hasattr(self,'r'):
            return None
        r = getattr(random, 'randrange') if self.use_secure else _getRandomIndex # Lazy initialization and secure random fallback using os.urandom for seed in the next step of optimization
        self.r = [int((1 - (i / float(self.seed))) ** 0.5) * r % self.seed +