Skip to content
9 changes: 9 additions & 0 deletions redisinsight/api/src/__mocks__/rdi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import { ApiRdiClient } from 'src/modules/rdi/client/api.rdi.client';
import { RdiEntity } from 'src/modules/rdi/entities/rdi.entity';
import { EncryptionStrategy } from 'src/modules/encryption/models';
import { RdiDryRunJobDto } from 'src/modules/rdi/dto';
import { sign } from 'jsonwebtoken';

export const mockRdiId = 'rdiId';
export const mockRdiPasswordEncrypted = 'password_ENCRYPTED';

export const mockRdiPasswordPlain = 'some pass';

export const mockedRdiAccessToken = sign({ exp: Math.trunc(Date.now() / 1000) + 3600 }, 'test');

export class MockRdiClient extends ApiRdiClient {
constructor(metadata: RdiClientMetadata, client: any = jest.fn()) {
super(metadata, client);
Expand All @@ -31,6 +34,12 @@ export class MockRdiClient extends ApiRdiClient {

public deploy = jest.fn();

public startPipeline = jest.fn();

public stopPipeline = jest.fn();

public resetPipeline = jest.fn();

public deployJob = jest.fn();

public dryRunJob = jest.fn();
Expand Down
3 changes: 3 additions & 0 deletions redisinsight/api/src/constants/custom-error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ export enum CustomErrorCodes {
RdiValidationError = 11_404,
RdiNotFound = 11_405,
RdiForbidden = 11_406,
RdiResetPipelineFailure = 11_407,
RdiStartPipelineFailure = 11_408,
RdiStopPipelineFailure = 11_409,
}
3 changes: 3 additions & 0 deletions redisinsight/api/src/constants/error-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ export default {
AI_QUERY_MAX_TOKENS_RATE_LIMIT: 'Token count exceeds the conversation limit',

RDI_DEPLOY_PIPELINE_FAILURE: 'Failed to deploy pipeline',
RDI_RESET_PIPELINE_FAILURE: 'Failed to reset pipeline',
RDI_STOP_PIPELINE_FAILURE: 'Failed to stop pipeline',
RDI_START_PIPELINE_FAILURE: 'Failed to start pipeline',
RDI_TIMEOUT_ERROR: 'Encountered a timeout error while attempting to retrieve data',
RDI_VALIDATION_ERROR: 'Validation error',
INVALID_RDI_INSTANCE_ID: 'Invalid rdi instance id.',
Expand Down
93 changes: 89 additions & 4 deletions redisinsight/api/src/modules/rdi/client/api.rdi.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import { sign } from 'jsonwebtoken';
import { ApiRdiClient } from './api.rdi.client';
import { RdiDyRunJobStatus, RdiPipeline, RdiStatisticsStatus } from '../models';
import { RdiUrl, TOKEN_THRESHOLD } from '../constants';
import { PipelineActions, RdiUrl, TOKEN_THRESHOLD } from '../constants';

const mockedAxios = axios as jest.Mocked<typeof axios>;
jest.mock('axios');
Expand Down Expand Up @@ -164,6 +164,90 @@ describe('ApiRdiClient', () => {
});
});

describe('startPipeline', () => {
it('should start the pipeline and poll for status', async () => {
const actionId = '123';
const postResponse = { data: { action_id: actionId } };
const getResponse = {
data: {
status: 'completed',
data: 'some data',
error: '',
},
};

mockedAxios.post.mockResolvedValueOnce(postResponse);
mockedAxios.get.mockResolvedValueOnce(getResponse);

const result = await client.startPipeline();

expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.StartPipeline, expect.any(Object));
expect(result).toEqual(getResponse.data.data);
});

it('should throw an error if start pipeline fails', async () => {
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client.startPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
});
});

describe('stopPipeline', () => {
it('should stop the pipeline and poll for status', async () => {
const actionId = '123';
const postResponse = { data: { action_id: actionId } };
const getResponse = {
data: {
status: 'completed',
data: 'some data',
error: '',
},
};

mockedAxios.post.mockResolvedValueOnce(postResponse);
mockedAxios.get.mockResolvedValueOnce(getResponse);

const result = await client.stopPipeline();

expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.StopPipeline, expect.any(Object));
expect(result).toEqual(getResponse.data.data);
});

it('should throw an error if stop pipeline fails', async () => {
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client.stopPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
});
});

describe('resetPipeline', () => {
it('should reset the pipeline and poll for status', async () => {
const actionId = '123';
const postResponse = { data: { action_id: actionId } };
const getResponse = {
data: {
status: 'completed',
data: 'some data',
error: '',
},
};

mockedAxios.post.mockResolvedValueOnce(postResponse);
mockedAxios.get.mockResolvedValueOnce(getResponse);

const result = await client.resetPipeline();

expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.ResetPipeline, expect.any(Object));
expect(result).toEqual(getResponse.data.data);
});

it('should throw an error if reset pipeline fails', async () => {
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client.resetPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
});
});

