# Fine-Tuning a Function-calling chain

Steps:
- Define agent and tools that you want to learn from
- Define the inputs you'd like to feed into the agent
- Capture traces
- Convert to fine-tuning format
- Fine-tune
- Use in LangChain

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from langchain import chains, agents, chat_models, prompts, tools
# Local chat loader for some Kapa questions
from discord_loader import DiscordChatLoader

In [3]:
import requests
from pydantic import BaseModel, Field
import uuid
from typing import Optional

# Request payload for the chat endpoint. chat_langchain builds one of these
# and sends its .dict() form as the JSON body of the POST request.
class ChatRequest(BaseModel):
    # The question text for this turn.
    message: str
    # Backend model selector; defaults to "openai".
    model: str = "openai"
    # Prior turns of the conversation, if any.
    history: Optional[list] = None
    # Conversation identifier as a string, if any.
    conversation_id: Optional[str] = None
    
    
# Argument schema for the chat_langchain tool (passed to it as args_schema).
# NOTE(review): documented with comments rather than a class docstring because
# pydantic may surface a class docstring in the generated schema -- confirm
# before adding one.
class ChatLangchainRequest(BaseModel):
    message: str = Field(..., description="""The question to ask the documentation agent.""")
    conversation_uuid: Optional[uuid.UUID] = Field(default=None, description="""The UUID for the conversation. If none, a UUID will be generated.""")
    
# In-memory conversation store: maps a conversation UUID to the list of
# {"question": ...} / {"result": ...} entries exchanged so far.
chat_histories = {}
    
@tools.tool(args_schema=ChatLangchainRequest)
def chat_langchain(message: str, conversation_uuid: Optional[uuid.UUID] = None) -> dict:
    """Query the langchain documentation to answer questions about LangChain, the
    LLM application framework.
    
    Args:
        message (str): The question to ask the documentation agent.
        conversation_uuid (Optional[uuid]): The optional UUID for the conversation. If none, a UUID will be generated for you.
    
    """
    # NOTE(review): the docstring above is left byte-identical on purpose; the
    # tool decorator presumably uses it as the tool description shown to the
    # model -- confirm before rewording it.

    # Accept either a ready-made UUID or something that parses as one.
    # Calling uuid.UUID() on an existing UUID instance raises, which previously
    # fell into a bare `except` and silently started a brand-new conversation.
    conversation_id = conversation_uuid
    if conversation_id is not None and not isinstance(conversation_id, uuid.UUID):
        try:
            conversation_id = uuid.UUID(str(conversation_id))
        except ValueError:
            # Malformed identifier: fall back to a fresh conversation.
            conversation_id = None
    if conversation_id is None:
        conversation_id = uuid.uuid4()

    # Work on a copy so a failed request does not leave a dangling question
    # in the stored history.
    history = list(chat_histories.get(conversation_id) or [])
    history.append({"question": message})

    request = ChatRequest(
        message=message,
        model="openai",
        history=history,
        conversation_id=str(conversation_id),
    )
    response = requests.post("http://localhost:8080/chat", json=request.dict())

    # Only record the exchange once the service has answered.
    chat_histories[conversation_id] = history + [{"result": response.text}]
    return {
        "conversation_uuid": conversation_id,
        "response": response.text
    }

In [4]:
# Tools handed to the agent: only the documentation chat tool.
selected_tools = [chat_langchain]

In [5]:
# Build an OpenAI-functions agent backed by gpt-4 that can call the selected tools.
agent_executor = agents.initialize_agent(
    tools=selected_tools,
    llm=chat_models.ChatOpenAI(model="gpt-4"),
    agent=agents.AgentType.OPENAI_FUNCTIONS,
)

# We will reframe the question to better direct the agent.
ask_q = lambda x: f"A discord user asked this question:\n \"\"\"\n{x}\n\"\"\" What's the verified answer?? Do not stop until you have the correct answer."
# Pipeline: wrap the raw question, run the agent, keep only its 'output' field.
chain = ask_q | agent_executor | (lambda x: x['output'])

