Skip to content

Commit

Permalink
Revert "feat: support streaming with knowledge base (PL-987) (#814)"
Browse files: browse the repository at this point in the history.
This reverts commit 4e4216a.
  • Loading branch information
Tyler authored and Tyler committed May 23, 2024
1 parent 24bf0e7 commit 4d418c9
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 555 deletions.
2 changes: 1 addition & 1 deletion lib/controllers/interact/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class InteractController extends AbstractController {
},
(event) => {
if ('trace' in event) {
session.push({ trace: event.trace }, event.type);
session.push({ type: event.type, trace: event.trace }, event.type);
}
}
);
Expand Down
1 change: 0 additions & 1 deletion lib/services/aiSynthesis/buffer-reduce.exception.ts

This file was deleted.

40 changes: 0 additions & 40 deletions lib/services/aiSynthesis/buffer-reduce.subject.ts

This file was deleted.

172 changes: 34 additions & 138 deletions lib/services/aiSynthesis/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
import { BaseModels, BaseUtils } from '@voiceflow/base-types';
import _merge from 'lodash/merge';
import { concatMap, filter, from, lastValueFrom, map, Observable, of, reduce } from 'rxjs';

import { AIModelContext } from '@/lib/clients/ai/ai-model.interface';
import { AIResponse, EMPTY_AI_RESPONSE, fetchChat, fetchChatStream } from '@/lib/services/runtime/handlers/utils/ai';
import { AIResponse, EMPTY_AI_RESPONSE, fetchChat } from '@/lib/services/runtime/handlers/utils/ai';
import { getCurrentTime } from '@/lib/services/runtime/handlers/utils/generativeNoMatch';
import {
fetchFaq,
fetchKnowledgeBase,
getKBSettings,
KnowledgeBaseFaqSet,
KnowledgeBaseResponse,
} from '@/lib/services/runtime/handlers/utils/knowledgeBase';

import { SegmentEventType } from '../runtime/types';
import { AbstractManager } from '../utils';
import { BufferedReducerStopException } from './buffer-reduce.exception';
import { BufferedReducerSubject } from './buffer-reduce.subject';
import { KBResponse } from './types';
import {
convertTagsFilterToIDs,
generateAnswerSynthesisPrompt,
generateTagLabelMap,
NOT_FOUND_RESPONSES,
removePromptLeak,
} from './utils';
import { convertTagsFilterToIDs, generateAnswerSynthesisPrompt, generateTagLabelMap, removePromptLeak } from './utils';

