### Check env


In [1]:
import { load } from "std/dotenv/mod.ts";

const env: Record<string, string> = await load({ envPath: "../.env" });

console.log(`DEBUG=${env.DEBUG}`);


DEBUG=false


### Get Mongo Client


In [2]:
import { MongoClient } from "mongodb";

const MONGO_URI = env.MONGO_URI;

export const client = new MongoClient(MONGO_URI);

export const collection = client.db().collection("checkpoints");

console.log(collection.collectionName);


checkpoints


In [3]:
import { OpenAIEmbeddings } from "@langchain/openai";

const { OPENAI_API_KEY } = env;

// Initialize embeddings model
export const embeddings1024 = new OpenAIEmbeddings({
  model: "text-embedding-3-small",
  dimensions: 1024,
  apiKey: OPENAI_API_KEY,
});


# Create checkpointer


In [30]:
import { MongoClient, Collection } from "mongodb";

import {
  BaseMessage,
  SystemMessage,
  ToolMessage,
} from "@langchain/core/messages";
import { RunnableConfig } from "@langchain/core/runnables";

import {
  BaseCheckpointSaver,
  Checkpoint,
  CheckpointMetadata,
  CheckpointTuple,
} from "npm:@langchain/langgraph@0.0.26";

const { MONGO_URI } = env;

const CustomSerializer = {
  stringify(obj: CheckpointRecord) {
    return obj;
  },

  async parse(data: Checkpoint | string) {
    return typeof data === "string" ? JSON.parse(data) : data;
  },
};

interface CheckpointRecord {
  checkpoint: string;
  metadata: string;
  parent_id?: string;
  thread_id: string;
  checkpoint_id: string;
  embedding: number[];
  history: string;
  timestamp: Date;
}

export class MongoSaver extends BaseCheckpointSaver {
  private client: MongoClient;
  private isSetup: boolean;
  public collection: Collection<CheckpointRecord>;

  constructor(client: MongoClient) {
    super(CustomSerializer);
    this.client = client;
    this.collection = this.client.db().collection("checkpoints");
    this.isSetup = false;
  }

  static fromConnString(connString: string = MONGO_URI || ""): MongoSaver {
    return new MongoSaver(new MongoClient(connString));
  }

  private async setup(): Promise<void> {
    if (this.isSetup) return;

    try {
      await this.collection.findOne();
      this.isSetup = true;
    } catch (error) {
      console.error("Error querying checkpoints collection", error);
      throw error;
    }
  }

  // below 3 methods are necessary for any checkpointer implementation: getTuple, list and put
  async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
    await this.setup();
    const { thread_id, checkpoint_id } = config.configurable || {};

    try {
      if (checkpoint_id) {
        const res = await this.collection.findOne({ thread_id, checkpoint_id });

        if (res) {
          return {
            config,
            checkpoint: (await this.serde.parse(res.checkpoint)) as Checkpoint,
            metadata: (await this.serde.parse(
              res.metadata
            )) as CheckpointMetadata,
            parentConfig: res.parent_id
              ? {
                  configurable: {
                    thread_id,
                    checkpoint_id: res.parent_id,
                  },
                }
              : undefined,
          };
        }
      } else {
        const res = await this.collection.findOne(
          { thread_id },
          { sort: { timestamp: -1 } }
        );

        if (res) {
          return {
            config: {
              configurable: {
                thread_id: res.thread_id,
                checkpoint_id: res.checkpoint_id,
              },
            },
            checkpoint: (await this.serde.parse(res.checkpoint)) as Checkpoint,
            metadata: (await this.serde.parse(
              res.metadata
            )) as CheckpointMetadata,
            parentConfig: res.parent_id
              ? {
                  configurable: {
                    thread_id: res.thread_id,
                    checkpoint_id: res.parent_id,
                  },
                }
              : undefined,
          };
        }
      }
    } catch (error) {
      console.error("Error retrieving checkpoint", error);
      throw error;
    }

