/
http-filmliste.ts
79 lines (63 loc) · 2.47 KB
/
http-filmliste.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import { Observable, Subscriber } from 'rxjs';
import { AsyncRequest } from './async-request';
import * as LZMA from 'lzma-native';
import { IFilmliste, BatchType } from './interfaces/filmliste';
import { AsyncFS } from './async-fs';
import { Entry } from './model';
import { CacheManager } from './cache-manager';
import { Utils } from './utils';
import { NativeFilmlisteParser, BatchCallbackType } from './native-filmliste-parser';
export class HTTPFilmliste implements IFilmliste {
url: string;
cachable: boolean;
timestamp: number;
constructor(url: string, cachable: boolean) {
this.url = url;
this.cachable = cachable;
}
async getTimestamp(): Promise<number> {
if (this.timestamp != undefined) {
return this.timestamp;
}
let response = await AsyncRequest.head(this.url);
if (response.statusCode == 200 && response.headers['last-modified'] != undefined) {
this.timestamp = Math.floor(new Date(response.headers['last-modified']).getTime() / 1000);
}
else if (response.statusCode != 200) {
throw new Error(`HTTP statuscode ${response.statusCode}`);
}
else if (response.headers['last-modified'] == undefined) {
throw new Error(`No Last-Modified Header from ${this.url}`);
}
return this.timestamp;
}
getEntries(): Observable<BatchType> {
let observable: Observable<BatchType> = new Observable<BatchType>((observer) => {
this.getEntriesObserverHandler(observer);
});
return observable;
}
private async getEntriesObserverHandler(observer: Subscriber<BatchType>) {
let cache = CacheManager.get(this.url);
let has = this.cachable && (await cache.has());
if (!has) {
let fileStream = await cache.getWriteStream();
await Utils.streamToPromise(await this.pipe(fileStream));
await cache.finalize();
}
await NativeFilmlisteParser.parseFilmliste(cache.path, '({|,|\\s*?)?"(Filmliste|X)"\\s*?:\\s*?', 150, (batch, next) => observer.next({ data: batch, next: next }));
observer.complete();
}
async pipe<T>(destination: T, options?: { end?: boolean }): Promise<T> {
console.log('download');
if (await this.streamIsCompressed) {
let decompressor = LZMA.createDecompressor();
return AsyncRequest.get(this.url).pipe(decompressor).pipe(destination, options);
} else {
return AsyncRequest.get(this.url).pipe(destination, options);
}
}
get streamIsCompressed(): Promise<boolean> {
return Promise.resolve(this.url.endsWith('xz'));
}
}