Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions redisinsight/api/src/constants/error-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,5 @@ export default {
RDI_DEPLOY_PIPELINE_FAILURE: 'Failed to deploy pipeline',
RDI_TIMEOUT_ERROR: 'Encountered a timeout error while attempting to retrieve data',
RDI_VALIDATION_ERROR: 'Validation error',
INVALID_RDI_INSTANCE_ID: 'Invalid rdi instance id.',
};
15 changes: 10 additions & 5 deletions redisinsight/api/src/modules/rdi/client/api.rdi.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import { RdiClient } from 'src/modules/rdi/client/rdi.client';
import {
RdiUrl,
RDI_TIMEOUT,
TOKEN_TRESHOLD,
TOKEN_THRESHOLD,
POLLING_INTERVAL,
MAX_POLLING_TIME,
WAIT_BEFORE_POLLING,
} from 'src/modules/rdi/constants';
import {
RdiDryRunJobDto,
Expand All @@ -17,6 +18,7 @@ import {
RdiTestConnectionsResponseDto,
} from 'src/modules/rdi/dto';
import {
RdiPipelineDeployFailedException,
RdiPipelineInternalServerErrorException,
wrapRdiPipelineError,
} from 'src/modules/rdi/exceptions';
Expand Down Expand Up @@ -96,7 +98,7 @@ export class ApiRdiClient extends RdiClient {
const response = await this.client.post(RdiUrl.Deploy, { ...pipeline });
const actionId = response.data.action_id;

return this.pollActionStatus(actionId);
return await this.pollActionStatus(actionId);
} catch (error) {
throw wrapRdiPipelineError(error, error.response.data.message);
}
Expand Down Expand Up @@ -174,16 +176,19 @@ export class ApiRdiClient extends RdiClient {
async ensureAuth(): Promise<void> {
const expiresIn = this.auth.exp * 1_000 - Date.now();

if (expiresIn < TOKEN_TRESHOLD) {
if (expiresIn < TOKEN_THRESHOLD) {
await this.connect();
}
}

private async pollActionStatus(actionId: string, abortSignal?: AbortSignal): Promise<any> {
await new Promise((resolve) => setTimeout(resolve, WAIT_BEFORE_POLLING));

const startTime = Date.now();

while (true) {
if (abortSignal?.aborted) {
throw new RdiPipelineInternalServerErrorException();
throw new RdiPipelineInternalServerErrorException('Operation is aborted');
}
if (Date.now() - startTime > MAX_POLLING_TIME) {
throw new RdiPipelineTimeoutException();
Expand All @@ -197,7 +202,7 @@ export class ApiRdiClient extends RdiClient {
const { status, data, error } = response.data;

if (status === 'failed') {
throw new RdiPipelineInternalServerErrorException(error?.message);
throw new RdiPipelineDeployFailedException(error?.message);
}

if (status === 'completed') {
Expand Down
4 changes: 2 additions & 2 deletions redisinsight/api/src/modules/rdi/client/rdi.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
import {
RdiDryRunJobDto, RdiDryRunJobResponseDto, RdiTemplateResponseDto, RdiTestConnectionsResponseDto,
} from 'src/modules/rdi/dto';
import { IDLE_TRESHOLD } from 'src/modules/rdi/constants';
import { IDLE_THRESHOLD } from 'src/modules/rdi/constants';

export abstract class RdiClient {
public readonly id: string;
Expand All @@ -20,7 +20,7 @@ export abstract class RdiClient {
}

public isIdle(): boolean {
return Date.now() - this.lastUsed > IDLE_TRESHOLD;
return Date.now() - this.lastUsed > IDLE_THRESHOLD;
}

abstract getSchema(): Promise<object>;
Expand Down
5 changes: 3 additions & 2 deletions redisinsight/api/src/modules/rdi/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ export enum RdiUrl {
Action = 'api/v1/actions',
}

export const IDLE_TRESHOLD = 10 * 60 * 1000; // 10 min
export const IDLE_THRESHOLD = 10 * 60 * 1000; // 10 min
export const RDI_TIMEOUT = 30_000; // 30 sec
export const TOKEN_TRESHOLD = 2 * 60 * 1000; // 2 min
export const TOKEN_THRESHOLD = 2 * 60 * 1000; // 2 min
export const RDI_SYNC_INTERVAL = 5 * 60 * 1_000; // 5 min
export const POLLING_INTERVAL = 1_000;
export const MAX_POLLING_TIME = 2 * 60 * 1000; // 2 min
export const WAIT_BEFORE_POLLING = 1_000;
1 change: 0 additions & 1 deletion redisinsight/api/src/modules/rdi/models/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export * from './rdi.client.metadata';
export * from './rdi';
export * from './rdi-job';
export * from './rdi-pipeline';
export * from './rdi-dry-run';
export * from './rdi-statistics';
4 changes: 0 additions & 4 deletions redisinsight/api/src/modules/rdi/models/rdi-job.ts

This file was deleted.

22 changes: 11 additions & 11 deletions redisinsight/api/src/modules/rdi/models/rdi-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { RdiJob } from 'src/modules/rdi/models/rdi-job';

export enum RdiDeployStatus {
Success = 'success',
Error = 'error',
}
import {
IsObject, IsOptional,
} from 'class-validator';

export class RdiPipeline {
// todo: defined high-level schema. not sure if we need it at all since we are not going to validate it or we are?

connection: unknown;

jobs: RdiJob[];
@IsOptional()
@IsObject()
// todo add validation
jobs: { [key: string]: object };

@IsOptional()
@IsObject()
config: object;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { RdiClient } from 'src/modules/rdi/client/rdi.client';
import { Injectable } from '@nestjs/common';
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { RdiClientMetadata } from 'src/modules/rdi/models';
import { RdiClientStorage } from 'src/modules/rdi/providers/rdi.client.storage';
import { RdiClientFactory } from 'src/modules/rdi/providers/rdi.client.factory';
import { RdiRepository } from 'src/modules/rdi/repository/rdi.repository';
import ERROR_MESSAGES from 'src/constants/error-messages';

@Injectable()
export class RdiClientProvider {
private logger: Logger = new Logger('RdiClientProvider');

constructor(
private readonly repository: RdiRepository,
private readonly rdiClientStorage: RdiClientStorage,
Expand All @@ -28,6 +31,10 @@ export class RdiClientProvider {
async create(clientMetadata: RdiClientMetadata): Promise<RdiClient> {
const rdi = await this.repository.get(clientMetadata.id);

if (!rdi) {
this.logger.error(`RDI with ${clientMetadata.id} was not Found`);
throw new NotFoundException(ERROR_MESSAGES.INVALID_RDI_INSTANCE_ID);
}
return this.rdiClientFactory.createClient(clientMetadata, rdi);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { BadRequestException } from '@nestjs/common';
import { generateMockRdiClient } from 'src/__mocks__';
import { RdiClientStorage } from 'src/modules/rdi/providers/rdi.client.storage';
import { IDLE_TRESHOLD } from 'src/modules/rdi/constants';
import { IDLE_THRESHOLD } from 'src/modules/rdi/constants';
import { SessionMetadata } from 'src/common/models';

const mockClientMetadata1 = {
Expand Down Expand Up @@ -69,7 +69,7 @@ describe('RdiClientStorage', () => {
it('should remove client with exceeded time in idle', async () => {
expect(service['clients'].size).toEqual(4);
const toDelete = service['clients'].get(mockRdiClient1.id);
toDelete['lastUsed'] = Date.now() - IDLE_TRESHOLD - 1;
toDelete['lastUsed'] = Date.now() - IDLE_THRESHOLD - 1;
service['syncClients']();

expect(service['clients'].size).toEqual(3);
Expand Down
20 changes: 12 additions & 8 deletions redisinsight/api/src/modules/rdi/rdi-pipeline.analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ export class RdiPipelineAnalytics extends TelemetryBaseService {
}

sendRdiPipelineFetched(id: string, pipeline: any) {
this.sendEvent(
TelemetryEvents.RdiPipelineDeploymentSucceeded,
{
id,
jobsNumber: Object.keys(pipeline.jobs).length,
source: 'server',
},
);
try {
this.sendEvent(
TelemetryEvents.RdiPipelineDeploymentSucceeded,
{
id,
jobsNumber: pipeline?.jobs ? Object.keys(pipeline.jobs).length : 0,
source: 'server',
},
);
} catch (e) {
// ignore
}
}

sendRdiPipelineFetchFailed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class RdiPipelineController {
})
async getPipeline(
@RequestRdiClientMetadata() rdiClientMetadata: RdiClientMetadata,
): Promise<object> {
): Promise<RdiPipeline> {
return this.rdiPipelineService.getPipeline(rdiClientMetadata);
}

Expand Down
7 changes: 4 additions & 3 deletions redisinsight/api/src/modules/rdi/rdi-pipeline.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Injectable, Logger } from '@nestjs/common';
import { Request } from 'express';
import { RdiClientMetadata, RdiPipeline } from 'src/modules/rdi/models';
import { RdiClientProvider } from 'src/modules/rdi/providers/rdi.client.provider';
import { RdiDryRunJobDto, RdiTemplateResponseDto, RdiTestConnectionsResponseDto } from 'src/modules/rdi/dto';
import { RdiDryRunJobResponseDto } from 'src/modules/rdi/dto/rdi.dry-run.job.response.dto';
import { RdiPipelineAnalytics } from 'src/modules/rdi/rdi-pipeline.analytics';
import { wrapHttpError } from 'src/common/utils';
import { plainToClass } from 'class-transformer';

@Injectable()
export class RdiPipelineService {
Expand All @@ -23,7 +23,7 @@ export class RdiPipelineService {
return await client.getSchema();
}

async getPipeline(rdiClientMetadata: RdiClientMetadata): Promise<object> {
async getPipeline(rdiClientMetadata: RdiClientMetadata): Promise<RdiPipeline> {
this.logger.log('Getting RDI pipeline');

try {
Expand All @@ -34,7 +34,8 @@ export class RdiPipelineService {
this.analytics.sendRdiPipelineFetched(rdiClientMetadata.id, response);

this.logger.log('Succeed to get RDI pipeline');
return response;

return plainToClass(RdiPipeline, response);
} catch (e) {
this.logger.error('Failed to get RDI pipeline', e);

Expand Down
3 changes: 1 addition & 2 deletions redisinsight/ui/src/pages/rdi/instance/InstancePage.spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
} from 'uiSrc/slices/instances/instances'
import { setConnectedInstance } from 'uiSrc/slices/rdi/instances'
import { PageNames, Pages } from 'uiSrc/constants'
import { getPipelineStatus, setPipelineInitialState } from 'uiSrc/slices/rdi/pipeline'
import { setPipelineInitialState } from 'uiSrc/slices/rdi/pipeline'
import { clearExpertChatHistory } from 'uiSrc/slices/panels/aiAssistant'

import InstancePage, { Props } from './InstancePage'
Expand Down Expand Up @@ -100,7 +100,6 @@ describe('InstancePage', () => {
setAppContextConnectedRdiInstanceId('rdiInstanceId'),
resetConnectedDatabaseInstance(),
...resetContextActions,
getPipelineStatus(),
]

expect(store.getActions()).toEqual(expectedActions)
Expand Down
2 changes: 0 additions & 2 deletions redisinsight/ui/src/pages/rdi/instance/InstancePage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
} from 'uiSrc/slices/instances/instances'
import {
deployPipelineAction,
getPipelineStatusAction,
rdiPipelineSelector,
setPipelineInitialState,
} from 'uiSrc/slices/rdi/pipeline'
Expand Down Expand Up @@ -67,7 +66,6 @@ const RdiInstancePage = ({ routes = [] }: Props) => {

useEffect(() => {
dispatch(fetchConnectedInstanceAction(rdiInstanceId))
dispatch(getPipelineStatusAction(rdiInstanceId))
// redirect only if there is no exact path
if (pathname === Pages.rdiPipeline(rdiInstanceId)) {
if (lastPage === PageNames.rdiStatistics && contextRdiInstanceId === rdiInstanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { cloneDeep } from 'lodash'
import React from 'react'
import reactRouterDom from 'react-router-dom'

import { rdiPipelineStatusSelector } from 'uiSrc/slices/rdi/pipeline'
import { rdiPipelineStatusSelector, getPipelineStatus } from 'uiSrc/slices/rdi/pipeline'
import { getStatistics, rdiStatisticsSelector } from 'uiSrc/slices/rdi/statistics'
import { TelemetryEvent, TelemetryPageView, sendEventTelemetry, sendPageViewTelemetry } from 'uiSrc/telemetry'
import { cleanup, fireEvent, mockedStore, render, screen } from 'uiSrc/utils/test-utils'
Expand Down Expand Up @@ -315,6 +315,7 @@ describe('StatisticsPage', () => {
render(<StatisticsPage />)

const expectedActions = [
getPipelineStatus(),
getStatistics(),
]

Expand Down
3 changes: 2 additions & 1 deletion redisinsight/ui/src/pages/rdi/statistics/StatisticsPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { useDispatch, useSelector } from 'react-redux'
import { useParams } from 'react-router-dom'

import { connectedInstanceSelector } from 'uiSrc/slices/rdi/instances'
import { rdiPipelineStatusSelector } from 'uiSrc/slices/rdi/pipeline'
import { getPipelineStatusAction, rdiPipelineStatusSelector } from 'uiSrc/slices/rdi/pipeline'
import { fetchRdiStatistics, rdiStatisticsSelector } from 'uiSrc/slices/rdi/statistics'
import { TelemetryEvent, TelemetryPageView, sendEventTelemetry, sendPageViewTelemetry } from 'uiSrc/telemetry'
import RdiInstancePageTemplate from 'uiSrc/templates/rdi-instance-page-template'
Expand Down Expand Up @@ -69,6 +69,7 @@ const StatisticsPage = () => {
}

useEffect(() => {
dispatch(getPipelineStatusAction(rdiInstanceId))
dispatch(fetchRdiStatistics(rdiInstanceId))

sendPageViewTelemetry({
Expand Down
13 changes: 8 additions & 5 deletions redisinsight/ui/src/utils/transformers/transformRdiPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import yaml, { YAMLException } from 'js-yaml'
import { isEmpty } from 'lodash'
import {
IPipeline,
IPipelineJSON,
Expand All @@ -20,11 +21,13 @@ export const yamlToJson = (value: string, onError: (e: string) => void) => {
}

export const pipelineToYaml = (pipeline: IPipelineJSON) => ({
config: yaml.dump(pipeline.config),
jobs: Object.entries(pipeline.jobs)?.map(([key, value]) => ({
name: key,
value: yaml.dump(value)
}))
config: isEmpty(pipeline?.config) ? '' : yaml.dump(pipeline.config),
jobs: pipeline?.jobs
? Object.entries(pipeline.jobs)?.map(([key, value]) => ({
name: key,
value: yaml.dump(value)
}))
: []
})

export const pipelineToJson = ({ config, jobs }: IPipeline, onError: (errors: IYamlFormatError[]) => void) => {
Expand Down