Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/global/common/middle/tracks/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ export enum TrackEnum {
useAppTemplate = 'useAppTemplate',
createDataset = 'createDataset',
appNodes = 'appNodes',
runSystemTool = 'runSystemTool'
runSystemTool = 'runSystemTool',
datasetSearch = 'datasetSearch'
}
79 changes: 79 additions & 0 deletions packages/service/common/middle/tracks/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { delay } from '@fastgpt/global/common/system/utils';
import { addLog } from '../../system/log';
import { TrackModel } from './schema';
import { TrackEnum } from '@fastgpt/global/common/middle/tracks/constants';

const batchUpdateTime = Number(process.env.TRACK_BATCH_UPDATE_TIME || 10000);

const getCurrentTenMinuteBoundary = () => {
const now = new Date();
const minutes = now.getMinutes();
const tenMinuteBoundary = Math.floor(minutes / 10) * 10;

const boundary = new Date(now);
boundary.setMinutes(tenMinuteBoundary, 0, 0);
return boundary;
};

export const trackTimerProcess = async () => {
while (true) {
await countTrackTimer();
await delay(batchUpdateTime);
}
};

export const countTrackTimer = async () => {
if (!global.countTrackQueue || global.countTrackQueue.size === 0) {
return;
}

const queuedItems = Array.from(global.countTrackQueue.values());
global.countTrackQueue = new Map();

try {
const currentBoundary = getCurrentTenMinuteBoundary();

const bulkOps = queuedItems
.map(({ event, count, data }) => {
if (event === TrackEnum.datasetSearch) {
const { teamId, datasetId } = data;

return [
{
updateOne: {
filter: {
event,
teamId,
createTime: currentBoundary,
'data.datasetId': datasetId
},
update: [
{
$set: {
event,
teamId,
createTime: { $ifNull: ['$createTime', currentBoundary] },
data: {
datasetId,
count: { $add: [{ $ifNull: ['$data.count', 0] }, count] }
}
}
}
],
upsert: true
}
}
];
}
return [];
})
.flat();

if (bulkOps.length > 0) {
await TrackModel.bulkWrite(bulkOps);
addLog.info('Track timer processing success');
}
} catch (error) {
addLog.error('Track timer processing error', error);
}
};
9 changes: 9 additions & 0 deletions packages/service/common/middle/tracks/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ const TrackSchema = new Schema({

try {
TrackSchema.index({ event: 1 });

TrackSchema.index(
{ event: 1, teamId: 1, 'data.datasetId': 1, createTime: -1 },
{
partialFilterExpression: {
'data.datasetId': { $exists: true }
}
}
);
} catch (error) {
console.log(error);
}
Expand Down
17 changes: 17 additions & 0 deletions packages/service/common/middle/tracks/type.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export type TracksQueueType = {
event: TrackEnum;
data: Record<string, any>;
};

declare global {
var countTrackQueue:
| Map<
string,
{
event: TrackEnum;
count: number;
data: Record<string, any>;
}
>
| undefined;
}
48 changes: 48 additions & 0 deletions packages/service/common/middle/tracks/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,42 @@ const createTrack = ({ event, data }: { event: TrackEnum; data: Record<string, a
data: props
});
};

// Run times
const pushCountTrack = ({
event,
key,
data
}: {
event: TrackEnum;
key: string;
data: Record<string, any>;
}) => {
if (!global.feConfigs?.isPlus) return;
addLog.debug('Push tracks', {
event,
key
});

if (!global.countTrackQueue) {
global.countTrackQueue = new Map();
}

const value = global.countTrackQueue.get(key);
if (value) {
global.countTrackQueue.set(key, {
...value,
count: value.count + 1
});
} else {
global.countTrackQueue.set(key, {
event,
data,
count: 1
});
}
};

export const pushTrack = {
login: (data: PushTrackCommonType & { type: `${OAuthEnum}` | 'password' }) => {
return createTrack({
Expand Down Expand Up @@ -73,5 +109,17 @@ export const pushTrack = {
event: TrackEnum.runSystemTool,
data
});
},
datasetSearch: (data: { teamId: string; datasetIds: string[] }) => {
data.datasetIds.forEach((datasetId) => {
pushCountTrack({
event: TrackEnum.datasetSearch,
key: `${TrackEnum.datasetSearch}_${datasetId}`,
data: {
teamId: data.teamId,
datasetId
}
});
});
}
};
10 changes: 10 additions & 0 deletions packages/service/core/dataset/search/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ import type { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { datasetSearchQueryExtension } from './utils';
import type { RerankModelItemType } from '@fastgpt/global/core/ai/model.d';
import { formatDatasetDataValue } from '../data/controller';
import { pushTrack } from '../../../common/middle/tracks/utils';
import { isProVersion } from '../../../common/system/constants';

export type SearchDatasetDataProps = {
histories: ChatItemType[];
teamId: string;
uid?: string;
tmbId?: string;
model: string;
datasetIds: string[];
reRankQuery: string;
Expand Down Expand Up @@ -165,6 +169,8 @@ export async function searchDatasetData(
): Promise<SearchDatasetDataResponse> {
let {
teamId,
uid,
tmbId,
reRankQuery,
queries,
model,
Expand Down Expand Up @@ -900,6 +906,10 @@ export async function searchDatasetData(
// token filter
const filterMaxTokensResult = await filterDatasetDataByMaxTokens(scoreFilter, maxTokens);

if (teamId && datasetIds.length > 0 && isProVersion()) {
pushTrack.datasetSearch({ datasetIds, teamId });
}

return {
searchRes: filterMaxTokensResult,
embeddingTokens,
Expand Down
3 changes: 3 additions & 0 deletions packages/service/core/workflow/dispatch/dataset/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export async function dispatchDatasetSearch(
const {
runningAppInfo: { teamId },
runningUserInfo: { tmbId },
uid,
histories,
node,
params: {
Expand Down Expand Up @@ -127,6 +128,8 @@ export async function dispatchDatasetSearch(
const searchData = {
histories,
teamId,
uid,
tmbId,
reRankQuery: userChatInput,
queries: [userChatInput],
model: vectorModel.model,
Expand Down
7 changes: 5 additions & 2 deletions projects/app/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ export async function register() {
{ preLoadWorker },
{ loadSystemModels },
{ connectSignoz },
{ getSystemTools }
{ getSystemTools },
{ trackTimerProcess }
] = await Promise.all([
import('@fastgpt/service/common/mongo/init'),
import('@fastgpt/service/common/mongo/index'),
Expand All @@ -34,7 +35,8 @@ export async function register() {
import('@fastgpt/service/worker/preload'),
import('@fastgpt/service/core/ai/config/utils'),
import('@fastgpt/service/common/otel/trace/register'),
import('@fastgpt/service/core/app/plugin/controller')
import('@fastgpt/service/core/app/plugin/controller'),
import('@fastgpt/service/common/middle/tracks/processor')
]);

// connect to signoz
Expand All @@ -61,6 +63,7 @@ export async function register() {
startMongoWatch();
startCron();
startTrainingQueue(true);
trackTimerProcess();

console.log('Init system success');
}
Expand Down
Empty file.
4 changes: 3 additions & 1 deletion projects/app/src/pages/api/core/dataset/searchTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async function handler(req: ApiRequestProps<SearchTestProps>): Promise<SearchTes
const start = Date.now();

// auth dataset role
const { dataset, teamId, tmbId, apikey } = await authDataset({
const { dataset, teamId, tmbId, userId, apikey } = await authDataset({
req,
authToken: true,
authApiKey: true,
Expand All @@ -62,6 +62,8 @@ async function handler(req: ApiRequestProps<SearchTestProps>): Promise<SearchTes
const searchData = {
histories: [],
teamId,
uid: userId,
tmbId,
reRankQuery: text,
queries: [text],
model: dataset.vectorModel,
Expand Down
Loading