describe('dryRunJob', () => {
it('should call the RDI client with the correct URL and data', async () => {
const mockResponse = {
Expand Down Expand Up @@ -332,7 +416,7 @@ describe('ApiRdiClient', () => {
it('should return response data on success', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: { status: 'completed', data: responseData } });

const result = await client['pollActionStatus'](actionId);
const result = await client['pollActionStatus'](actionId, PipelineActions.Deploy);

expect(mockedAxios.get).toHaveBeenCalledWith(`${RdiUrl.Action}/${actionId}`, { signal: undefined });
expect(result).toEqual(responseData);
Expand All @@ -341,13 +425,14 @@ describe('ApiRdiClient', () => {
it('should throw an error if action status is failed', async () => {
mockedAxios.get.mockResolvedValueOnce({ data: { status: 'failed', error: { message: 'Test error' } } });

await expect(client['pollActionStatus'](actionId)).rejects.toThrow('Test error');
await expect(client['pollActionStatus'](actionId, PipelineActions.Deploy)).rejects.toThrow('Test error');
});

it('should throw an error if an error occurs during polling', async () => {
mockedAxios.get.mockRejectedValueOnce(mockRdiUnauthorizedError);

await expect(client['pollActionStatus'](actionId)).rejects.toThrow(mockRdiUnauthorizedError.message);
await expect(client['pollActionStatus'](actionId, PipelineActions.Deploy))
.rejects.toThrow(mockRdiUnauthorizedError.message);
expect(mockedAxios.get).toHaveBeenCalledWith(`${RdiUrl.Action}/${actionId}`, { signal: undefined });
});
});
Expand Down
61 changes: 57 additions & 4 deletions redisinsight/api/src/modules/rdi/client/api.rdi.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import axios, { AxiosInstance } from 'axios';
import { plainToClass } from 'class-transformer';
import { decode } from 'jsonwebtoken';

import { RdiClient } from 'src/modules/rdi/client/rdi.client';
import {
Expand All @@ -10,6 +9,7 @@ import {
POLLING_INTERVAL,
MAX_POLLING_TIME,
WAIT_BEFORE_POLLING,
PipelineActions,
} from 'src/modules/rdi/constants';
import {
RdiDryRunJobDto,
Expand All @@ -33,6 +33,9 @@ import { convertKeysToCamelCase } from 'src/utils/base.helper';
import { RdiPipelineTimeoutException } from 'src/modules/rdi/exceptions/rdi-pipeline.timeout-error.exception';
import * as https from 'https';
import { convertApiDataToRdiPipeline, convertRdiPipelineToApiPayload } from 'src/modules/rdi/utils/pipeline.util';
import { RdiResetPipelineFailedException } from '../exceptions/rdi-reset-pipeline-failed.exception';
import { RdiStartPipelineFailedException } from '../exceptions/rdi-start-pipeline-failed.exception';
import { RdiStopPipelineFailedException } from '../exceptions/rdi-stop-pipeline-failed.exception';

export class ApiRdiClient extends RdiClient {
protected readonly client: AxiosInstance;
Expand Down Expand Up @@ -113,7 +116,46 @@ export class ApiRdiClient extends RdiClient {

const actionId = response.data.action_id;

return await this.pollActionStatus(actionId);
return await this.pollActionStatus(actionId, PipelineActions.Deploy);
} catch (e) {
throw wrapRdiPipelineError(e);
}
}

async stopPipeline(): Promise<void> {
try {
const response = await this.client.post(
RdiUrl.StopPipeline, {},
);
const actionId = response.data.action_id;

return await this.pollActionStatus(actionId, PipelineActions.Stop);
} catch (e) {
throw wrapRdiPipelineError(e);
}
}

async startPipeline(): Promise<void> {
try {
const response = await this.client.post(
RdiUrl.StartPipeline, {},
);
const actionId = response.data.action_id;

return await this.pollActionStatus(actionId, PipelineActions.Start);
} catch (e) {
throw wrapRdiPipelineError(e);
}
}

async resetPipeline(): Promise<void> {
try {
const response = await this.client.post(
RdiUrl.ResetPipeline, {},
);
const actionId = response.data.action_id;

return await this.pollActionStatus(actionId, PipelineActions.Reset);
} catch (e) {
throw wrapRdiPipelineError(e);
}
Expand Down Expand Up @@ -199,7 +241,7 @@ export class ApiRdiClient extends RdiClient {
}
}

private async pollActionStatus(actionId: string, abortSignal?: AbortSignal): Promise<any> {
private async pollActionStatus(actionId: string, action: PipelineActions, abortSignal?: AbortSignal): Promise<any> {
await new Promise((resolve) => setTimeout(resolve, WAIT_BEFORE_POLLING));

const startTime = Date.now();
Expand All @@ -220,7 +262,18 @@ export class ApiRdiClient extends RdiClient {
const { status, data, error } = response.data;

if (status === 'failed') {
throw new RdiPipelineDeployFailedException(error?.message);
switch (action) {
case PipelineActions.Deploy:
throw new RdiPipelineDeployFailedException(error?.message);
case PipelineActions.Reset:
throw new RdiResetPipelineFailedException(error?.message);
case PipelineActions.Start:
throw new RdiStartPipelineFailedException(error?.message);
case PipelineActions.Stop:
throw new RdiStopPipelineFailedException(error?.message);
default:
throw new RdiPipelineDeployFailedException(error?.message);
}
}

if (status === 'completed') {
Expand Down
6 changes: 6 additions & 0 deletions redisinsight/api/src/modules/rdi/client/rdi.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export abstract class RdiClient {

abstract deploy(pipeline: RdiPipeline): Promise<void>;

abstract stopPipeline(): Promise<void>;

abstract startPipeline(): Promise<void>;

abstract resetPipeline(): Promise<void>;

abstract dryRunJob(data: RdiDryRunJobDto): Promise<RdiDryRunJobResponseDto>;

abstract testConnections(config: object): Promise<RdiTestConnectionsResponseDto>;
Expand Down
10 changes: 10 additions & 0 deletions redisinsight/api/src/modules/rdi/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export enum RdiUrl {
DryRunJob = 'api/v1/pipelines/jobs/dry-run',
JobFunctions = '/api/v1/pipelines/jobs/functions',
Deploy = 'api/v1/pipelines',
StopPipeline = 'api/v1/pipelines/stop',
StartPipeline = 'api/v1/pipelines/start',
ResetPipeline = 'api/v1/pipelines/reset',
TestConnections = 'api/v1/pipelines/targets/dry-run',
GetStatistics = 'api/v1/monitoring/statistics',
GetPipelineStatus = 'api/v1/status',
Expand All @@ -22,3 +25,10 @@ export const RDI_SYNC_INTERVAL = 5 * 60 * 1_000; // 5 min
export const POLLING_INTERVAL = 1_000;
export const MAX_POLLING_TIME = 2 * 60 * 1000; // 2 min
export const WAIT_BEFORE_POLLING = 1_000;

export enum PipelineActions {
Deploy = 'Deploy',
Reset = 'Reset',
Start = 'Start',
Stop = 'Stop',
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
import { RdiPipelineForbiddenException } from './rdi-pipeline.forbidden.exception';

export const parseErrorMessage = (error: AxiosError<any>): string => {
const detail = error.response?.data?.detail;
const data = error.response?.data;
if (typeof data === 'string') {
return data;
}

const detail = data?.detail;
if (!detail) return error.message;

if (typeof detail === 'string') return detail;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import ERROR_MESSAGES from 'src/constants/error-messages';
import { CustomErrorCodes } from 'src/constants';
import { HttpStatus } from '@nestjs/common';
import { RdiResetPipelineFailedException } from './rdi-reset-pipeline-failed.exception';

describe('RdiResetPipelineFailedException', () => {
it('should create an exception with default message and status code', () => {
const exception = new RdiResetPipelineFailedException();
expect(exception.message).toEqual(ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE);
expect(exception.getStatus()).toEqual(HttpStatus.BAD_REQUEST);
expect(exception.getResponse()).toEqual({
message: ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE,
statusCode: HttpStatus.BAD_REQUEST,
error: 'RdiResetPipelineFailed',
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
errors: [undefined],
});
});

it('should create an exception with custom message and error', () => {
const customMessage = 'Custom error message';
const customError = 'Custom error';
const exception = new RdiResetPipelineFailedException(customMessage, { error: customError });
expect(exception.message).toEqual(customMessage);
expect(exception.getStatus()).toEqual(HttpStatus.BAD_REQUEST);
expect(exception.getResponse()).toEqual({
message: customMessage,
statusCode: HttpStatus.BAD_REQUEST,
error: 'RdiResetPipelineFailed',
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
errors: [customError],
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { HttpException, HttpExceptionOptions, HttpStatus } from '@nestjs/common';
import ERROR_MESSAGES from 'src/constants/error-messages';
import { CustomErrorCodes } from 'src/constants';

export class RdiResetPipelineFailedException extends HttpException {
constructor(
message = ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE,
options?: HttpExceptionOptions & { error?: string },
) {
const response = {
message,
statusCode: HttpStatus.BAD_REQUEST,
error: 'RdiResetPipelineFailed',
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
errors: [options?.error],
};

super(response, response.statusCode, options);
}
}
Loading
Loading