In [6]:
# Load chat sessions from discord.txt using the local DiscordChatLoader.
docs = DiscordChatLoader("discord.txt")
sessions = docs.load()

In [7]:
from langchain import callbacks

# Project name the traces are recorded under when the batch below is run.
trace_project_name = "chat-langchain-user-agent"

# We will be processing each of these questions with some level of parallelism
# (The batch run is left commented out, so executing this cell does not
# re-run the agent over every question.)
# questions = [m.content for m in sessions[0]['messages']]
# with callbacks.tracing_v2_enabled(project_name=trace_project_name):
#     results = chain.batch(questions, return_exceptions=True)

In [9]:
import copy
import tempfile
import json
import subprocess
import functools
from typing import List, Dict

def convert_to_typescript(functions: dict) -> str:
    """Render a JSON schema as TypeScript by piping it through the ``json2ts`` CLI.

    Args:
        functions: JSON-serializable schema; it is dumped to JSON and fed to
            the command on stdin.

    Returns:
        The command's standard output decoded as UTF-8.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    # Depends on !npm install -g json-schema-to-typescript
    schema_bytes = json.dumps(functions).encode("utf-8")
    completed = subprocess.run(
        "json2ts --bannerComment=''",
        shell=True,
        check=True,
        input=schema_bytes,
        stdout=subprocess.PIPE,
    )
    return completed.stdout.decode("utf-8")
    

def convert_from_oai(f: dict) -> dict:
    """Turn an OpenAI function spec into a standalone JSON schema.

    The spec's ``parameters`` schema is lifted out and annotated with the
    function's name (as ``title``) and, when present, its ``description``.
    The input is deep-copied first, so ``f`` is never modified.

    Args:
        f: Function spec with a ``name`` key and optional ``description``
            and ``parameters`` keys.

    Returns:
        The parameters schema with ``title`` (and ``description``) set.

    Raises:
        KeyError: If ``f`` has no ``name``.
    """
    spec = copy.deepcopy(f)
    # A spec without parameters describes a function that takes no arguments.
    parameters = spec.pop("parameters", None) or {"type": "object", "properties": {}}
    parameters["title"] = spec["name"]
    # The description is optional; only override the schema's own when given.
    description = spec.get("description")
    if description is not None:
        parameters["description"] = description
    return parameters

# Memo of already-rendered TypeScript, keyed by the str() of each function spec.
_cache = {}


def convert_functions_to_typescript(functions: List[dict]) -> str:
    """Render a list of OpenAI function specs as TypeScript declarations.

    Each spec is converted with ``convert_from_oai`` and ``convert_to_typescript``;
    the pieces are joined with a blank line. Results are memoized in ``_cache``
    so repeated calls with the same specs skip the conversion.

    Args:
        functions: The function specs to render.

    Returns:
        The joined TypeScript text (an empty string for an empty list).
    """
    cache_key = tuple(str(spec) for spec in functions)
    try:
        return _cache[cache_key]
    except KeyError:
        pass
    declarations = [convert_to_typescript(convert_from_oai(spec)) for spec in functions]
    rendered = "\n\n".join(declarations)
    _cache[cache_key] = rendered
    return rendered
    

In [33]:
import collections
from typing import Optional, List, Type
from langchain.load import load
from langchain import schema
import langsmith

def get_ultimate_llm_runs(client: langsmith.Client, project_name: str, tags: Optional[List[str]] = None, run_name: Optional[str] = None) -> List[langsmith.schemas.Run]:
    """Return, for each parent run, the LLM run that started last.

    Args:
        client: Client used to list the runs.
        project_name: Project whose runs are listed.
        tags: Optional tag filter forwarded to ``list_runs``.
        run_name: Optional run-name filter forwarded to ``list_runs``.

    Returns:
        One run per distinct ``parent_run_id``, in the order the parents were
        first seen: the run with the greatest ``start_time`` in that group.
    """
    siblings = collections.defaultdict(list)
    llm_runs = client.list_runs(
        project_name=project_name, run_type="llm", tags=tags, run_name=run_name
    )
    for llm_run in llm_runs:
        siblings[llm_run.parent_run_id].append(llm_run)

    # The latest-starting run of each group is the "ultimate" one.
    return [max(group, key=lambda r: r.start_time) for group in siblings.values()]

def extract_fine_tuning_message(run: langsmith.schemas.Run) -> List[schema.BaseMessage]:
    """Build the message list for one traced LLM run.

    The run's input messages are deserialized and the first one is replaced
    by a system message holding the TypeScript rendering of the functions
    that were offered to the model in that run.

    Args:
        run: A run whose ``inputs['messages']`` and
            ``extra['invocation_params']['functions']`` are populated.

    Returns:
        The deserialized messages with the first entry replaced.
    """
    # TODO: Incorporate the output. Probably not necessary though.
    chat_messages = load.load(run.inputs['messages'])
    ts_definitions = convert_functions_to_typescript(
        run.extra['invocation_params']['functions']
    )
    # Assumes the first message is a system message
    chat_messages[0] = schema.SystemMessage(content=ts_definitions)
    return chat_messages

def convert_function_messages(messages: List[schema.BaseMessage], to_type: Type[schema.BaseMessage] = schema.HumanMessage) -> List[schema.BaseMessage]:
    """Replace every function message with a message of another type.

    Args:
        messages: Messages to scan.
        to_type: Message class used for the replacement; it is built from the
            function message's content only.

    Returns:
        A new list in the same order; messages whose ``type`` is not
        ``"function"`` are passed through unchanged.
    """
    return [
        to_type(content=msg.content) if msg.type == "function" else msg
        for msg in messages
    ]

def move_function_call(messages: List[schema.BaseMessage]) -> List[schema.BaseMessage]:
    """Inline each AI message's function call into its text content.

    For an AI message that carries a ``function_call`` in its
    ``additional_kwargs``, a new AI message is emitted whose content is the
    original content (if any) followed by the JSON-encoded function call.
    Every other message is passed through unchanged.

    Args:
        messages: Messages to transform.

    Returns:
        A new list of messages in the same order.
    """
    results = []
    for m in messages:
        # Look the call up defensively: additional_kwargs may hold other keys.
        fc = m.additional_kwargs.get("function_call") if m.type == "ai" else None
        if fc:
            # Parenthesized on purpose: `+` binds tighter than `or`, so without
            # the parentheses a non-empty content would drop the function call.
            results.append(schema.AIMessage(content=(m.content or "") + json.dumps(fc)))
        else:
            results.append(m)
    return results

# TODO: Filter out non-error but unhelpful messages

In [30]:
# Fetch the last LLM run under each parent run from the traced project.
client = langsmith.Client()
project_name = "chat-langchain-user-agent"
runs = get_ultimate_llm_runs(client, project_name=project_name)

In [35]:
# For every run that was invoked with functions: extract its messages, inline
# the function calls into the AI message text, and turn function results into
# human messages.
converted_messages = [
    {"messages": convert_function_messages(move_function_call(extract_fine_tuning_message(run)), to_type=schema.HumanMessage)}
    for run in runs
    if 'functions' in run.extra.get('invocation_params', {})
]

In [36]:
from langchain.adapters import openai as openai_adapter

# Convert the message lists into the format used for fine-tuning.
training_data = openai_adapter.convert_messages_for_finetuning(converted_messages)

## Fine-tune

#### Upload file

In [13]:
# import json
# from io import BytesIO
# import time

# import openai

# # We will write the jsonl file in memory
# my_file = BytesIO()
# for m in training_data:
#     my_file.write((json.dumps({"messages": m}) + "\n").encode('utf-8'))

# my_file.seek(0)
# training_file = openai.File.create(
#   file=my_file,
#   purpose='fine-tune'
# )

# # OpenAI audits each training file for compliance reasons.
# # This may take a few minutes
# status = openai.File.retrieve(training_file.id).status
# start_time = time.time()
# while status != "processed":
#     print(f"Status=[{status}]... {time.time() - start_time:.2f}s", end="\r", flush=True)
#     time.sleep(5)
#     status = openai.File.retrieve(training_file.id).status
# print(f"File {training_file.id} ready after {time.time() - start_time:.2f} seconds.")

#### Fine-tune the model

In [17]:
import openai

In [18]:
# job = openai.FineTuningJob.create(
#     training_file=training_file.id,
#     model="gpt-3.5-turbo",
# )

In [19]:
# status = openai.FineTuningJob.retrieve(job.id).status
# start_time = time.time()
# while status != "succeeded":
#     print(f"Status=[{status}]... {time.time() - start_time:.2f}s", end="\r", flush=True)
#     time.sleep(5)
#     job = openai.FineTuningJob.retrieve(job.id)
#     status = job.status

Status=[running]... 1105.64s

In [39]:
# print(job.fine_tuned_model)

## Use in LangChain

In [40]:
from langchain.chat_models import ChatOpenAI

# Chat model pointing at the fine-tuned checkpoint (hard-coded here in place
# of job.fine_tuned_model, since the fine-tuning cells are commented out).
model = ChatOpenAI(
    model="ft:gpt-3.5-turbo-0613:personal::7sgF8sHD", # job.fine_tuned_model,
    temperature=1,
)

In [41]:
def convert_tool_to_ts(my_tool):
    """Render a tool's function spec as a TypeScript declaration.

    Args:
        my_tool: Tool exposing ``name``, ``description`` and an
            ``args_schema`` with a ``schema()`` method.

    Returns:
        The TypeScript text produced by ``convert_functions_to_typescript``.
    """
    # Use the schema of the tool that was passed in, not a hard-coded one.
    spec = {
        "name": my_tool.name,
        "description": my_tool.description,
        "parameters": my_tool.args_schema.schema(),
    }
    return convert_functions_to_typescript([spec])

In [83]:
from langchain.output_parsers import openai_functions

# Prompt layout: the TypeScript function definitions as the system message,
# the reframed user question, then the chat history of the steps taken so far.
prompt = prompts.ChatPromptTemplate.from_messages(
    [
        ("system", "{function_ts_def}"),
        ("human", "A discord user asked this question:\n\n\"\"\"\n{input}\n\"\"\" What's the verified answer?"),
        prompts.MessagesPlaceholder(variable_name="chat_history"),
    ]
)

# TypeScript rendering of the chat_langchain tool, filled into the system slot below.
tool_schema = convert_tool_to_ts(chat_langchain)

def split_up(message: schema.AIMessage) -> schema.AIMessage:
    """Turn a JSON-looking AI message back into a function-call message.

    If the message content starts with ``{`` and parses as JSON, a new AI
    message is returned with empty content and the parsed value stored under
    ``additional_kwargs["function_call"]``. Otherwise the message is returned
    unchanged.

    Args:
        message: The model's reply.

    Returns:
        Either the rebuilt function-call message or the original message.
    """
    if message.content.startswith('{'):
        try:
            function_call = json.loads(message.content)
        except json.JSONDecodeError:
            # Not valid JSON after all: treat it as a plain text answer.
            return message
        return schema.AIMessage(content='', additional_kwargs={"function_call": function_call})
    return message

# Pipeline: prompt (with the tool definition pre-filled) -> fine-tuned model
# -> split_up (recover a function call from JSON text) -> function-call parser,
# falling back to plain string output when that parser fails.
runnable_chain = (
    prompt.partial(function_ts_def=tool_schema) 
    | model 
    | split_up 
    | openai_functions.JsonOutputFunctionsParser(args_only=False).with_fallbacks([schema.output_parser.StrOutputParser()])
)

In [84]:
from typing import List, Tuple, Any, Union
from langchain.schema import AgentAction, AgentFinish
from langchain.agents import AgentExecutor, BaseSingleActionAgent


class MyOAIAgent(BaseSingleActionAgent):
    """Single-action agent that delegates each planning step to a runnable chain."""

    # Chain invoked on every planning step; it receives "input" and "chat_history".
    runnable_chain: schema.runnable.Runnable

    class Config:
        arbitrary_types_allowed = True

    @property
    def input_keys(self):
        return ["input"]

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks = None,
        **kwargs: Any
    ) -> Union[AgentAction, AgentFinish]:
        """Decide the next step from the user input and the steps so far.

        Args:
            intermediate_steps: (action, observation) pairs taken so far.
            callbacks: Callbacks forwarded to the chain invocation.
            **kwargs: User inputs; ``kwargs["input"]`` is the question.

        Returns:
            An ``AgentAction`` when the chain returns a dict with a ``"name"``
            key, otherwise an ``AgentFinish`` carrying the chain's result as
            ``"output"``.
        """
        # Replay earlier steps as alternating AI (action log) and human
        # (observation) messages.
        chat_history = []
        for step in intermediate_steps:
            action, observation = step[0], step[1]
            chat_history.append(schema.AIMessage(content=json.dumps(action.log)))
            chat_history.append(schema.HumanMessage(content=str(observation)))

        res = self.runnable_chain.invoke(
            {"input": kwargs["input"], "chat_history": chat_history},
            {"callbacks": callbacks},
        )

        wants_tool = isinstance(res, dict) and "name" in res
        if not wants_tool:
            return AgentFinish(return_values={"output": res}, log=res)
        return AgentAction(tool=res["name"], tool_input=res["arguments"], log=res)

    async def aplan(
        self, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any
    ) -> Union[AgentAction, AgentFinish]:
        """Async planning is not supported."""
        raise NotImplementedError

In [85]:
# Wrap the fine-tuned chain in the custom agent and give it the chat tool.
# Note: this rebinds agent_executor, replacing the gpt-4 executor defined earlier.
agent = MyOAIAgent(runnable_chain=runnable_chain)

agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent, tools=[chat_langchain], verbose=True
)

In [86]:
agent_executor("How do I use the ConversationRetrieverChain?")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m{'name': 'chat_langchain', 'arguments': {'message': 'How do I use the ConversationRetrieverChain?'}}[0m[36;1m[1;3m{'conversation_uuid': UUID('a46bcc3c-7047-4ffa-872a-3ef1e97e3c08'), 'response': 'To use the ConversationalRetrievalChain in Langchain, you need to follow these steps:\n\n1. Import the necessary modules:\n```python\nfrom langchain.embeddings.openai import OpenAIEmbeddings\nfrom langchain.vectorstores import Chroma\nfrom langchain.text_splitter import CharacterTextSplitter\nfrom langchain.llms import OpenAI\nfrom langchain.chains import ConversationalRetrievalChain\nfrom langchain.document_loaders import TextLoader\n```\n\n2. Load in your documents. You can replace this with a loader for whatever type of data you want:\n```python\nloader = TextLoader(...)\n```\n\n3. Create a vector store from embeddings:\n```python\nvectorstore = Chroma(...)\n```\n\n4. Create a retriever from the vector store:\n```python\nretriev

{'input': 'How do I use the ConversationRetrieverChain?',
 'output': 'To use the ConversationalRetrievalChain in Langchain, you need to follow these steps:\n\n1. Import the necessary modules:\n```python\nfrom langchain.embeddings.openai import OpenAIEmbeddings\nfrom langchain.vectorstores import Chroma\nfrom langchain.text_splitter import CharacterTextSplitter\nfrom langchain.llms import OpenAI\nfrom langchain.chains import ConversationalRetrievalChain\nfrom langchain.document_loaders import TextLoader\n```\n\n2. Load in your documents. You can replace this with a loader for whatever type of data you want:\n```python\nloader = TextLoader(...)\n```\n\n3. Create a vector store from embeddings:\n```python\nvectorstore = Chroma(...)\n```\n\n4. Create a retriever from the vector store:\n```python\nretriever = vectorstore.as_retriever()\n```\n\n5. Create an instance of the ConversationalRetrievalChain:\n```python\nllm = OpenAI(...)\nchat = ConversationalRetrievalChain.from_llm(llm, retriever