-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.coffee
159 lines (122 loc) · 4.37 KB
/
stream.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
util = require 'util'
EventSource = require 'eventsource'
STREAM_TTL = 60 # seconds
__meteor_runtime_config__.STREAM_TTL = STREAM_TTL
# Auto-expire messages after STREAM_TTL seconds.
Stream._ensureIndex
_ts: 1
,
expireAfterSeconds: STREAM_TTL
Stream._ensureIndex
wiki: 1
Stream._ensureIndex
timestamp: 1
Stream._ensureIndex
id: 1
Stream._ensureIndex
wiki: 1
timestamp: 1
id: 1
'log_params.log': 1
,
unique: true
mediawikiAPI = (url, params) ->
response = HTTP.get url,
params: params
headers:
'User-Agent': "WikiMedia Meteor DDP stream (#{ Meteor.absoluteUrl() }, mitar.wikimediastream@tnode.com)"
npmRequestOptions:
forever: true # Enable keep-alive.
data = response.data
if data.error
if _.isObject data.error
error = util.inspect data.error, showHidden: false, depth: null
else
error = data.error
throw new Error "API Error: #{ url }, #{ util.inspect params, showHidden: false, depth: null }, #{ error }"
console.warn data.warnings if data.warnings
data
handleException = (error) ->
console.error "Exception in stream change processing: #{ error.stack or error }"
Meteor.startup ->
# Connect to WikiMedia stream.
# TODO: Are there other streams for other MediaWiki installations? Should this be configurable?
eventSource = new EventSource('https://stream.wikimedia.org/v2/stream/recentchange');
eventSource.on 'open', (event) ->
console.log "Stream connected"
eventSource.on 'error', (event) ->
# TODO: Do we have to reconnect?
console.log "Stream error", event
eventSource.on 'message', Meteor.bindEnvironment (event) ->
# Store receive (and expiry) timestamp.
timestamp = new Date()
data = JSON.parse event.data
try
if data.type is 'new'
responseData = mediawikiAPI "#{ data.server_url }#{ data.server_script_path }/api.php",
format: 'json'
action: 'query'
prop: 'revisions'
revids: data.revision.new
rvprop: 'content'
rvslots: '*'
continue: ''
# It should be only one result, so nothing to continue ever.
assert not responseData.continue, "Continue for revids #{ data.revision.new }"
data.query = responseData.query
else if data.type is 'edit'
responseData = mediawikiAPI "#{ data.server_url }#{ data.server_script_path }/api.php",
format: 'json'
action: 'compare'
fromrev: data.revision.old
torev: data.revision.new
data.compare = responseData.compare
catch error
console.error "Exception in fetching API data for #{ util.inspect data, showHidden: false, depth: null }: #{ error.stack or error }"
# Set receive (and expiry) timestamp. We do it last so that it is the last in the object.
# It just looks a bit better when printing the objects out.
data._ts = timestamp
# We remove $schema because it cannot be stored in MongoDB.
delete data.$schema
Stream.upsert
$and: [
wiki: data.wiki
,
timestamp: data.timestamp
,
id: data.id
,
'log_params.log': data.log_params?.log
]
,
$setOnInsert: data
,
handleException
Meteor.publish 'mediawiki-stream', (selector, projectionFields, includeCached) ->
check selector, Object
check projectionFields, Match.Optional Match.OneOf null, Object
check includeCached, Match.Optional Match.OneOf null, Boolean
projectionFields ?= {}
includeCached ?= false
remove = (id) =>
# Because we are potentially not including cached documents, or we are removing an already
# removed document, we should check if we are publishing a document before removing it.
stringId = @_idFilter.idStringify id
@removed 'mediawiki_stream', id if @_documents.get('mediawiki_stream').has(stringId)
initializing = true
handle = Stream.find(selector, fields: projectionFields).observeChanges
added: (id, fields) =>
if includeCached or not initializing
@added 'mediawiki_stream', id, fields
# Make sure document is removed after STREAM_TTL seconds. MongoDB does not always expire
# documents on time, or observeChanges does not always detect expired documents quickly.
Meteor.setTimeout =>
remove id
,
STREAM_TTL * 1000 # ms
removed: (id) =>
remove id
initializing = false
@ready()
@onStop =>
handle.stop()