Skip to content

Commit

Permalink
Feat/upstash redis storage (#2963)
Browse files Browse the repository at this point in the history
* feat: upstash redis storage BaseStore

* nit: rename file

* nit: rename class & input interface

* nit: remove unused input

* feat: tests

* fix: yield & tests yield

* chore: lint files

* nit

* chore: added entrypoint

* nit: tests

* fix: added to requiresOptionalDependency
  • Loading branch information
bracesproul committed Oct 20, 2023
1 parent d8da36a commit e87ee52
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 2 deletions.
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"./langchain",
"./examples",
"./docs",
"./test-exports-vercel",
"./test-exports-vercel"
],
"yaml.schemas": {
"https://json.schemastore.org/github-workflow.json": "./.github/workflows/deploy.yml"
},
"typescript.tsdk": "node_modules/typescript/lib"
"typescript.tsdk": "node_modules/typescript/lib",
"cSpell.words": ["Upstash"]
}
3 changes: 3 additions & 0 deletions langchain/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,9 @@ storage/ioredis.d.ts
storage/vercel_kv.cjs
storage/vercel_kv.js
storage/vercel_kv.d.ts
storage/upstash_redis.cjs
storage/upstash_redis.js
storage/upstash_redis.d.ts
graphs/neo4j_graph.cjs
graphs/neo4j_graph.js
graphs/neo4j_graph.d.ts
Expand Down
8 changes: 8 additions & 0 deletions langchain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@
"storage/vercel_kv.cjs",
"storage/vercel_kv.js",
"storage/vercel_kv.d.ts",
"storage/upstash_redis.cjs",
"storage/upstash_redis.js",
"storage/upstash_redis.d.ts",
"graphs/neo4j_graph.cjs",
"graphs/neo4j_graph.js",
"graphs/neo4j_graph.d.ts",
Expand Down Expand Up @@ -2389,6 +2392,11 @@
"import": "./storage/vercel_kv.js",
"require": "./storage/vercel_kv.cjs"
},
"./storage/upstash_redis": {
"types": "./storage/upstash_redis.d.ts",
"import": "./storage/upstash_redis.js",
"require": "./storage/upstash_redis.cjs"
},
"./graphs/neo4j_graph": {
"types": "./graphs/neo4j_graph.d.ts",
"import": "./graphs/neo4j_graph.js",
Expand Down
3 changes: 3 additions & 0 deletions langchain/scripts/create-entrypoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ const entrypoints = {
"storage/in_memory": "storage/in_memory",
"storage/ioredis": "storage/ioredis",
"storage/vercel_kv": "storage/vercel_kv",
"storage/upstash_redis": "storage/upstash_redis",
// graphs
"graphs/neo4j_graph": "graphs/neo4j_graph",
// hub
hub: "hub",
Expand Down Expand Up @@ -439,6 +441,7 @@ const requiresOptionalDependency = [
"stores/message/xata",
"storage/ioredis",
"storage/vercel_kv",
"storage/upstash_redis",
"graphs/neo4j_graph",
// Prevent export due to circular dependency with "load" entrypoint
"hub",
Expand Down
1 change: 1 addition & 0 deletions langchain/src/load/import_constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export const optionalImportEntrypoints = [
"langchain/stores/message/xata",
"langchain/storage/ioredis",
"langchain/storage/vercel_kv",
"langchain/storage/upstash_redis",
"langchain/graphs/neo4j_graph",
"langchain/hub",
"langchain/experimental/multimodal_embeddings/googlevertexai",
Expand Down
3 changes: 3 additions & 0 deletions langchain/src/load/import_type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ export interface OptionalImportMap {
"langchain/storage/vercel_kv"?:
| typeof import("../storage/vercel_kv.js")
| Promise<typeof import("../storage/vercel_kv.js")>;
"langchain/storage/upstash_redis"?:
| typeof import("../storage/upstash_redis.js")
| Promise<typeof import("../storage/upstash_redis.js")>;
"langchain/graphs/neo4j_graph"?:
| typeof import("../graphs/neo4j_graph.js")
| Promise<typeof import("../graphs/neo4j_graph.js")>;
Expand Down
87 changes: 87 additions & 0 deletions langchain/src/storage/tests/upstash_redis.int.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* eslint-disable no-process-env */

import { test } from "@jest/globals";
import { Redis as UpstashRedis } from "@upstash/redis";
import { UpstashRedisStore } from "../upstash_redis.js";

const getClient = () => {
if (
!process.env.UPSTASH_REDIS_REST_URL ||
!process.env.UPSTASH_REDIS_REST_TOKEN
) {
throw new Error("Missing Upstash Redis env variables.");
}

const config = {
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
};

return new UpstashRedis(config);
};

describe("UpstashRedisStore", () => {
const keys = ["key1", "key2"];
const client = getClient();

afterEach(async () => {
await client.del(...keys);
});

test("UpstashRedis can write & read values", async () => {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const store = new UpstashRedisStore({
client,
});
const value1 = new Date().toISOString();
const value2 = new Date().toISOString() + new Date().toISOString();
await store.mset([
[keys[0], encoder.encode(value1)],
[keys[1], encoder.encode(value2)],
]);
const retrievedValues = await store.mget([keys[0], keys[1]]);
const everyValueDefined = retrievedValues.every((v) => v !== undefined);
expect(everyValueDefined).toBe(true);
expect(retrievedValues.map((v) => decoder.decode(v))).toEqual([
value1,
value2,
]);
});

test("UpstashRedis can delete values", async () => {
const encoder = new TextEncoder();
const store = new UpstashRedisStore({
client,
});
const value1 = new Date().toISOString();
const value2 = new Date().toISOString() + new Date().toISOString();
await store.mset([
[keys[0], encoder.encode(value1)],
[keys[1], encoder.encode(value2)],
]);
await store.mdelete(keys);
const retrievedValues = await store.mget([keys[0], keys[1]]);
const everyValueUndefined = retrievedValues.every((v) => v === undefined);
expect(everyValueUndefined).toBe(true);
});

test("UpstashRedis can yield keys with prefix", async () => {
const prefix = "prefix_";
const keysWithPrefix = keys.map((key) => `${prefix}${key}`);
const encoder = new TextEncoder();
const store = new UpstashRedisStore({
client,
});
const value = new Date().toISOString();
await store.mset(keysWithPrefix.map((key) => [key, encoder.encode(value)]));
const yieldedKeys = [];
for await (const key of store.yieldKeys(prefix)) {
yieldedKeys.push(key);
}
console.log("Yielded keys:", yieldedKeys);
expect(yieldedKeys.sort()).toEqual(keysWithPrefix.sort());
// afterEach won't automatically delete these since we're applying a prefix.
await store.mdelete(keysWithPrefix);
});
});
157 changes: 157 additions & 0 deletions langchain/src/storage/upstash_redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { Redis as UpstashRedis, type RedisConfigNodejs } from "@upstash/redis";

import { BaseStore } from "../schema/storage.js";

/**
* Type definition for the input parameters required to initialize an
* instance of the UpstashStoreInput class.
*/
export interface UpstashRedisStoreInput {
sessionTTL?: number;
config?: RedisConfigNodejs;
client?: UpstashRedis;
/**
* The amount of keys to retrieve per batch when yielding keys.
* @default 1000
*/
yieldKeysScanBatchSize?: number;
/**
* The namespace to use for the keys in the database.
*/
namespace?: string;
}

/**
* Class that extends the BaseStore class to interact with an Upstash Redis
* database. It provides methods for getting, setting, and deleting data,
* as well as yielding keys from the database.
*/
export class UpstashRedisStore extends BaseStore<string, Uint8Array> {
lc_namespace = ["langchain", "storage"];

protected client: UpstashRedis;

protected namespace?: string;

protected yieldKeysScanBatchSize = 1000;

private sessionTTL?: number;

constructor(fields: UpstashRedisStoreInput) {
super(fields);
if (fields.client) {
this.client = fields.client;
} else if (fields.config) {
this.client = new UpstashRedis(fields.config);
} else {
throw new Error(
`Upstash Redis store requires either a config object or a pre-configured client.`
);
}
this.sessionTTL = fields.sessionTTL;
this.yieldKeysScanBatchSize =
fields.yieldKeysScanBatchSize ?? this.yieldKeysScanBatchSize;
this.namespace = fields.namespace;
}

_getPrefixedKey(key: string) {
if (this.namespace) {
const delimiter = "/";
return `${this.namespace}${delimiter}${key}`;
}
return key;
}

_getDeprefixedKey(key: string) {
if (this.namespace) {
const delimiter = "/";
return key.slice(this.namespace.length + delimiter.length);
}
return key;
}

/**
* Gets multiple keys from the Upstash Redis database.
* @param keys Array of keys to be retrieved.
* @returns An array of retrieved values.
*/
async mget(keys: string[]) {
const encoder = new TextEncoder();

const prefixedKeys = keys.map(this._getPrefixedKey.bind(this));
const retrievedValues = await this.client.mget<Uint8Array[]>(
...prefixedKeys
);
return retrievedValues.map((value) => {
if (!value) {
return undefined;
} else if (typeof value === "object") {
return encoder.encode(JSON.stringify(value));
} else {
return encoder.encode(value);
}
});
}

/**
* Sets multiple keys in the Upstash Redis database.
* @param keyValuePairs Array of key-value pairs to be set.
* @returns Promise that resolves when all keys have been set.
*/
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
const decoder = new TextDecoder();
const encodedKeyValuePairs = keyValuePairs.map(([key, value]) => [
this._getPrefixedKey(key),
decoder.decode(value),
]);
const pipeline = this.client.pipeline();
for (const [key, value] of encodedKeyValuePairs) {
if (this.sessionTTL) {
pipeline.setex(key, this.sessionTTL, value);
} else {
pipeline.set(key, value);
}
}
await pipeline.exec();
}

/**
* Deletes multiple keys from the Upstash Redis database.
* @param keys Array of keys to be deleted.
* @returns Promise that resolves when all keys have been deleted.
*/
async mdelete(keys: string[]): Promise<void> {
await this.client.del(...keys.map(this._getPrefixedKey.bind(this)));
}

/**
* Yields keys from the Upstash Redis database.
* @param prefix Optional prefix to filter the keys. A wildcard (*) is always appended to the end.
* @returns An AsyncGenerator that yields keys from the Upstash Redis database.
*/
async *yieldKeys(prefix?: string): AsyncGenerator<string> {
let pattern;
if (prefix) {
const wildcardPrefix = prefix.endsWith("*") ? prefix : `${prefix}*`;
pattern = `${this._getPrefixedKey(wildcardPrefix)}*`;
} else {
pattern = this._getPrefixedKey("*");
}
let [cursor, batch] = await this.client.scan(0, {
match: pattern,
count: this.yieldKeysScanBatchSize,
});
for (const key of batch) {
yield this._getDeprefixedKey(key);
}
while (cursor !== 0) {
[cursor, batch] = await this.client.scan(cursor, {
match: pattern,
count: this.yieldKeysScanBatchSize,
});
for (const key of batch) {
yield this._getDeprefixedKey(key);
}
}
}
}
1 change: 1 addition & 0 deletions langchain/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@
"src/storage/in_memory.ts",
"src/storage/ioredis.ts",
"src/storage/vercel_kv.ts",
"src/storage/upstash_redis.ts",
"src/graphs/neo4j_graph.ts",
"src/hub.ts",
"src/util/math.ts",
Expand Down

0 comments on commit e87ee52

Please sign in to comment.