In [None]:
# !pip install python-dotenv OpenAI fastapi uvicorn nest_asyncio pydantic tiktoken --quiet

In [None]:
import os
from dotenv import load_dotenv
from openai import OpenAI

# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# OpenAI-compatible client aimed at the Gilas endpoint. The key is read from
# the GILAS_API_KEY environment variable (None when the variable is unset).
client = OpenAI(base_url="https://api.gilas.io/v1/", api_key=os.environ.get("GILAS_API_KEY"))

In [None]:
# Total token budget assumed for one request. generate_few_shots_instruction
# subtracts max_tokens, the prompt's token count and messages_formating_tokens
# from it to decide how many few-shot samples fit into the instruction.
# NOTE(review): 16385 looks like a model context-window size — confirm it is
# the intended limit for the default model used by get_completion.
context = 16385
# Upper bound on generated tokens; passed as max_tokens to the chat API.
max_tokens = 1000
messages_formating_tokens = 20 # number of tokens used to format messages

def get_completion(instruction, prompt, model="gpt-4o-mini"):
    """Send one system + user message pair to the chat API and return the reply text.

    Args:
        instruction: Text used as the system message.
        prompt: Text used as the user message.
        model: Model name forwarded to the API.

    Returns:
        The content of the first returned choice. When the API reports no
        text content (``None``), an empty string is returned instead so that
        callers can always treat the result as ``str``.
    """
    messages = [{"role": "system", "content": instruction},
                {"role": "user", "content": prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.0,  # deterministic output for comparable translations
        max_tokens=max_tokens
    )

    content = response.choices[0].message.content
    # The content field is optional in the API response; callers in this
    # notebook call str methods on the result, so never hand back None.
    return content if content is not None else ""

In [None]:
import tiktoken
import random

# Tokenizer used by num_tokens_from_string for all token counts in this notebook.
# NOTE(review): get_completion defaults to "gpt-4o-mini"; confirm that
# "cl100k_base" is the encoding that model uses, otherwise counts are only approximate.
encoding = tiktoken.get_encoding("cl100k_base")

def num_tokens_from_string(string: str) -> int:
    """Count the tokens the module-level ``encoding`` produces for ``string``."""
    return len(encoding.encode(string))

def generate_few_shots_instruction(basic_instruction, prompt):
    """Build a system instruction from ``basic_instruction`` plus few-shot samples.

    Samples are read from ``few-shots.txt`` as consecutive pairs of non-empty
    lines (an ``English:`` line followed by a ``Farsi:`` line). They are
    shuffled and appended one by one until the next sample would push the
    instruction past the token budget
    ``context - (max_tokens + prompt tokens + messages_formating_tokens)``.

    Args:
        basic_instruction: Base instruction text; always placed first.
        prompt: The user prompt. Only its token count is used here.

    Returns:
        The instruction string: ``basic_instruction``, a newline, then every
        sample that fit into the budget (each followed by a blank line).

    Raises:
        RuntimeError: If a line of a pair does not start with the expected
            ``English:`` / ``Farsi:`` label.
        OSError: Propagated from ``open`` when ``few-shots.txt`` cannot be read.
    """
    # Tokens left for the instruction once the completion, the prompt and the
    # message-formatting overhead are reserved. Constant for the whole call.
    instruction_budget = context - (
        max_tokens + num_tokens_from_string(prompt) + messages_formating_tokens
    )

    # The samples contain Farsi text, so decode explicitly as UTF-8 instead of
    # relying on the platform default; "utf-8-sig" also tolerates a leading BOM.
    file_path = "few-shots.txt"
    with open(file_path, "r", encoding="utf-8-sig") as file:
        # keep only non-empty lines, already stripped
        lines = [line.strip() for line in file if line.strip()]

    # Walk the lines two at a time (EN, FA); a trailing unpaired line is ignored.
    samples = []
    for english_line, farsi_line in zip(lines[0::2], lines[1::2]):
        # The label must lead the line, exactly as the error message states.
        if not english_line.startswith("English:"):
            raise RuntimeError(f"Line does not start with 'English: ', {english_line}")
        if not farsi_line.startswith("Farsi:"):
            raise RuntimeError(f"Line does not start with 'Farsi: ', {farsi_line}")
        samples.append(f"{english_line}\n{farsi_line}\n")

    # Randomize so the model sees a variety of samples when not all of them fit.
    random.shuffle(samples)

    instruction = f"{basic_instruction}\n"
    for sample in samples:
        candidate = instruction + f"{sample}\n"
        # The whole candidate is re-counted: token counts of concatenated
        # pieces are not guaranteed to add up exactly.
        if num_tokens_from_string(candidate) > instruction_budget:
            break
        instruction = candidate

    return instruction

In [None]:
# Base system instruction for the translator. zero_shot_prompt sends it as-is;
# few_shots_prompt sends it with few-shot samples appended.
# Note: the literal is declared as an f-string although it has no placeholders,
# so any literal curly brace added to the text later would have to be doubled.
basic_instruction = f"""
You are an intelligent translator specializing in translating technical texts from English to Farsi. Please follow these steps to translate the provided English texts to Farsi:
1. Thoroughly understand the given text.
2. Translate the content into fluent Farsi, preserving the original structure and flow.
3. Tailor the translation for software engineers by keeping the technical and programming terms in English, enclosed in backticks (e.g., `GPT-3`).
4. You can include additional explanations in Farsi for clarity, if needed.
5. If you encounter a block of text enclosed in triple backticks, do not translate it; keep it as it is in your translation.
6. Review your translation to ensure you followed the steps. If the translated text is not fluent Farsi, revise it. 
7. Your output should only include the very final version and formatted translation. 
"""

In [None]:
def get_next_test_data():
    """Return the next non-empty line of ``test-data.txt`` wrapped as a prompt.

    The read position is remembered between calls on the function object
    (``get_next_test_data.current_index``); the file itself is re-read on
    every call.

    Returns:
        A string of the form ``English: <line> ... Farsi: ?`` for the next
        non-blank line, or ``None`` once every line has been consumed.

    Raises:
        OSError: Propagated from ``open`` when ``test-data.txt`` cannot be read.
    """
    file_path = "test-data.txt"
    # Decode explicitly as UTF-8 (tolerating a BOM) rather than with the
    # platform default encoding, which may not handle non-ASCII text.
    with open(file_path, "r", encoding="utf-8-sig") as file:
        lines = file.readlines()

    # First call: start at the top of the file.
    if not hasattr(get_next_test_data, "current_index"):
        get_next_test_data.current_index = 0

    # Skip blank lines; the index always advances past the line just examined.
    while get_next_test_data.current_index < len(lines):
        next_line = lines[get_next_test_data.current_index].strip()
        get_next_test_data.current_index += 1
        if next_line:
            return f"""English: {next_line} \n
            Farsi: ?
            """

    return None

In [None]:
def few_shots_prompt(question):
    """Translate ``question`` using the base instruction enriched with few-shot samples."""
    return get_completion(
        generate_few_shots_instruction(basic_instruction, question),
        question,
    )

def zero_shot_prompt(question):
    """Translate ``question`` using only the base instruction (no samples)."""
    translation = get_completion(basic_instruction, question)
    return translation

In [None]:
def generate_html_table(prompt, single_shot_response, few_shots_response):
    """Append one comparison row to the accumulated HTML table and return the table.

    The markup is kept on the function object itself
    (``generate_html_table.html_content``), so successive calls keep adding
    rows to the same table; ``generate_html_table.header_added`` marks that the
    header row already exists.

    Args:
        prompt: The prompt text shown in the first column.
        single_shot_response: Zero-shot translation (second column).
        few_shots_response: Few-shot translation (third column).

    Returns:
        The complete table markup: header, every row added so far and the
        closing ``</table>`` tag.
    """
    import html  # standard library; imported locally so this cell needs no other cell's imports

    closing_tag = "</table>"

    def _cell_text(text):
        """Remove the prompt labels from ``text`` and escape it for use inside HTML."""
        if text is None:
            # a missing model response is shown as an empty cell
            text = ""
        for label in ("English:", "Farsi: ?", "Farsi:"):
            text = text.replace(label, "")
        # Source texts and model output may contain <, > or & (e.g. code
        # snippets); unescaped they would be parsed as markup by the browser.
        return html.escape(text)

    # Initialize the table with headers if it's the first call
    if not hasattr(generate_html_table, 'header_added'):
        generate_html_table.html_content = "<table>\n<tr><th>English Text</th><th>Zero Shot Translation</th><th>Few Shots Translation</th><th>Manual Translation</th></tr>\n"
        generate_html_table.header_added = True
    elif generate_html_table.html_content.endswith(closing_tag):
        # Re-open the table so the new row lands inside it
        generate_html_table.html_content = generate_html_table.html_content[:-len(closing_tag)]

    prompt = _cell_text(prompt)
    few_shots_response = _cell_text(few_shots_response)
    single_shot_response = _cell_text(single_shot_response)

    # Append new row to the html content
    generate_html_table.html_content += f"""
    <tr>
    <td>{prompt}</td>
    <td><input type="checkbox" class="single_shot"><label dir="rtl">{single_shot_response}</label></td>
    <td><input type="checkbox" class="few_shots"><label dir="rtl">{few_shots_response}</label></td>
    <td><textarea rows="10" cols="50" dir="rtl"></textarea></td>
    </tr>\n"""

    # Re-add the closing </table> tag
    generate_html_table.html_content += closing_tag

    return generate_html_table.html_content


In [None]:
import asyncio

async def gen_prompt_response(sample):
    """Get the few-shot and zero-shot translations of ``sample`` and add them to the table.

    Both blocking prompt helpers run in worker threads at the same time; once
    both have finished, the results are passed to ``generate_html_table``.

    Returns:
        Whatever ``generate_html_table`` returns for this sample.
    """
    few_shots_response, single_shot_response = await asyncio.gather(
        asyncio.to_thread(few_shots_prompt, sample),
        asyncio.to_thread(zero_shot_prompt, sample),
    )
    return generate_html_table(sample, single_shot_response, few_shots_response)

In [None]:
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
import uvicorn
import nest_asyncio
from pydantic import BaseModel
from typing import List, Dict

# Web application object; the endpoints below register themselves on it
# through the @app.get / @app.post decorators.
app = FastAPI()

@app.get("/", response_class=HTMLResponse)
async def prompt_endpoint():
    """Render the feedback page for every entry of the test data.

    Each test sample is translated (zero-shot and few-shot) and added as a row
    to an HTML table. The page also carries a script that posts the reviewer's
    choices to ``/rlhf`` when "Submit Feedback" is clicked.

    Returns:
        The full HTML document as a string.
    """
    # The helpers keep their progress on their own function objects. Reset it
    # per request so that every page load walks test-data.txt from the top and
    # builds a fresh table; otherwise a reload would find the data "exhausted"
    # and render a page without any table.
    get_next_test_data.current_index = 0
    if hasattr(generate_html_table, "header_added"):
        del generate_html_table.header_added

    html_table = ""
    sample = get_next_test_data()
    while sample:
        # each call returns the whole table built so far
        html_table = await gen_prompt_response(sample)
        sample = get_next_test_data()

    js_script = """
    function sendPromptData() {
    
            const tableRows = document.querySelectorAll("table tr:not(:first-child)");
            const data = Array.from(tableRows).map(row => {
                const inputText = row.cells[0].innerText;
                const singleShotCheckbox = row.querySelector('.single_shot');
                const fewShotsCheckbox = row.querySelector('.few_shots');
                const textarea = row.querySelector('textarea');

                let response = textarea.value;  // Default to textarea value
                if (singleShotCheckbox.checked) {
                response = singleShotCheckbox.nextElementSibling.innerText; // Get the label next to the checkbox
                } else if (fewShotsCheckbox.checked) {
                response = fewShotsCheckbox.nextElementSibling.innerText; // Get the label next to the checkbox
                }

                return { "english": inputText, "farsi": response };
            });

            var payload = { "data": data }
            console.log(payload);

            // Create the request
            fetch('/rlhf', {
                method: 'POST', 
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify(payload)
            })
            .then(response => {
                // fetch only rejects on network failures; HTTP errors must be checked explicitly
                if (!response.ok) {
                    throw new Error('Server responded with status ' + response.status);
                }
                return response.json();
            })
            .then(data => {
                console.log('Success:', data);
                alert('Data sent successfully!');
            })
            .catch((error) => {
                console.error('Error:', error);
                alert('Failed to send data.');
            });
        }
    """
    html_content = f"""<html>
    <head><title>Prompt Results</title>
    <script>
            {js_script}
    </script>
    </head>
    
    <body> 
    <h2> How to use this tool </h2>
    <p>
    To utilize this Right-Left-Human-Feedback tool, you must either select one of the provided translations for the given English text by checking its checkbox or provide your own translation. <br/>
    You can also skip some rows if needed. <br/> <br/>
    Upon clicking the "Submit Feedback" button, your feedback is sent to the server and appended to the few-shots.txt file for future reference.
    </p>
    {html_table} 
    <br />
    <br />
    <button type="button" onclick="sendPromptData()">Submit Feedback</button>

    </body></html>"""


    return html_content

# Define the data model for the payload
class Entry(BaseModel):
    # One table row as posted by the page's sendPromptData() script.
    english: str  # text of the row's first cell (the English source)
    farsi: str  # checked translation or textarea content; empty when the row was skipped

class Data(BaseModel):
    # Request body of POST /rlhf: {"data": [<Entry>, ...]}, one Entry per table row.
    data: List[Entry]

@app.post("/rlhf")
async def receive_prompt(payload: Data):
    """Store submitted feedback as few-shot samples and drop the handled test lines.

    For every entry with a non-blank English and Farsi text, an
    ``English:`` / ``Farsi:`` pair is appended to ``few-shots.txt``. Lines of
    ``test-data.txt`` that contain the English text of such an entry are then
    removed from that file.

    Args:
        payload: Parsed request body holding the list of feedback entries.

    Raises:
        HTTPException: Status 400 with the error text as detail when anything
            fails while processing the feedback.
    """
    def _single_line(text):
        """Collapse a possibly multi-line text into one stripped line."""
        return " ".join(part.strip() for part in text.splitlines() if part.strip())

    try:
        # Keep only rows that carry feedback. Both texts are flattened to one
        # line because few-shots.txt is parsed as strict two-line pairs; a
        # line break inside a translation would corrupt every later read.
        accepted = []
        for item in payload.data:
            english = _single_line(item.english)
            farsi = _single_line(item.farsi)
            if english and farsi:
                accepted.append((english, farsi))

        # update samples in the few-shots file (UTF-8: the text is Farsi)
        with open("few-shots.txt", 'a', encoding="utf-8") as file:
            for english, farsi in accepted:
                file.write(f"English: {english}\nFarsi: {farsi}\n\n")

        # delete the test entry from the test-data file
        with open("test-data.txt", 'r', encoding="utf-8-sig") as file:
            lines = file.readlines()

        # Build a filtered copy instead of removing from the list while
        # iterating over it, which skips elements.
        handled = [english for english, _ in accepted]
        remaining = [
            line for line in lines
            if not any(english in line for english in handled)
        ]

        # Nothing matched: leave the file untouched.
        if len(remaining) == len(lines):
            return

        # Write the modified content back to the file
        with open("test-data.txt", 'w', encoding="utf-8") as file:
            file.writelines(remaining)

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    
# If using asyncio in Jupyter, apply the following:
# (presumably needed because the notebook kernel already runs an event loop
# and uvicorn starts its own — confirm against the nest_asyncio docs)
nest_asyncio.apply()

# Start the server for `app` on 127.0.0.1:8000.
# NOTE(review): this call is expected to block the cell until the server is
# stopped — confirm; any cell placed after it would not run in the meantime.
uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")