    return undefined;
  }

  async *list(
    config: RunnableConfig,
    limit?: number,
    before?: RunnableConfig
  ): AsyncGenerator<CheckpointTuple> {
    await this.setup();
    const { thread_id } = config.configurable || {};
    let query: Record<string, unknown> = { thread_id };

    const params: (string | number)[] = [thread_id];
    if (before?.configurable?.checkpoint_id) {
      query = {
        ...query,
        checkpoint_id: { $lt: before.configurable.checkpoint_id },
      };
      params.push(before.configurable.checkpoint_id);
    }
    let options: Record<string, unknown> = { checkpoint_id: -1 };
    if (limit) {
      query.limit = params.length + 1;
      params.push(limit);
    }

    try {
      const res = await this.collection.find(query, options).toArray();
      for (const row of res) {
        yield {
          config: {
            configurable: {
              thread_id: row.thread_id,
              checkpoint_id: row.checkpoint_id,
            },
          },
          checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint,
          metadata: (await this.serde.parse(
            row.metadata
          )) as CheckpointMetadata,
          parentConfig: row.parent_id
            ? {
                configurable: {
                  thread_id: row.thread_id,
                  checkpoint_id: row.parent_id,
                },
              }
            : undefined,
        };
      }
    } catch (error) {
      console.error("Error listing checkpoints", error);
      throw error;
    }
  }

  async put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata
  ): Promise<RunnableConfig> {
    await this.setup();
    try {
      const messages =
        (checkpoint?.channel_values?.messages as BaseMessage[]) || [];
      const lastMessage = messages?.[messages?.length - 1] || {};

      let text: string;

      if (lastMessage.content instanceof Array) {
        const message: Record<string, unknown> =
          lastMessage.content.find((message) => message.type === "text") || {};
        text = message?.text as string;
      } else {
        text = lastMessage.content;
      }

      if (text) {
        const embeddings = await embeddings1024.embedDocuments([text]);

        const update: CheckpointRecord = {
          thread_id: config?.configurable?.thread_id,
          checkpoint_id: checkpoint.id,
          parent_id: config?.configurable?.checkpoint_id,
          checkpoint: this.serde.stringify(checkpoint),
          metadata: this.serde.stringify(metadata),
          embedding: embeddings[0] || [],
          history: text,
          timestamp: new Date(),
        };

        await this.collection.insertOne(update);
      }
    } catch (error) {
      console.error("Error saving checkpoint", error);
      throw error;
    }

    return {
      configurable: {
        thread_id: config.configurable?.thread_id,
        checkpoint_id: checkpoint.id,
      },
    };
  }

  async closeConnection(): Promise<void> {
    await this.client.close();
  }
}


### Add vector store


In [31]:
import { MongoDBAtlasVectorSearch } from "npm:@langchain/mongodb@^0.0.4";

export const vectorStore = new MongoDBAtlasVectorSearch(embeddings1024, {
  collection: collection,
  indexName: "default",
  textKey: "messages.content",
  embeddingKey: "embedding",
});


### Initialize State


In [32]:
import { HumanMessage } from "@langchain/core/messages";
import { StateGraphArgs } from "@langchain/langgraph";

// Define the state interface
interface AgentState {
  messages: HumanMessage[];
}

// Define the graph state
const graphState: StateGraphArgs<AgentState>["channels"] = {
  messages: {
    value: (x: HumanMessage[], y: HumanMessage[]) => x.concat(y),
    default: () => [],
  },
};


### Define Tools


In [33]:
import { z } from "zod";
import { DynamicStructuredTool } from "@langchain/core/tools";
import { TavilySearchResults } from "@langchain/community/tools/tavily_search";
import { ToolNode } from "@langchain/langgraph/prebuilt";

const { TAVILY_API_KEY } = env;

const searchTool = new TavilySearchResults({
  maxResults: 1,
  apiKey: TAVILY_API_KEY,
});

export const historyTool = new DynamicStructuredTool({
  name: "get_history",
  description: "Use text query to perform vector search against chat history",
  schema: z.object({
    query: z.string(),
  }),
  func: async function ({ query }) {
    const embededQuery = await embeddings1024.embedQuery(query);
    const res = await vectorStore.similaritySearchVectorWithScore(
      embededQuery,
      3
    );
    const history = res
      ?.map(
        (rec: Array<Record<string, any>>) => rec?.[0]?.metadata.history || ""
      )
      .join("; ");
    return history;
  },
});

const tools = [historyTool, searchTool];

const toolNode = new ToolNode(tools);


### Create agent


In [34]:
import { AIMessage } from "@langchain/core/messages";
import { ChatAnthropic } from "@langchain/anthropic";
import { END, START, StateGraph } from "@langchain/langgraph";

const { ANTHROPIC_API_KEY } = env;

const model = new ChatAnthropic({
  model: "claude-3-sonnet-20240229",
  temperature: 0,
  apiKey: ANTHROPIC_API_KEY,
});

