Skip to content

Commit

Permalink
refactor export functions
Browse files Browse the repository at this point in the history
  • Loading branch information
dmfenton committed Oct 29, 2016
1 parent 5d5a9f1 commit ecc22b8
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 151 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ test/config/*
*.log
..*
test.log.*
.vscode
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased
### Changed
* Refactor export functions into separate model

### Fixed
* Remove possible exception in datasets serialization

Expand Down
5 changes: 3 additions & 2 deletions controllers/bulk.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module.exports = function (agol, controller) {
var actions = {
import: agol.bulkImport,
export: agol.bulkExport
// Henious hack to please Mocha
import: agol.bulkImport.bind(agol),
export: agol.exporter.bulk.bind(agol.exporter)
}
return function (req, res) {
var action = actions[req.params.action]
Expand Down
4 changes: 2 additions & 2 deletions controllers/getResource.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module.exports = function (agol, controller) {
if (!outdated || isFullGeojson(req)) return returnFile(req, res, options.output, dataInfo, fileInfo)
// if we are not storing data in the db we should never be writing new full geojson exports
var exportStatus = Utils.determineExportStatus(req, dataInfo)
if (!exportStatus && dataInfo.version === 3.0) agol.generateExport(options, function (err, status) { if (err) agol.log.error(err) })
if (!exportStatus && dataInfo.version === 3.0) agol.exporter.generate(options, function (err, status) { if (err) agol.log.error(err) })

// always serve filtered data from the same cache as the full export
var isFiltered = req.query.where || req.query.geometry
Expand All @@ -57,7 +57,7 @@ module.exports = function (agol, controller) {
if (exportStatus || info.status === 'Processing') return returnStatus(req, res, info, error)
var options = Utils.createExportOptions(req, info)
// only enqueue a job if it's not already queued or running
agol.generateExport(options, function (err, status, created) {
agol.exporter.generate(options, function (err, status, created) {
returnStatus(req, res, status, err)
})
}
Expand Down
127 changes: 6 additions & 121 deletions models/agol.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ var Dataset = require('./dataset')
var Utils = require('../lib/utils')
var async = require('async')
var SpatialReference = require('spatialreference')
var formatSpatialRef = require('format-spatial-ref')

var Exporter = require('./exporter')
var AGOL = function (koop) {
/**
* inherits from the base model
Expand Down Expand Up @@ -52,6 +51,10 @@ var AGOL = function (koop) {
cache: koop.cache,
log: koop.log
})
agol.exporter = new Exporter({
cache: koop.cache,
log: koop.log
})

agol.files = koop.fs

Expand Down Expand Up @@ -200,16 +203,6 @@ var AGOL = function (koop) {
agol.cache.drop(item, layer, options, callback)
}

/**
* Wraps export enqueing
*
* @param {object} options - directions for what to export
* @return {object} new export job
*/
agol.enqueueExport = function (options) {
return koop.queue.enqueue('exportFile', options)
}

/**
* Wraps copy enqueing
*
Expand All @@ -220,81 +213,6 @@ var AGOL = function (koop) {
return koop.queue.enqueue('copyFile', options)
}

/**
* Exports a dataset to a file
*
* @param {object} options - file export parameters
* @param {function} callback - calls back with an error or status and whether a new job was created
*/
agol.generateExport = function (options, callback) {
getWkt(options.outSR, function (err, wkt) {
if (err) return callback(err)
options.srs = wkt
var xport = agol.enqueueExport(options)

xport
.once('start', function () { agol.updateJob('start', options) })
.once('progress', function () { agol.updateJob('progress', options) })
.once('finish', function () {
xport.removeAllListeners()
// Hack to make sure this fires after other progress updates have been saved
setTimeout(function () {
agol.updateJob('finish', options)
}, 1000)
})
.once('fail', function (status) {
xport.removeAllListeners()
var error = status.errorReport && status.errorReport.message
agol.updateJob('Error: ' + error, options)
})
agol.updateJob('queued', options, callback)
})
}

agol.updateJob = function (status, options, callback) {
agol.log.info('Export Job', status, options)
agol.cache.getInfo(options.table, function (err, info) {
if (err) {
if (callback) callback(err, info)
return agol.log.error(err)
}
info.generating = info.generating || {}
var generating = info.generating[options.key] = info.generating[options.key] || {}
if (status === 'finish') {
info.generated = info.generated || {}
info.generated[options.key] = info.generated[options.key] || {}
info.generated[options.key][options.format] = info.retrieved_at
delete info.generating[options.key][options.format]
} else {
generating[options.format] = status
}
agol.cache.updateInfo(options.table, info, function (err) {
if (err) agol.log.error(err)
if (callback) callback(err, info)
})
})
}

/**
* Gets projection information for a shapefile export
* @param {object} options - contains info on spatial reference, wkid and wkt
* @param {function} callback - calls back with an error or wkt
* @private
*/
function getWkt (outSr, callback) {
var wkt
// if there is a passed in WKT just use that
if (!outSr) return callback()
if (outSr.wkt) {
wkt = outSr.wkt.replace(/lambert_conformal_conic(?!_)/i, 'Lambert_Conformal_Conic_2SP')
return callback(null, wkt)
}
var spatialRef = formatSpatialRef(outSr)
// latest WKID is the more modern value
var wkid = spatialRef.latestWkid || spatialRef.wkid
agol.spatialReference.wkidToWkt(wkid, callback)
}

