-
Notifications
You must be signed in to change notification settings - Fork 0
/
kue-worker.coffee
127 lines (119 loc) · 5.33 KB
/
kue-worker.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
async = require('async')
echonest = require('echonest')
fs = require('fs')
kue = require('kue')
LastFmNode = require('lastfm').LastFmNode
mongodb = require('mongodb')
# Startup configuration, read synchronously from 'config.json'. The relative
# path is resolved against the process's current working directory.
config = JSON.parse fs.readFileSync('config.json', 'utf8')

# Last.fm API client, authenticated with the key/secret pair from config.
lastfm = new LastFmNode({
  api_key: config.lastfm.api_key,
  secret: config.lastfm.secret
})

# MongoDB handles: a Server description plus the Db connector that the
# worker opens later on.
mongo = {}
mongo.server = new mongodb.Server(
  config.mongodb.host,
  config.mongodb.port,
  { auto_reconnect: true }
)
mongo.db_connector = new mongodb.Db(
  config.mongodb.db,
  mongo.server,
  { journal: true }
)

# Echo Nest API client.
nest = new echonest.Echonest({
  api_key: config.echonest.api_key,
  rate_limit: 3000 # Pause 3s between 2 requests
})

# kue job queue, created with the library's default settings.
jobs = kue.createQueue()
# Reduce a free-form location string (e.g. "City, Region, Country") to its
# last comma-separated component, trimmed, then apply any spelling fix listed
# in config.songride.corrections. A missing location is treated as 'Unknown'.
normalizeLocation = (rawLocation) ->
  splits = (rawLocation ? 'Unknown').split(',')
  location = splits[splits.length - 1].trim()
  config.songride.corrections[location] ? location

# Current time in whole seconds since the Unix epoch.
unixNow = -> Math.floor(Date.now() / 1000)

# Open MongoDB, then register the 'lastfm-top50' kue job processor.
async.waterfall [
  (cb) ->
    mongo.db_connector.open cb
  (db, cb) ->
    # NOTE: cb is intentionally never called: the connection stays open while
    # the worker processes jobs, so the final callback of this waterfall only
    # runs when opening the connection fails.
    artist_collection = new mongodb.Collection(db, 'artists')
    user_collection = new mongodb.Collection(db, 'users')

    # Map one Last.fm top-artist entry (name, mbid, playcount) to
    # [location, playcount]. The location comes from the 'artists' collection
    # when cached, otherwise from the Echo Nest artist profile
    # (bucket "artist_location"), whose answer is then cached.
    lastfmArtist2Location = (item, icb) ->
      # Always report the current user's playcount: the cached artist doc
      # holds the count of whichever user first triggered the lookup.
      playcount = parseInt(item.playcount, 10)
      # Check if we have cached the location
      artist_collection.find(name: item.name).toArray (err, result) ->
        return icb(err) if err?
        if result.length > 0
          # TODO: Cache should be refreshed after some time
          return icb(null, [normalizeLocation(result[0].location), playcount])
        # Nothing cached so we need to ask the echonest.
        req = bucket: "artist_location", name: item.name
        nest.artist.profile req, (err, res) ->
          return icb(err) if err?
          doc =
            playcount: playcount
            mbid: item.mbid
            name: item.name
            updated_at: unixNow()
          if res?.status?.code == 0 and res.artist?.artist_location?.location?
            doc.location = res.artist.artist_location.location
          else
            doc.location = 'Unknown'
          console.log(doc.location)
          location = normalizeLocation(doc.location)
          # Store the result we got from the echonest as we do not want to
          # ask them continuously for the same thing.
          # TODO: If we are updating, use update
          artist_collection.insert doc, {journal: true}, (err) ->
            icb err, [location, playcount]

    # async.reduce iterator: add an entry's playcount to the running total
    # for its location.
    reduceLocationCount = (memo, item, cb) ->
      [location, count] = item
      memo[location] = (memo[location] ? 0) + count
      cb null, memo

    jobs.process 'lastfm-top50', (job, done) ->
      username = job.data.username
      async.waterfall [
        (cb) ->
          # Get the user's top 50 artists
          # TODO: Cache result
          console.log("# lastfm-top50 for " + username)
          lastfm.request 'user.getTopArtists',
            user: username
            period: 'overall'
            handlers:
              success: (data) -> cb null, data
              error: cb
        (data, cb) ->
          job.progress(10, 100)
          # Guard against responses without a topartists list instead of
          # throwing inside this callback (which would crash the worker).
          artists = data?.topartists?.artist
          # Legacy Last.fm JSON may encode a one-element list as a bare
          # object; normalize so mapSeries always receives an array.
          artists = [artists] if artists? and not Array.isArray(artists)
          cb null, artists
        (artists, cb) ->
          # Get the artists' countries
          if artists?
            async.mapSeries artists, lastfmArtist2Location, cb
          else
            cb new Error("No artists")
        (artists, cb) ->
          job.progress(80, 100)
          async.reduce artists, {}, reduceLocationCount, cb
        (countries, cb) ->
          job.progress(90, 100)
          # Save result into the database.
          # NOTE(review): this inserts a new document per job instead of
          # updating the user's existing one; confirm readers pick the
          # latest by updated_at.
          doc =
            username: username
            updated_at: unixNow()
            countries: countries
          user_collection.insert doc, {journal: true}, cb
      ], (err) ->
        job.progress(100, 100)
        console.log("# FINISHED: lastfm-top50 for " + username)
        if err?
          console.log("# ERROR: lastfm-top50 for " + username + ": " + err)
          done(err)
        else
          done()
], (err) ->
  # Reached only when opening MongoDB fails (the second step never calls its
  # callback); report the error instead of silently closing.
  console.error("# MongoDB connection failed: " + err) if err?
  mongo.db_connector.close()