Skip to content
This repository has been archived by the owner on Jul 28, 2023. It is now read-only.

Commit

Permalink
feat: Global poller for updating worker status
Browse files Browse the repository at this point in the history
Uses a shared poller to fetch and update the status of workers; helps reduce the no. of requests made to BrowserStack.
Ref #30
  • Loading branch information
shirish87 committed Nov 5, 2015
1 parent ac1e838 commit b81cafb
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 37 deletions.
2 changes: 2 additions & 0 deletions gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ module.exports = function (grunt) {
eslint: {
target: [
'index.js',
'worker.js',
'worker-manager.js',
'gruntfile.js'
]
}
Expand Down
81 changes: 44 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var q = require('q')
var api = require('browserstack')
var BrowserStackTunnel = require('browserstacktunnel-wrapper')
var os = require('os')
var workerManager = require('./worker-manager')

var createBrowserStackTunnel = function (logger, config, emitter) {
var log = logger.create('launcher.browserstack')
Expand Down Expand Up @@ -74,21 +75,37 @@ var createBrowserStackTunnel = function (logger, config, emitter) {
log.error(error)
}

if (workerManager.isPolling) {
workerManager.stopPolling()
}

done()
})
})

return deferred.promise
}

var createBrowserStackClient = function (/* config.browserStack */ config) {
var createBrowserStackClient = function (/* config.browserStack */config) {
var env = process.env

// TODO(vojta): handle no username/pwd
return api.createClient({
var client = api.createClient({
username: env.BROWSER_STACK_USERNAME || config.username,
password: env.BROWSER_STACK_ACCESS_KEY || config.accessKey
})

var pollingTimeout = config.pollingTimeout || 1000

if (!workerManager.isPolling) {
workerManager.startPolling(client, pollingTimeout, function (err) {
if (err) {
console.error(err)
}
})
}

return client
}

var formatError = function (error) {
Expand All @@ -104,6 +121,7 @@ var BrowserStackBrowser = function (id, emitter, args, logger,
/* config */ config,
/* browserStackTunnel */ tunnel, /* browserStackClient */ client) {
var self = this

var workerId = null
var captured = false
var alreadyKilling = null
Expand All @@ -118,7 +136,6 @@ var BrowserStackBrowser = function (id, emitter, args, logger,
var captureTimeout = config.captureTimeout || 0
var captureTimeoutId
var retryLimit = bsConfig.retryLimit || 3
var pollingTimeout = bsConfig.pollingTimeout || 1000

this.start = function (url) {
// TODO(vojta): handle non os/browser/version
Expand Down Expand Up @@ -156,43 +173,33 @@ var BrowserStackBrowser = function (id, emitter, args, logger,
workerId = worker.id
alreadyKilling = null

var whenRunning = function () {
log.debug('%s job started with id %s', browserName, workerId)

if (captureTimeout) {
captureTimeoutId = setTimeout(self._onTimeout, captureTimeout)
worker = workerManager.registerWorker(worker)
worker.on('status', function (status) {
// TODO(vojta): show immediately in createClient callback once this gets fixed:
// https://github.com/browserstack/api/issues/10
if (!sessionUrlShowed) {
log.info('%s session at %s', browserName, worker.browser_url)
sessionUrlShowed = true
}
}

var waitForWorkerRunning = function () {
client.getWorker(workerId, function (error, w) {
if (error) {
log.error('Can not get worker %s status %s\n %s', workerId, browserName, formatError(error))
return emitter.emit('browser_process_failure', self)
}

// TODO(vojta): show immediately in createClient callback once this gets fixed:
// https://github.com/browserstack/api/issues/10
if (!sessionUrlShowed) {
log.info('%s session at %s', browserName, w.browser_url)
sessionUrlShowed = true
}

if (w.status === 'running') {
whenRunning()
} else {
log.debug('%s job with id %s still in queue.', browserName, workerId)
setTimeout(waitForWorkerRunning, pollingTimeout)
}
})
}
switch (status) {
case 'running':
log.debug('%s job started with id %s', browserName, workerId)

if (worker.status === 'running') {
whenRunning()
} else {
log.debug('%s job queued with id %s.', browserName, workerId)
setTimeout(waitForWorkerRunning, pollingTimeout)
}
if (captureTimeout) {
captureTimeoutId = setTimeout(self._onTimeout, captureTimeout)
}
break

case 'queue':
log.debug('%s job with id %s in queue.', browserName, workerId)
break

case 'delete':
log.debug('%s job with id %s has been deleted.', browserName, workerId)
break
}
})
})
}, function () {
emitter.emit('browser_process_failure', self)
Expand Down
95 changes: 95 additions & 0 deletions worker-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
var Worker = require('./worker')

/**
* Tracks worker state across runs.
*/
function WorkerManager () {
this._pollHandle = null
this.workers = {}
this.isPolling = false
}

WorkerManager.prototype.registerWorker = function registerWorker (workerData) {
if (this.workers[workerData.id]) {
this.unregisterWorker(this.workers[workerData.id])
}

var worker = new Worker(workerData)
worker.emit('status', worker.status)

this.workers[workerData.id] = worker
return worker
}

WorkerManager.prototype.unregisterWorker = function unregisterWorker (worker) {
worker.emit('delete', worker)
worker.removeAllListeners()

delete this.workers[worker.id]
return worker
}

WorkerManager.prototype.updateWorker = function updateWorker (workerData) {
var workers = this.workers

if (workers[workerData.id]) {
var worker = workers[workerData.id]
var prevStatus = worker.status

Object.keys(workerData).forEach(function (k) {
worker[k] = workerData[k]
})

if (worker.status !== prevStatus) {
worker.emit('status', worker.status)
}
}
}

WorkerManager.prototype.startPolling = function startPolling (client, pollingTimeout, callback) {
if (this.isPolling) {
return
}

var self = this
this.isPolling = true

client.getWorkers(function (err, updatedWorkers) {
if (err) {
self.isPolling = false
return (callback ? callback(err) : null)
}

var activeWorkers = (updatedWorkers || []).reduce(function (o, worker) {
o[worker.id] = worker
return o
}, {})

Object.keys(self.workers).forEach(function (workerId) {
if (activeWorkers[workerId]) {
// process updates
self.updateWorker(activeWorkers[workerId])
} else {
// process deletions
self.unregisterWorker(self.workers[workerId])
}
})

self._pollHandle = setTimeout(function () {
self.isPolling = false
self.startPolling(client, pollingTimeout, callback)
}, pollingTimeout)
})
}

WorkerManager.prototype.stopPolling = function stopPolling () {
if (this._pollHandle) {
clearTimeout(this._pollHandle)
this._pollHandle = null
}

this.isPolling = false
}

// expose a single, shared instance of WorkerManager
module.exports = new WorkerManager()
18 changes: 18 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var EventEmitter = require('events').EventEmitter
var util = require('util')

function Worker (data) {
EventEmitter.call(this)

if (typeof data === 'object' && !Array.isArray(data)) {
var self = this

Object.keys(data).forEach(function (k) {
self[k] = data[k]
})
}
}

util.inherits(Worker, EventEmitter)

module.exports = Worker

0 comments on commit b81cafb

Please sign in to comment.