class AISynthesis extends AbstractManager {
private readonly DEFAULT_ANSWER_SYNTHESIS_RETRY_DELAY_MS = 4000;
Expand All @@ -35,15 +26,14 @@ class AISynthesis extends AbstractManager {
private readonly DEFAULT_QUESTION_SYNTHESIS_RETRIES = 2;

private filterNotFound(output: string) {
const upperCase = output.toUpperCase();
if (NOT_FOUND_RESPONSES.some((phrase) => upperCase.includes(phrase))) {
const upperCase = output?.toUpperCase();
if (upperCase?.includes('NOT_FOUND') || upperCase?.startsWith("I'M SORRY,") || upperCase?.includes('AS AN AI')) {
return null;
}

return output;
}

answerSynthesisStream({
async answerSynthesis({
question,
instruction,
data,
Expand All @@ -57,7 +47,9 @@ class AISynthesis extends AbstractManager {
variables?: Record<string, any>;
options?: Partial<BaseUtils.ai.AIModelParams>;
context: AIModelContext;
}): Observable<AIResponse> {
}): Promise<AIResponse | null> {
let response: AIResponse = EMPTY_AI_RESPONSE;

const systemWithTime = `${system}\n\n${getCurrentTime()}`.trim();

const options = { model, system: systemWithTime, temperature, maxTokens };
Expand All @@ -69,46 +61,15 @@ class AISynthesis extends AbstractManager {
},
];

return from(
fetchChatStream(
{ ...options, messages },
this.services.mlGateway,
{
retries: this.DEFAULT_ANSWER_SYNTHESIS_RETRIES,
retryDelay: this.DEFAULT_ANSWER_SYNTHESIS_RETRY_DELAY_MS,
context,
},
variables
)
).pipe(filter((completion) => !!completion?.output));
}

async answerSynthesis(params: {
question: string;
instruction?: string;
data: KnowledgeBaseResponse;
variables?: Record<string, any>;
options?: Partial<BaseUtils.ai.AIModelParams>;
context: AIModelContext;
}): Promise<AIResponse | null> {
const response: AIResponse = await lastValueFrom(
from(this.answerSynthesisStream(params)).pipe(
reduce(
(acc, completion) => {
if (!completion) return acc;
if (!acc.output) acc.output = '';

acc.output += completion.output ?? '';
acc.answerTokens += completion.answerTokens;
acc.queryTokens += completion.queryTokens;
acc.tokens += completion.tokens;
acc.model = completion.model;
acc.multiplier = completion.multiplier;
return acc;
},
{ ...EMPTY_AI_RESPONSE }
)
)
response = await fetchChat(
{ ...options, messages },
this.services.mlGateway,
{
retries: this.DEFAULT_ANSWER_SYNTHESIS_RETRIES,
retryDelay: this.DEFAULT_ANSWER_SYNTHESIS_RETRY_DELAY_MS,
context,
},
variables
);

response.output = response.output?.trim() || null;
Expand Down Expand Up @@ -193,7 +154,7 @@ class AISynthesis extends AbstractManager {
}
};

async knowledgeBaseQueryStream({
async knowledgeBaseQuery({
project,
version,
question,
Expand All @@ -212,7 +173,7 @@ class AISynthesis extends AbstractManager {
summarization?: Partial<BaseModels.Project.KnowledgeBaseSettings['summarization']>;
};
tags?: BaseModels.Project.KnowledgeBaseTagsFilter;
}): Promise<Observable<KBResponse>> {
}): Promise<AIResponse & Partial<KnowledgeBaseResponse> & { faqSet?: KnowledgeBaseFaqSet }> {
let tagsFilter: BaseModels.Project.KnowledgeBaseTagsFilter = {};

if (tags) {
Expand All @@ -239,10 +200,10 @@ class AISynthesis extends AbstractManager {
project?.knowledgeBase?.faqSets,
settingsWithoutModel
);
if (faq?.answer) return of({ ...EMPTY_AI_RESPONSE, output: faq.answer, faqSet: faq.faqSet });
if (faq?.answer) return { ...EMPTY_AI_RESPONSE, output: faq.answer, faqSet: faq.faqSet };

const data = await fetchKnowledgeBase(project._id, project.teamID, question, settingsWithoutModel, tagsFilter);
if (!data) return of({ ...EMPTY_AI_RESPONSE, chunks: [] });
if (!data) return { ...EMPTY_AI_RESPONSE, chunks: [] };

// attach metadata to chunks
const api = await this.services.dataAPI.get();
Expand All @@ -259,87 +220,22 @@ class AISynthesis extends AbstractManager {
},
}));

if (!synthesis) return of({ ...EMPTY_AI_RESPONSE, chunks });

const stream$ = new BufferedReducerSubject<KBResponse>(
(resp) => {
const output = (resp.output ?? '').toLocaleUpperCase();
if (!synthesis) return { ...EMPTY_AI_RESPONSE, chunks };

// Stop stream all together if output is a not found phrase
if (this.filterNotFound(output) === null) throw new BufferedReducerStopException();

// Keep buffering if the output appears to match a not found phrase
return NOT_FOUND_RESPONSES.some((phrase) => phrase.startsWith(output));
},
(buffer, value) => ({
chunks: (buffer.chunks || []).concat(value.chunks ?? []),
output: (buffer.output ?? '') + (value.output ?? ''),
answerTokens: buffer.answerTokens + value.answerTokens,
queryTokens: buffer.queryTokens + value.queryTokens,
tokens: buffer.tokens + value.tokens,
model: value.model,
multiplier: value.multiplier,
faqSet: value.faqSet,
})
);
const answer = await this.services.aiSynthesis.answerSynthesis({
question,
instruction,
data,
options: settings?.summarization,
context: { projectID: project._id, workspaceID: project.teamID },
});

from(
this.services.aiSynthesis.answerSynthesisStream({
question,
instruction,
data,
options: settings?.summarization,
context: { projectID: project._id, workspaceID: project.teamID },
})
)
.pipe(map((answer, i) => (i === 0 ? { chunks, ...answer } : answer)))
.subscribe(stream$);

return stream$;
}
if (!answer) return { ...EMPTY_AI_RESPONSE, chunks };

async knowledgeBaseQuery(params: {
project: BaseModels.Project.Model<any, any>;
version?: BaseModels.Version.Model<any> | null;
question: string;
instruction?: string;
synthesis?: boolean;
options?: {
search?: Partial<BaseModels.Project.KnowledgeBaseSettings['search']>;
summarization?: Partial<BaseModels.Project.KnowledgeBaseSettings['summarization']>;
return {
chunks,
...answer,
};
tags?: BaseModels.Project.KnowledgeBaseTagsFilter;
}): Promise<KBResponse> {
const response = await lastValueFrom(
from(this.knowledgeBaseQueryStream(params)).pipe(
concatMap((stream) => stream),
reduce<KBResponse, KBResponse>(
(acc, completion) => {
if (!acc.output) acc.output = '';
if (!acc.chunks) acc.chunks = [];

acc.chunks.push(...(completion.chunks ?? []));
acc.output += completion.output ?? '';
acc.answerTokens += completion.answerTokens;
acc.queryTokens += completion.queryTokens;
acc.tokens += completion.tokens;
acc.model = completion.model;
acc.multiplier = completion.multiplier;
acc.faqSet = completion.faqSet;
return acc;
},
{ ...EMPTY_AI_RESPONSE }
)
)
);

response.output = response.output?.trim() || null;
if (response.output) {
response.output = this.filterNotFound(response.output);
response.output = removePromptLeak(response.output);
}

return response;
}
}

Expand Down
4 changes: 0 additions & 4 deletions lib/services/aiSynthesis/types.ts

This file was deleted.

2 changes: 0 additions & 2 deletions lib/services/aiSynthesis/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,3 @@ export const removePromptLeak = (output: string | null) => {

return output?.replace(regex_prompt_leak, '') || null;
};

export const NOT_FOUND_RESPONSES = ['NOT_FOUND', 'NOTFOUND', "I'M SORRY,", 'AS AN AI'];

0 comments on commit 4d418c9

Please sign in to comment.