-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
redis.ts
111 lines (99 loc) · 3.31 KB
/
redis.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// TODO: Deprecate in favor of stores/message/ioredis.ts when LLMCache and other implementations are ported
import {
createClient,
RedisClientOptions,
RedisClientType,
RedisModules,
RedisFunctions,
RedisScripts,
} from "redis";
import { BaseMessage, BaseListChatMessageHistory } from "../../schema/index.js";
import {
mapChatMessagesToStoredMessages,
mapStoredMessagesToChatMessages,
} from "./utils.js";
/**
* Type for the input to the `RedisChatMessageHistory` constructor.
*/
export type RedisChatMessageHistoryInput = {
sessionId: string;
sessionTTL?: number;
config?: RedisClientOptions;
// Typing issues with createClient output: https://github.com/redis/node-redis/issues/1865
// eslint-disable-next-line @typescript-eslint/no-explicit-any
client?: any;
};
/**
* Class for storing chat message history using Redis. Extends the
* `BaseListChatMessageHistory` class.
*/
export class RedisChatMessageHistory extends BaseListChatMessageHistory {
lc_namespace = ["langchain", "stores", "message", "redis"];
get lc_secrets() {
return {
"config.url": "REDIS_URL",
"config.username": "REDIS_USERNAME",
"config.password": "REDIS_PASSWORD",
};
}
public client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>;
private sessionId: string;
private sessionTTL?: number;
constructor(fields: RedisChatMessageHistoryInput) {
super(fields);
const { sessionId, sessionTTL, config, client } = fields;
this.client = (client ?? createClient(config ?? {})) as RedisClientType<
RedisModules,
RedisFunctions,
RedisScripts
>;
this.sessionId = sessionId;
this.sessionTTL = sessionTTL;
}
/**
* Ensures the Redis client is ready to perform operations. If the client
* is not ready, it attempts to connect to the Redis database.
* @returns Promise resolving to true when the client is ready.
*/
async ensureReadiness() {
if (!this.client.isReady) {
await this.client.connect();
}
return true;
}
/**
* Retrieves all chat messages from the Redis database for the current
* session.
* @returns Promise resolving to an array of `BaseMessage` instances.
*/
async getMessages(): Promise<BaseMessage[]> {
await this.ensureReadiness();
const rawStoredMessages = await this.client.lRange(this.sessionId, 0, -1);
const orderedMessages = rawStoredMessages
.reverse()
.map((message) => JSON.parse(message));
return mapStoredMessagesToChatMessages(orderedMessages);
}
/**
* Adds a new chat message to the Redis database for the current session.
* @param message The `BaseMessage` instance to add.
* @returns Promise resolving when the message has been added.
*/
async addMessage(message: BaseMessage): Promise<void> {
await this.ensureReadiness();
const messageToAdd = mapChatMessagesToStoredMessages([message]);
await this.client.lPush(this.sessionId, JSON.stringify(messageToAdd[0]));
if (this.sessionTTL) {
await this.client.expire(this.sessionId, this.sessionTTL);
}
}
/**
* Deletes all chat messages from the Redis database for the current
* session.
* @returns Promise resolving when the messages have been deleted.
*/
async clear(): Promise<void> {
await this.ensureReadiness();
await this.client.del(this.sessionId);
}
}