const boundModel = model.bindTools(tools);

// Define the function that determines whether to continue or not
function shouldContinue(state: AgentState): "tools" | typeof END {
  const messages = state.messages;
  const lastMessage = messages[messages.length - 1] as AIMessage;

  // If the LLM makes a tool call, then we route to the "tools" node
  if (lastMessage.tool_calls?.length) {
    return "tools";
  }
  // Otherwise, we stop (reply to the user)
  return END;
}

// Define the function that calls the model
async function callModel(state: AgentState) {
  const messages = state.messages;
  const response = await boundModel.invoke(messages);

  // We return a list, because this will get added to the existing list
  return { messages: [response] };
}

// Define a new graph
const workflow = new StateGraph<AgentState>({ channels: graphState })
  .addNode("agent", callModel)
  .addNode("tools", toolNode)

  .addEdge(START, "agent")
  .addConditionalEdges("agent", shouldContinue)
  .addEdge("tools", "agent");


### Run Chat


In [44]:
import { ObjectId } from "mongodb";
import { HumanMessage, SystemMessage } from "@langchain/core/messages";
import { CompiledStateGraph } from "@langchain/langgraph";

const threadId = "conversation-1";
const checkpointer = MongoSaver.fromConnString();

let config = {
  configurable: {
    thread_id: threadId,
    checkpoint_id: new ObjectId().valueOf().toString(),
  },
};

const app: CompiledStateGraph<AgentState> = workflow
  .compile({ checkpointer })
  .withConfig(config);

async function writeUserMessage(userMessage: string) {
  try {
    console.log("\x1b[31m%s\x1b[0m", userMessage);

    const inputs = {
      messages: [
        new SystemMessage(
          `You are helpful assistent.\n Please, check current messages state first!
          If you miss something try get_history tool to get previous chat history\n
          Prompt it for vector searh.`
        ),
        new HumanMessage(userMessage),
      ],
    };

    for await (const event of await app.stream(inputs, {
      ...config,
      streamMode: "values",
    })) {
      const lastMessage = event.messages[event.messages.length - 1];
      // console.log("\x1b[32m%s\x1b[0m", 'DEBUG', lastMessage)
      if (lastMessage.tool_calls?.length === 0) {
        // final answer
        console.log("\x1b[36m%s\x1b[0m", lastMessage.content);
      }
    }
  } catch (e) {
    console.log(e);
  }
}


In [45]:
await writeUserMessage("Hello. My name is vasily");


[31mHello. My name is vasily[0m
[36mNice to meet you Vasily! I'm an AI assistant created by Anthropic. I'm always happy to chat, answer questions, or help out with any tasks you might have. Feel free to ask me about any topics you're interested in or let me know if there's anything specific I can help with.[0m


In [46]:
await writeUserMessage("what was my previous message");


[31mwhat was my previous message[0m
[36mIt seems this conversation just started, so there are no prior messages before you asked "what was my previous message". Please let me know if you have any other questions![0m


In [47]:
await writeUserMessage("what is current weather in ny");


[31mwhat is current weather in ny[0m
[36mAccording to the search results, the current weather in New York City is typical summer weather with high temperatures around 80-90°F (27-32°C) during the day and lows around 75-77°F (24-25°C) at night. The forecast shows mostly sunny and hot conditions which is normal for New York in July.

However, since this is a forecast for July 2024, it does not reflect the actual current weather conditions today. To get the latest real-time weather information, I would need to do a more specific search for "current weather in New York City today" or check an authoritative weather source like the National Weather Service.[0m


In [48]:
await writeUserMessage("do you remember my name");


[31mdo you remember my name[0m
[36mAh I see now from the chat history that your name is Vasily. Thank you for letting me check the context - I'll remember that your name is Vasily going forward.[0m


In [49]:
await writeUserMessage("did i ask about some cities?");


[31mdid i ask about some cities?[0m
[36mBased on the search results, it looks like you did ask about the weather in New York City, specifically in July 2024. The results show weather forecasts and current conditions for New York City around that time period. Let me know if you need any other details about your previous question on cities![0m


In [20]:
await writeUserMessage("what was my previous question?");


[31mwhat was my previous question?[0m
[36mBased on the chat history returned, your previous question was "did i ask about some cities?". Please let me know if you need any clarification or have an additional question![0m


In [27]:
await collection.deleteMany({});


{ acknowledged: [33mtrue[39m, deletedCount: [33m23[39m }