-
-
Notifications
You must be signed in to change notification settings - Fork 248
/
Copy pathgrabber.ts
80 lines (65 loc) · 2.41 KB
/
grabber.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { EPGGrabber, GrabCallbackData, EPGGrabberMock, SiteConfig, Channel } from 'epg-grabber'
import { Logger, Collection } from '@freearhey/core'
import { Queue } from './'
import { GrabOptions } from '../commands/epg/grab'
import { TaskQueue, PromisyClass } from 'cwait'
/** Constructor arguments accepted by `Grabber`. */
interface GrabberProps {
  /** Receives the per-item progress lines and error messages. */
  logger: Logger
  /** Source of the work items (channel, site config, date) to grab. */
  queue: Queue
  /** Command-line options; `maxConnections`, `timeout` and `delay` are read. */
  options: GrabOptions
}
/**
 * Downloads guide data for every item of a queue and collects the results.
 *
 * Each queue item (channel + site config + date) is handed to an
 * `EPGGrabber` instance; the work is scheduled through a `cwait` `TaskQueue`
 * constructed with `options.maxConnections`.
 */
export class Grabber {
  logger: Logger
  queue: Queue
  options: GrabOptions

  constructor({ logger, queue, options }: GrabberProps) {
    this.logger = logger
    this.queue = queue
    this.options = options
  }

  /**
   * Grabs programs for all queued items.
   *
   * When `options.timeout` / `options.delay` are set to a valid base-10
   * integer they override `config.request.timeout` / `config.delay` of every
   * item. Values that do not parse to a number are ignored (and reported via
   * the logger) so that the site config keeps its own setting instead of
   * receiving `NaN`.
   *
   * @returns `channels` — one entry added per processed queue item;
   *   `programs` — the programs of all items, appended as each item finishes.
   */
  async grab(): Promise<{ channels: Collection; programs: Collection }> {
    const taskQueue = new TaskQueue(Promise as PromisyClass, this.options.maxConnections)
    const total = this.queue.size()

    // Converts a numeric command-line option; `undefined` means "no override".
    const parseOverride = (name: string, raw: string | undefined): number | undefined => {
      if (raw === undefined) return undefined
      // Explicit radix: never interpret input such as "0x10" as hexadecimal.
      const parsed = Number.parseInt(raw, 10)
      if (Number.isNaN(parsed)) {
        this.logger.info(` ERR: ignoring invalid "${name}" option: ${raw}`)
        return undefined
      }
      return parsed
    }

    // The overrides are the same for every queue item, so parse them once.
    const timeout = parseOverride('timeout', this.options.timeout)
    const delay = parseOverride('delay', this.options.delay)

    const channels = new Collection()
    let programs = new Collection()
    // 1-based progress counter shared by all tasks; advanced from the grab callback.
    let i = 1

    await Promise.all(
      this.queue.items().map(
        taskQueue.wrap(
          async (queueItem: { channel: Channel; config: SiteConfig; date: string }) => {
            const { channel, config, date } = queueItem

            channels.add(channel)

            if (timeout !== undefined) {
              config.request = { ...config.request, timeout }
            }

            if (delay !== undefined) {
              config.delay = delay
            }

            // Under NODE_ENV=test the mock implementation is used instead of the real grabber.
            const grabber =
              process.env.NODE_ENV === 'test' ? new EPGGrabberMock(config) : new EPGGrabber(config)

            const _programs = await grabber.grab(
              channel,
              date,
              (data: GrabCallbackData, error: Error | null) => {
                // Named distinctly to avoid shadowing the outer `programs` / `date`.
                const { programs: grabbedPrograms, date: grabbedDate } = data

                this.logger.info(
                  ` [${i}/${total}] ${channel.site} (${channel.lang}) - ${
                    channel.xmltv_id
                  } - ${grabbedDate.format('MMM D, YYYY')} (${grabbedPrograms.length} programs)`
                )

                // Never let the displayed counter run past the queue size.
                if (i < total) i++

                if (error) {
                  this.logger.info(` ERR: ${error.message}`)
                }
              }
            )

            programs = programs.concat(new Collection(_programs))
          }
        )
      )
    )

    return { channels, programs }
  }
}