Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,20 @@ module.exports = {
rootDir: 'src',
testRegex: '.*\\.spec\\.ts$',
transform: {
'^.+\\.(t|j)s$': 'ts-jest',
'^.+\\.(t|j)s$': [
'ts-jest',
{
tsconfig: {
types: ['node', 'jest'],
skipLibCheck: true,
strict: false,
strictNullChecks: false,
noImplicitAny: false,
emitDecoratorMetadata: true,
experimentalDecorators: true,
},
},
],
},

// ─── Coverage ──────────────────────────────────────────────────────────────
Expand Down
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { SearchModule } from './search/search.module';
import { DebuggingModule } from './debugging/debugging.module';
import { DataPipelineModule } from './data-pipeline/data-pipeline.module';

@Module({
imports: [
SearchModule,
DebuggingModule,
DataPipelineModule,
],
controllers: [AppController],
providers: [],
Expand Down
74 changes: 74 additions & 0 deletions src/data-pipeline/bi-integration.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Test, TestingModule } from '@nestjs/testing';
import { BiIntegrationService } from './bi-integration.service';
import { DataWarehouseService } from './data-warehouse.service';
import { EtlRecord } from './etl.service';

const makeRecord = (source: string, id: string): EtlRecord => ({
id,
source,
payload: { v: 1 },
timestamp: new Date(),
});

describe('BiIntegrationService', () => {
let service: BiIntegrationService;
let warehouse: DataWarehouseService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [BiIntegrationService, DataWarehouseService],
}).compile();
service = module.get(BiIntegrationService);
warehouse = module.get(DataWarehouseService);
});

describe('generateReport', () => {
it('returns report with zero records when empty', () => {
const report = service.generateReport();
expect(report.totalRecords).toBe(0);
expect(report.entries).toHaveLength(0);
expect(report.generatedAt).toBeDefined();
});

it('includes ingested records', () => {
warehouse.ingest(makeRecord('events', 'r1'));
warehouse.ingest(makeRecord('events', 'r2'));
const report = service.generateReport();
expect(report.totalRecords).toBe(2);
expect(report.bySource['events']).toBe(2);
});

it('filters by source', () => {
warehouse.ingest(makeRecord('events', 'r1'));
warehouse.ingest(makeRecord('metrics', 'r2'));
const report = service.generateReport({ source: 'events' });
expect(report.entries).toHaveLength(1);
});
});

describe('export', () => {
beforeEach(() => {
warehouse.ingest(makeRecord('events', 'r1'));
});

it('exports JSON by default', () => {
const result = service.export();
expect(result.format).toBe('json');
expect(() => JSON.parse(result.data)).not.toThrow();
});

it('exports CSV format', () => {
const result = service.export({}, 'csv');
expect(result.format).toBe('csv');
expect(result.data).toContain(',');
});

it('exports empty CSV with default headers when no entries', () => {
const emptyWarehouse = new DataWarehouseService();
const emptyBi = new BiIntegrationService(emptyWarehouse);
const result = emptyBi.export({}, 'csv');
expect(result.format).toBe('csv');
expect(result.data).toContain('id,source,storedAt');
});
});
});
49 changes: 49 additions & 0 deletions src/data-pipeline/bi-integration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Injectable, Logger } from '@nestjs/common';
import { DataWarehouseService, WarehouseQueryOptions } from './data-warehouse.service';

export interface BiReport {
generatedAt: string;
totalRecords: number;
bySource: Record<string, number>;
entries: unknown[];
}

export interface BiExportFormat {
format: 'json' | 'csv';
data: string;
}

@Injectable()
export class BiIntegrationService {
private readonly logger = new Logger(BiIntegrationService.name);

constructor(private readonly warehouse: DataWarehouseService) {}

generateReport(options: WarehouseQueryOptions = {}): BiReport {
const entries = this.warehouse.query(options);
const bySource = this.warehouse.aggregate(options.source);

const report: BiReport = {
generatedAt: new Date().toISOString(),
totalRecords: this.warehouse.count(),
bySource,
entries,
};

this.logger.log(`BI report generated: total=${report.totalRecords}`);
return report;
}

export(options: WarehouseQueryOptions = {}, format: 'json' | 'csv' = 'json'): BiExportFormat {
const entries = this.warehouse.query(options);

if (format === 'csv') {
const headers =
entries.length > 0 ? Object.keys(entries[0] as object).join(',') : 'id,source,storedAt';
const rows = entries.map((e) => Object.values(e as object).join(','));
return { format: 'csv', data: [headers, ...rows].join('\n') };
}

return { format: 'json', data: JSON.stringify(entries, null, 2) };
}
}
58 changes: 58 additions & 0 deletions src/data-pipeline/dashboard.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { EtlRecord, EtlResult } from './etl.service';
import { DataWarehouseService } from './data-warehouse.service';

@WebSocketGateway({ namespace: '/dashboard', cors: { origin: '*' } })
export class DashboardGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;

private readonly logger = new Logger(DashboardGateway.name);

constructor(private readonly warehouse: DataWarehouseService) {}

handleConnection(client: Socket): void {
this.logger.log(`Dashboard client connected: ${client.id}`);
// Send current snapshot on connect
client.emit('snapshot', {
totalRecords: this.warehouse.count(),
bySource: this.warehouse.aggregate(),
});
}