/**
* Get the expiration date of a resource from the info doc in the db
*
Expand Down Expand Up @@ -371,40 +289,7 @@ var AGOL = function (koop) {
}
}

/**
* Enqueues a set of export jobs
* @param {object} req - the incoming request object
* @param {array} jobs - the set of jobs to enqueue
* @param {function} callback - calls back with information about the enqueued jobs
*/
agol.bulkExport = function (req, jobs, callback) {
var errors = []
async.each(jobs, xport, function () {
finishBulk(jobs, errors, callback)
})

function xport (job, next) {
agol.cache.getInfo('agol' + ':' + job.item + ':' + job.layer, function (err, info) {
if (err) {
errors.push(formatJobError(job, err))
return next()
}
var formats = job.formats || ['kml', 'csv', 'zip', 'geohash']
async.each(formats, function (format, done) {
req.optionKey = Utils.createCacheKey(job, {
where: job.where,
outSR: job.outSr,
geometry: job.geometry
})
var options = Utils.createExportOptions(req, info, job, format)
agol.generateExport(options, function (err) {
if (err) errors.push(formatJobError(job, err))
done()
})
}, function () { next() })
})
}
}
// TODO (low priority) these two functions are now duplicated in Exporter

function formatJobError (job, error) {
return {
Expand Down
160 changes: 160 additions & 0 deletions models/exporter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
var Exporter = module.exports = function (options) {
this.cache = options.cache
this.log = options.log
try {
this.spatialReference = new SpatialReference({db: this.cache.db, logger: this.log})
} catch (e) {
this.spatialReference = new SpatialReference({})
}
}

var async = require('async')
var Utils = require('../lib/utils')
var config = require('config')
var koop = require('koop')(config)
var SpatialReference = require('spatialreference')
var formatSpatialRef = require('format-spatial-ref')

/**
* Wraps export enqueing
*
* @param {object} options - directions for what to export
* @return {object} new export job
*/
Exporter.prototype.enqueue = function (options) {
return koop.queue.enqueue('exportFile', options)
}

/**
* Exports a dataset to a file
*
* @param {object} options - file export parameters
* @param {function} callback - calls back with an error or status and whether a new job was created
*/
Exporter.prototype.generate = function (options, callback) {
var self = this
self.getWkt(options.outSR, function (err, wkt) {
if (err) return callback(err)
options.srs = wkt
var job = self.enqueue(options)

job
.once('start', function () { self.updateJob('start', options) })
.once('progress', function () { self.updateJob('progress', options) })
.once('finish', function () {
job.removeAllListeners()
// Hack to make sure this fires after other progress updates have been saved
setTimeout(function () {
self.updateJob('finish', options)
}, 1000)
})
.once('fail', function (status) {
job.removeAllListeners()
var error = status.errorReport && status.errorReport.message
self.updateJob('Error: ' + error, options)
})
self.updateJob('queued', options, callback)
})
}

/**
* Enqueues a set of export jobs
* @param {object} req - the incoming request object
* @param {array} jobs - the set of jobs to enqueue
* @param {function} callback - calls back with information about the enqueued jobs
*/
Exporter.prototype.bulk = function (req, jobs, callback) {
var self = this
var errors = []
var xport = function (job, next) {
self.cache.getInfo('agol' + ':' + job.item + ':' + job.layer, function (err, info) {
if (err) {
errors.push(formatJobError(job, err))
return next()
}
var formats = job.formats || ['kml', 'csv', 'zip', 'geohash']
async.each(formats, function (format, done) {
req.optionKey = Utils.createCacheKey(job, {
where: job.where,
outSR: job.outSr,
geometry: job.geometry
})
var options = Utils.createExportOptions(req, info, job, format)
self.generate(options, function (err) {
if (err) errors.push(formatJobError(job, err))
done()
})
}, function () { next() })
})
}

async.each(jobs, xport, function () {
finishBulk(jobs, errors, callback)
})
}

Exporter.prototype.updateJob = function (status, options, callback) {
var self = this
self.log.info('Export Job', status, options)
self.cache.getInfo(options.table, function (err, info) {
if (err) {
if (callback) callback(err, info)
return self.log.error(err)
}
info.generating = info.generating || {}
var generating = info.generating[options.key] = info.generating[options.key] || {}
if (status === 'finish') {
info.generated = info.generated || {}
info.generated[options.key] = info.generated[options.key] || {}
info.generated[options.key][options.format] = info.retrieved_at
delete info.generating[options.key][options.format]
} else {
generating[options.format] = status
}
self.cache.updateInfo(options.table, info, function (err) {
if (err) self.log.error(err)
if (callback) callback(err, info)
})
})
}

/**
* Gets projection information for a shapefile export
* @param {object} options - contains info on spatial reference, wkid and wkt
* @param {function} callback - calls back with an error or wkt
*/
Exporter.prototype.getWkt = function (outSr, callback) {
var wkt
if (!outSr) return callback()
// if there is a passed in WKT just use that
if (outSr.wkt) {
wkt = outSr.wkt.replace(/lambert_conformal_conic(?!_)/i, 'Lambert_Conformal_Conic_2SP')
return callback(null, wkt)
}
var spatialRef = formatSpatialRef(outSr)
// latest WKID is the more modern value
var wkid = spatialRef.latestWkid || spatialRef.wkid
this.spatialReference.wkidToWkt(wkid, callback)
}

function formatJobError (job, error) {
return {
item: job.item,
layer: job.layer,
message: error.message,
url: error.url,
response: error.body
}
}

function finishBulk (jobs, errors, callback) {
var response = {
meta: {
total: jobs.length,
succeeded: jobs.length - errors.length,
failed: errors.length
},
failed: errors
}
callback(null, response)
}
Loading

0 comments on commit ecc22b8

Please sign in to comment.