-
Notifications
You must be signed in to change notification settings - Fork 6
/
Etl.ts
123 lines (105 loc) · 3.85 KB
/
Etl.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import { Observable } from 'rxjs';
import { Extractor } from './interfaces/Extractor';
import { GeneralTransformer } from './interfaces/GeneralTransformer';
import { Loader } from './interfaces/Loader';
import { Transformer } from './interfaces/Transformer';
import { MapTransformer } from './transformers/MapTransformer';
/**
 * Lifecycle states of an Etl instance.
 */
export enum EtlState {
    /** Set when `Etl.start()` is called. */
    Running = 0,
    /** Initial state; also set when a run completes or the Etl is reset. */
    Stopped = 1,
    /** Set when a run emits an error or the context is changed in an invalid state. */
    Error = 2,
}
/**
 * ETL class. Instantiate one and add as many extractors, transformers and loaders as you want,
 * then start the whole process with ".start()".
 *
 * This processor is modular; other implemented loaders and extractors are listed in the README.
 */
export class Etl {
    private _extractors: Extractor[] = [];
    // The pipeline that start() actually runs. Plain transformers are wrapped and added here too.
    private _generalTransformers: GeneralTransformer[] = [];
    // Bookkeeping of the plain transformers passed to addTransformer(); not used by start() directly.
    private _transformers: Transformer[] = [];
    private _loaders: Loader[] = [];
    private _state: EtlState = EtlState.Stopped;
    private _context: any = null;

    /**
     * @param context Optional value that is handed to every extractor, transformer and loader call.
     */
    public constructor(context?: any) {
        this.setContext(context);
    }

    /** The registered extractors (the internal array, not a copy). */
    public get extractors(): Extractor[] {
        return this._extractors;
    }

    /** The registered general transformers, including the wrappers created by addTransformer(). */
    public get generalTransformers(): GeneralTransformer[] {
        return this._generalTransformers;
    }

    /** The plain transformers registered through addTransformer(). */
    public get transformers(): Transformer[] {
        return this._transformers;
    }

    /** The registered loaders (the internal array, not a copy). */
    public get loaders(): Loader[] {
        return this._loaders;
    }

    /** The current state of this Etl instance. */
    public get state(): EtlState {
        return this._state;
    }

    /**
     * Replaces the context that is passed to extractors, transformers and loaders.
     *
     * @param context The new context value.
     * @returns This instance, for chaining.
     * @throws {Error} When the Etl is not in the Stopped state; the state is set to Error before throwing.
     */
    public setContext(context: any): this {
        if (this._state !== EtlState.Stopped) {
            this._state = EtlState.Error;
            throw new Error('Tried to set context on invalid state.');
        }
        this._context = context;
        return this;
    }

    /**
     * Registers an extractor whose read() output is merged into the pipeline on start().
     *
     * @returns This instance, for chaining.
     */
    public addExtractor(extract: Extractor): Etl {
        this._extractors.push(extract);
        return this;
    }

    /**
     * Appends a general transformer to the pipeline. Its process() receives the whole observable.
     *
     * @returns This instance, for chaining.
     */
    public addGeneralTransformer(transformer: GeneralTransformer): Etl {
        this._generalTransformers.push(transformer);
        return this;
    }

    /**
     * Registers a plain transformer. It is wrapped in a MapTransformer and appended to the
     * general transformer pipeline, and additionally recorded in the `transformers` list.
     *
     * @returns This instance, for chaining.
     */
    public addTransformer(transformer: Transformer): Etl {
        this.addGeneralTransformer(new MapTransformer(transformer));
        this._transformers.push(transformer);
        return this;
    }

    /**
     * Registers a loader whose write() is called for every object leaving the transformer pipeline.
     *
     * @returns This instance, for chaining.
     */
    public addLoader(loader: Loader): Etl {
        this._loaders.push(loader);
        return this;
    }

    /**
     * Builds the etl pipeline. The read() observables of all extractors are merged into one
     * observable, which is then passed through every general transformer in registration order.
     * Each resulting object is handed to the write() of all loaders, whose observables are merged.
     *
     * The state is set to Running as soon as this method is called, to Stopped when the returned
     * observable completes, and to Error when it errors.
     *
     * @returns {Observable<any>} Observable that emits whatever the loaders' write() observables
     * emit, completes when the process is finished and errors when any step produces an error.
     */
    public start(): Observable<any> {
        this._state = EtlState.Running;

        const extracted = Observable
            .merge(...this._extractors.map(extractor => extractor.read(this._context)));

        const transformed = this._generalTransformers
            .reduce((stream, transformer) => transformer.process(stream, this._context), extracted);

        return transformed
            .flatMap(object => Observable.merge(...this._loaders.map(loader => loader.write(object, this._context))))
            .do(
                () => { },
                () => {
                    // `do` only observes; the error itself keeps propagating to subscribers.
                    this._state = EtlState.Error;
                },
                () => {
                    this._state = EtlState.Stopped;
                },
            );
    }

    /**
     * Resets the whole Etl object. Deletes all extractors, general transformers, transformers
     * and loaders, clears the context and sets the state back to Stopped.
     */
    public reset(): void {
        this._extractors = [];
        // Must be cleared as well: this is the list start() actually runs, so leaving it
        // populated would let transformers from before the reset leak into the next run.
        this._generalTransformers = [];
        this._transformers = [];
        this._loaders = [];
        this._state = EtlState.Stopped;
        this._context = null;
    }
}