handleDisconnect(client: Socket): void {
this.logger.log(`Dashboard client disconnected: ${client.id}`);
}

@SubscribeMessage('getSnapshot')
handleGetSnapshot(@MessageBody() _data: unknown) {
return {
totalRecords: this.warehouse.count(),
bySource: this.warehouse.aggregate(),
};
}

@OnEvent('etl.record.loaded')
broadcastRecord(record: EtlRecord): void {
this.server?.emit('record', {
id: record.id,
source: record.source,
timestamp: record.timestamp,
});
}

@OnEvent('etl.run.complete')
broadcastRunComplete(payload: { source: string; result: EtlResult }): void {
this.server?.emit('etlComplete', payload);
}
}
49 changes: 49 additions & 0 deletions src/data-pipeline/data-pipeline.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Body, Controller, Get, Post, Query } from '@nestjs/common';
import { EtlService, EtlResult } from './etl.service';
import { DataWarehouseService, WarehouseEntry } from './data-warehouse.service';
import { BiIntegrationService, BiReport, BiExportFormat } from './bi-integration.service';

class RunEtlDto {
source: string;
data: Record<string, unknown>[];
}

@Controller('data-pipeline')
export class DataPipelineController {
constructor(
private readonly etl: EtlService,
private readonly warehouse: DataWarehouseService,
private readonly bi: BiIntegrationService,
) {}

@Post('etl/run')
async runEtl(@Body() body: RunEtlDto): Promise<EtlResult> {
return this.etl.run(body.source, body.data);
}

@Get('warehouse')
queryWarehouse(
@Query('source') source?: string,
@Query('limit') limit?: string,
): WarehouseEntry[] {
return this.warehouse.query({ source, limit: limit ? parseInt(limit, 10) : 100 });
}

@Get('warehouse/aggregate')
aggregate(@Query('source') source?: string): Record<string, number> {
return this.warehouse.aggregate(source);
}

@Get('bi/report')
getReport(@Query('source') source?: string): BiReport {
return this.bi.generateReport({ source });
}

@Get('bi/export')
exportData(
@Query('source') source?: string,
@Query('format') format?: 'json' | 'csv',
): BiExportFormat {
return this.bi.export({ source }, format ?? 'json');
}
}
15 changes: 15 additions & 0 deletions src/data-pipeline/data-pipeline.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { EtlService } from './etl.service';
import { DataWarehouseService } from './data-warehouse.service';
import { BiIntegrationService } from './bi-integration.service';
import { DashboardGateway } from './dashboard.gateway';
import { DataPipelineController } from './data-pipeline.controller';

@Module({
imports: [EventEmitterModule.forRoot()],
controllers: [DataPipelineController],
providers: [EtlService, DataWarehouseService, BiIntegrationService, DashboardGateway],
exports: [EtlService, DataWarehouseService, BiIntegrationService],
})
export class DataPipelineModule {}
88 changes: 88 additions & 0 deletions src/data-pipeline/data-warehouse.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Test, TestingModule } from '@nestjs/testing';
import { DataWarehouseService } from './data-warehouse.service';
import { EtlRecord } from './etl.service';

const makeRecord = (source: string, id = '1'): EtlRecord => ({
id,
source,
payload: { value: 42 },
timestamp: new Date(),
});

describe('DataWarehouseService', () => {
let service: DataWarehouseService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [DataWarehouseService],
}).compile();
service = module.get(DataWarehouseService);
});

it('starts empty', () => {
expect(service.count()).toBe(0);
});

describe('ingest', () => {
it('stores a record', () => {
service.ingest(makeRecord('events', 'r1'));
expect(service.count()).toBe(1);
});
});

describe('handleEtlRecord', () => {
it('ingests via event handler', () => {
service.handleEtlRecord(makeRecord('events', 'r2'));
expect(service.count()).toBe(1);
});
});

describe('query', () => {
beforeEach(() => {
service.ingest(makeRecord('events', 'a'));
service.ingest(makeRecord('metrics', 'b'));
service.ingest(makeRecord('events', 'c'));
});

it('returns all entries without filter', () => {
expect(service.query()).toHaveLength(3);
});

it('filters by source', () => {
const results = service.query({ source: 'events' });
expect(results).toHaveLength(2);
expect(results.every((e) => e.source === 'events')).toBe(true);
});

it('respects limit', () => {
expect(service.query({ limit: 2 })).toHaveLength(2);
});

it('filters by date range', () => {
const future = new Date(Date.now() + 60_000);
expect(service.query({ to: future })).toHaveLength(3);
const past = new Date(Date.now() - 60_000);
expect(service.query({ from: future })).toHaveLength(0);
expect(service.query({ from: past })).toHaveLength(3);
});
});

describe('aggregate', () => {
it('counts by source', () => {
service.ingest(makeRecord('events', 'x'));
service.ingest(makeRecord('events', 'y'));
service.ingest(makeRecord('metrics', 'z'));
const agg = service.aggregate();
expect(agg['events']).toBe(2);
expect(agg['metrics']).toBe(1);
});

it('filters aggregate by source', () => {
service.ingest(makeRecord('events', 'x'));
service.ingest(makeRecord('metrics', 'z'));
const agg = service.aggregate('events');
expect(agg['events']).toBe(1);
expect(agg['metrics']).toBeUndefined();
});
});
});
Loading
Loading