Skip to content

Commit

Permalink
feat(apify): support async dataset mapping function (#3213)
Browse files Browse the repository at this point in the history
  • Loading branch information
omikader committed Nov 9, 2023
1 parent 0bf36f7 commit 01a98c4
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions langchain/src/document_loaders/web/apify_dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
TaskCallOptions,
} from "apify-client";

import { AsyncCaller, AsyncCallerParams } from "../../util/async_caller.js";
import { BaseDocumentLoader, DocumentLoader } from "../base.js";
import { Document } from "../../document.js";
import { getEnvironmentVariable } from "../../util/env.js";
Expand All @@ -16,7 +17,15 @@ import { getEnvironmentVariable } from "../../util/env.js";
* dataset item) and converts it to an instance of the Document class.
*/
export type ApifyDatasetMappingFunction<Metadata extends Record<string, any>> =
(item: Record<string | number, unknown>) => Document<Metadata>;
(
item: Record<string | number, unknown>
) => Document<Metadata> | Promise<Document<Metadata>>;

/**
 * Configuration object accepted by the ApifyDatasetLoader constructor.
 *
 * Extends AsyncCallerParams, so AsyncCaller options can be supplied
 * alongside the two loader-specific fields below. The constructor
 * destructures those two fields out and forwards every remaining
 * property to `new AsyncCaller(...)`.
 */
export interface ApifyDatasetLoaderConfig<Metadata extends Record<string, any>>
extends AsyncCallerParams {
/**
 * Converts a single dataset item into a Document. May return the
 * Document directly or a Promise that resolves to it.
 */
datasetMappingFunction: ApifyDatasetMappingFunction<Metadata>;
/**
 * Optional ApifyClient options. The constructor spreads them into the
 * ApifyClient it creates, together with the API token resolved by
 * `_getApifyApiToken`.
 */
clientOptions?: ApifyClientOptions;
}

/**
* A class that extends the BaseDocumentLoader and implements the
Expand All @@ -31,27 +40,19 @@ export class ApifyDatasetLoader<Metadata extends Record<string, any>>

protected datasetId: string;

protected datasetMappingFunction: (
item: Record<string | number, unknown>
) => Document<Metadata>;
protected datasetMappingFunction: ApifyDatasetMappingFunction<Metadata>;

constructor(
datasetId: string,
config: {
datasetMappingFunction: ApifyDatasetMappingFunction<Metadata>;
clientOptions?: ApifyClientOptions;
}
) {
protected caller: AsyncCaller;

constructor(datasetId: string, config: ApifyDatasetLoaderConfig<Metadata>) {
super();
const apifyApiToken = ApifyDatasetLoader._getApifyApiToken(
config.clientOptions
);
this.apifyClient = new ApifyClient({
...config.clientOptions,
token: apifyApiToken,
});
const { clientOptions, datasetMappingFunction, ...asyncCallerParams } =
config;
const token = ApifyDatasetLoader._getApifyApiToken(clientOptions);
this.apifyClient = new ApifyClient({ ...clientOptions, token });
this.datasetId = datasetId;
this.datasetMappingFunction = config.datasetMappingFunction;
this.datasetMappingFunction = datasetMappingFunction;
this.caller = new AsyncCaller(asyncCallerParams);
}

private static _getApifyApiToken(config?: { token?: string }) {
Expand All @@ -68,7 +69,12 @@ export class ApifyDatasetLoader<Metadata extends Record<string, any>>
const datasetItems = (
await this.apifyClient.dataset(this.datasetId).listItems({ clean: true })
).items;
return datasetItems.map(this.datasetMappingFunction);

return await Promise.all(
datasetItems.map((item) =>
this.caller.call(async () => this.datasetMappingFunction(item))
)
);
}

/**
Expand Down

0 comments on commit 01a98c4

Please sign in to comment.