feat: start working on REST datasets
albanm committed Dec 6, 2018
1 parent dc77a31 commit 6474fec
Showing 18 changed files with 524 additions and 75 deletions.
38 changes: 32 additions & 6 deletions .cache-require-paths.json
@@ -102,7 +102,8 @@
},
"/home/alban/github/data-fair/server/utils/db.js": {
"config": "/home/alban/github/data-fair/node_modules/config/lib/config.js",
"mongodb": "/home/alban/github/data-fair/node_modules/mongodb/index.js"
"mongodb": "/home/alban/github/data-fair/node_modules/mongodb/index.js",
"debug": "/home/alban/github/data-fair/node_modules/debug/src/index.js"
},
"/home/alban/github/data-fair/node_modules/mongodb/index.js": {
"mongodb-core": "/home/alban/github/data-fair/node_modules/mongodb-core/index.js",
@@ -6110,7 +6111,8 @@
"../utils/cache": "/home/alban/github/data-fair/server/utils/cache.js",
"../workers/converter": "/home/alban/github/data-fair/server/workers/converter.js",
"../../contract/dataset-patch": "/home/alban/github/data-fair/contract/dataset-patch.js",
"../utils/dataset-file-sample": "/home/alban/github/data-fair/server/utils/dataset-file-sample.js"
"../utils/dataset-file-sample": "/home/alban/github/data-fair/server/utils/dataset-file-sample.js",
"../utils/rest-datasets": "/home/alban/github/data-fair/server/utils/rest-datasets.js"
},
"/home/alban/github/data-fair/contract/dataset-api-docs.js": {
"config": "/home/alban/github/data-fair/node_modules/config/lib/config.js",
@@ -7374,7 +7376,8 @@
"node-dir": "/home/alban/github/data-fair/node_modules/node-dir/index.js",
"axios": "/home/alban/github/data-fair/node_modules/axios/index.js",
"./fields-sniffer": "/home/alban/github/data-fair/server/utils/fields-sniffer.js",
"./geo": "/home/alban/github/data-fair/server/utils/geo.js"
"./geo": "/home/alban/github/data-fair/server/utils/geo.js",
"./rest-datasets": "/home/alban/github/data-fair/server/utils/rest-datasets.js"
},
"/home/alban/github/data-fair/node_modules/stream-combiner/index.js": {
"duplexer": "/home/alban/github/data-fair/node_modules/duplexer/index.js",
@@ -8135,7 +8138,8 @@
"inherits": "/home/alban/github/data-fair/node_modules/inherits/inherits.js",
"util": "util",
"./internal/streams/BufferList": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/BufferList.js",
"./internal/streams/destroy": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/destroy.js"
"./internal/streams/destroy": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/destroy.js",
"./_stream_duplex": "/home/alban/github/data-fair/node_modules/readable-stream/lib/_stream_duplex.js"
},
"/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/stream.js": {
"stream": "stream"
@@ -8154,7 +8158,8 @@
"util-deprecate": "/home/alban/github/data-fair/node_modules/util-deprecate/node.js",
"./internal/streams/stream": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/stream.js",
"safe-buffer": "/home/alban/github/data-fair/node_modules/safe-buffer/index.js",
"./internal/streams/destroy": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/destroy.js"
"./internal/streams/destroy": "/home/alban/github/data-fair/node_modules/readable-stream/lib/internal/streams/destroy.js",
"./_stream_duplex": "/home/alban/github/data-fair/node_modules/readable-stream/lib/_stream_duplex.js"
},
"/home/alban/github/data-fair/node_modules/util-deprecate/node.js": {
"util": "util"
@@ -8659,7 +8664,9 @@
"../utils/dataset": "/home/alban/github/data-fair/server/utils/dataset.js",
"../utils/extensions": "/home/alban/github/data-fair/server/utils/extensions.js",
"../utils/journals": "/home/alban/github/data-fair/server/utils/journals.js",
"debug": "/home/alban/github/data-fair/node_modules/debug/src/index.js"
"debug": "/home/alban/github/data-fair/node_modules/debug/src/index.js",
"../utils/rest-datasets": "/home/alban/github/data-fair/server/utils/rest-datasets.js",
"stream": "stream"
},
"/home/alban/github/data-fair/server/utils/es/index.js": {
"config": "/home/alban/github/data-fair/node_modules/config/lib/config.js",
@@ -10021,5 +10028,24 @@
},
"/home/alban/github/data-fair/node_modules/iconv-lite/encodings/dbcs-codec.js": {
"safer-buffer": "/home/alban/github/data-fair/node_modules/safer-buffer/safer.js"
},
"/home/alban/github/data-fair/test/datasets.js": {
"../server/workers": "/home/alban/github/data-fair/server/workers/index.js",
"./resources/app-notifier.js": "/home/alban/github/data-fair/test/resources/app-notifier.js"
},
"/home/alban/github/data-fair/test/applications.js": {
"nock": "/home/alban/github/data-fair/node_modules/nock/index.js"
},
"/home/alban/github/data-fair/server/utils/rest-datasets.js": {
"http-errors": "/home/alban/github/data-fair/node_modules/http-errors/index.js",
"shortid": "/home/alban/github/data-fair/node_modules/shortid/index.js",
"util": "util",
"pump": "/home/alban/github/data-fair/node_modules/pump/index.js",
"stream": "stream",
"mime-type-stream": "/home/alban/github/data-fair/node_modules/mime-type-stream/index.js",
"stream-combiner": "/home/alban/github/data-fair/node_modules/stream-combiner/index.js"
},
"/home/alban/github/data-fair/test/rest-datasets.js": {
"../server/workers": "/home/alban/github/data-fair/server/workers/index.js"
}
}
1 change: 1 addition & 0 deletions .eslintrc.js
@@ -21,6 +21,7 @@ module.exports = {
'no-debugger': process.env.NODE_ENV === 'production' ? 2 : 0,
// This rule is required because the atom vue-format package removes the space
'space-before-function-paren': 0,
'no-new': 'off',
// Turn off vuejs rules, not because they are not good, but mostly because
// they are very present in the initial code base... Maybe we should clean that up someday.
'vue/max-attributes-per-line': 'off',
1 change: 1 addition & 0 deletions Dockerfile
@@ -33,6 +33,7 @@ RUN apk add --no-cache --virtual .build-deps cmake linux-headers boost-dev gmp g
RUN ln -s /usr/lib/libproj.so.13 /usr/lib/libproj.so

ENV NODE_ENV production
ENV DEBUG db,upgrade*
WORKDIR /webapp
ADD package.json .
ADD package-lock.json .
2 changes: 1 addition & 1 deletion contract/dataset-patch.js
@@ -3,7 +3,7 @@
// the rest is read only fields

const dataset = require('./dataset')
const patchKeys = ['schema', 'description', 'title', 'license', 'origin', 'extensions', 'publications', 'virtual', 'extras']
const patchKeys = ['schema', 'description', 'title', 'license', 'origin', 'extensions', 'publications', 'virtual', 'rest', 'extras']
module.exports = {
title: 'Dataset patch',
type: 'object',
9 changes: 9 additions & 0 deletions contract/dataset.js
@@ -296,6 +296,15 @@ module.exports = {
}
}
},
isRest: {
type: 'boolean',
default: false,
description: 'Used to identify REST datasets. A REST dataset is not created from a data file, but instead is based on a dynamic collection in a database.'
},
rest: {
type: 'object',
description: 'A configuration object dedicated to REST datasets.'
},
extras: {
type: 'object',
description: 'An object for extra content from client services of data-fair'
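The new isRest flag and rest object feed the creation branch added to server/routers/datasets.js further below: a REST dataset requires a title, gets rest defaulted to {} and an empty schema, and starts in the 'schematized' status. As a rough client-side sketch (assuming the router is mounted under /api/v1/datasets and that authentication is already handled; neither is shown in this diff):

const axios = require('axios')

async function createRestDataset (baseUrl) {
  // isRest selects the REST branch of POST /datasets; the body minus isRest
  // must validate against the dataset-patch contract (title, schema, rest, ...)
  const res = await axios.post(`${baseUrl}/api/v1/datasets`, {
    isRest: true,
    title: 'REST dataset example',
    schema: [{ key: 'label', type: 'string' }]
  })
  return res.data // expected: the created dataset with status 'schematized'
}
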
8 changes: 6 additions & 2 deletions server/app.js
@@ -2,7 +2,7 @@ const debug = require('debug')('app')
debug('enter app.js')
const config = require('config')
const express = require('express')
const bodyParser = require('body-parser')
const bodyParser = require('body-parser').json({ limit: '1000kb' })
const cookieParser = require('cookie-parser')
const WebSocket = require('ws')
const http = require('http')
@@ -45,7 +45,11 @@ if (process.env.NODE_ENV === 'development') {
app.use('/capture', proxy({ target: 'http://localhost:8087', pathRewrite: { '^/capture': '' } }))
}

app.use(bodyParser.json({ limit: '1000kb' }))
app.use((req, res, next) => {
// routes with _bulk are streamed, others are parsed as JSON
if (req.url.split('/').pop().indexOf('_bulk') === 0) return next()
bodyParser(req, res, next)
})
app.use(cookieParser())
// In production CORS is taken care of by the reverse proxy if necessary
if (config.cors.active) app.use(require('cors')(config.cors.opts))
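The middleware above skips JSON parsing for any route whose last path segment starts with _bulk, so the bulk endpoint of REST datasets can stream its body instead of buffering it through bodyParser. A hedged sketch of a client sending newline-delimited JSON to that endpoint (the exact content types accepted are decided by mime-type-stream in server/utils/rest-datasets.js, so application/x-ndjson is an assumption here):

const axios = require('axios')

async function bulkLines (baseUrl, datasetId, lines) {
  // One JSON document per line; the server parses the stream itself.
  const body = lines.map(line => JSON.stringify(line)).join('\n')
  await axios.post(`${baseUrl}/api/v1/datasets/${datasetId}/_bulk_lines`, body, {
    headers: { 'content-type': 'application/x-ndjson' }
  })
}
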
78 changes: 56 additions & 22 deletions server/routers/datasets.js
@@ -24,6 +24,7 @@ const permissions = require('../utils/permissions')
const usersUtils = require('../utils/users')
const datasetUtils = require('../utils/dataset')
const virtualDatasetsUtils = require('../utils/virtual-datasets')
const restDatasetsUtils = require('../utils/rest-datasets')
const findUtils = require('../utils/find')
const asyncWrap = require('../utils/async-wrap')
const extensions = require('../utils/extensions')
@@ -144,6 +145,7 @@ router.patch('/:datasetId', readDataset, permissions.middleware('writeDescriptio
// Except download.. We only try it again if the fetch failed.
if (req.dataset.status === 'error') {
if (req.dataset.isVirtual) patch.status = 'indexed'
else if (req.dataset.isRest) patch.status = 'schematized'
else if (req.dataset.remoteFile && !req.dataset.originalFile) patch.status = 'imported'
else if (!baseTypes.has(req.dataset.originalFile.mimetype)) patch.status = 'uploaded'
else patch.status = 'loaded'
@@ -268,39 +270,41 @@ const beforeUpload = asyncWrap(async(req, res, next) => {
})
router.post('', beforeUpload, filesUtils.uploadFile(), asyncWrap(async(req, res) => {
const db = req.app.get('db')

let dataset
// After uploadFile, req.file contains the metadata of an uploaded file, and req.body the content of additional text fields
if (req.file) {
if (req.body.isVirtual) return res.status(400).send('Un jeu de données virtuel ne peut pas être initialisé avec un fichier')
dataset = await setFileInfo(req.file, initNew(req))
await db.collection('datasets').insertOne(dataset)
} else {
if (!req.body.isVirtual) return res.status(400).send('Un jeu de données doit être initialisé avec un fichier ou déclaré "virtuel"')
} else if (req.body.isVirtual) {
if (!req.body.title) return res.status(400).send('Un jeu de données virtuel doit être créé avec un titre')
const { isVirtual, ...patch } = req.body
if (!validatePatch(patch)) {
console.log(validatePatch.errors)
return res.status(400).send(ajvErrorMessages(validatePatch.errors))
}
dataset = initNew(req)
dataset.virtual = dataset.virtual || { children: [] }
dataset.schema = await virtualDatasetsUtils.prepareSchema(db, dataset)
dataset.status = 'indexed'
// Generate ids and try insertion until there is no conflict on id
const baseId = slug(req.body.title)
dataset.id = baseId
let insertOk = false
let i = 1
while (!insertOk) {
try {
await req.app.get('db').collection('datasets').insertOne(dataset)
insertOk = true
} catch (err) {
if (err.code !== 11000) throw err
i += 1
dataset.id = `${baseId}-${i}`
}
await datasetUtils.insertWithBaseId(db, dataset, baseId)
} else if (req.body.isRest) {
if (!req.body.title) return res.status(400).send('Un jeu de données REST doit être créé avec un titre')
const { isRest, ...patch } = req.body
if (!validatePatch(patch)) {
return res.status(400).send(ajvErrorMessages(validatePatch.errors))
}
dataset = initNew(req)
dataset.rest = dataset.rest || {}
dataset.schema = dataset.schema || []
restDatasetsUtils.prepareSchema(dataset.schema)
dataset.status = 'schematized'
const baseId = slug(req.body.title)
await datasetUtils.insertWithBaseId(db, dataset, baseId)
await restDatasetsUtils.initDataset(db, dataset)
} else {
return res.status(400).send('Un jeu de données doit être initialisé avec un fichier ou déclaré "virtuel" ou "REST"')
}

delete dataset._id
@@ -329,22 +333,33 @@ const attemptInsert = asyncWrap(async(req, res, next) => {
const updateDataset = asyncWrap(async(req, res) => {
const db = req.app.get('db')
// After uploadFile, req.file contains the metadata of an uploaded file, and req.body the content of additional text fields
if (!req.file && !req.dataset.isVirtual) return res.status(400).send('Un jeu de données doit être initialisé avec un fichier ou déclaré "virtuel"')
if (req.file && req.dataset.isVirtual) return res.status(400).send('Un jeu de données est soit initialisé avec un fichier soit déclaré "virtuel"')
if (!req.file && !req.dataset.isVirtual && !req.dataset.isRest) return res.status(400).send('Un jeu de données doit être initialisé avec un fichier ou déclaré "virtuel"')
if (req.file && (req.dataset.isVirtual || req.dataset.isRest)) return res.status(400).send('Un jeu de données est soit initialisé avec un fichier soit déclaré "virtuel"')
if (req.dataset.isVirtual && !req.dataset.title) return res.status(400).send('Un jeu de données virtuel doit être créé avec un titre')
if (req.dataset.isRest && !req.dataset.title) return res.status(400).send('Un jeu de données REST doit être créé avec un titre')
if (req.dataset.status && !acceptedStatuses.includes(req.dataset.status)) return res.status(409).send('Dataset is not in proper state to be updated')

let dataset = req.dataset
if (req.file) {
dataset = await setFileInfo(req.file, req.dataset)
} else {
} else if (dataset.isVirtual) {
const { isVirtual, ...patch } = req.body
if (!validatePatch(patch)) {
return res.status(400).send(ajvErrorMessages(validatePatch.errors))
return res.status(400).send(validatePatch.errors)
}
req.body.virtual = req.body.virtual || { children: [] }
req.body.schema = await virtualDatasetsUtils.prepareSchema(db, { ...dataset, ...req.body })
req.body.status = 'indexed'
} else if (dataset.isRest) {
const { isRest, ...patch } = req.body
if (!validatePatch(patch)) {
return res.status(400).send(validatePatch.errors)
}
req.body.rest = req.body.rest || {}
dataset.schema = dataset.schema || []
restDatasetsUtils.prepareSchema(dataset.schema)
await restDatasetsUtils.initDataset(db, dataset)
dataset.status = 'schematized'
}
Object.assign(dataset, req.body)

@@ -359,6 +374,21 @@ const updateDataset = asyncWrap(async(req, res) => {
router.post('/:datasetId', attemptInsert, readDataset, permissions.middleware('writeData', 'write'), filesUtils.uploadFile(), updateDataset)
router.put('/:datasetId', attemptInsert, readDataset, permissions.middleware('writeData', 'write'), filesUtils.uploadFile(), updateDataset)

// CRUD operations for REST datasets
function isRest(req, res, next) {
if (!req.dataset.isRest) {
return res.status(501)
.send('Les opérations de modifications sur les lignes sont uniquement accessibles pour les jeux de données de type REST.')
}
next()
}
router.post('/:datasetId/lines', readDataset, isRest, permissions.middleware('createLine', 'write'), asyncWrap(restDatasetsUtils.createLine))
router.get('/:datasetId/lines/:lineId', readDataset, isRest, permissions.middleware('readLine', 'read'), asyncWrap(restDatasetsUtils.readLine))
router.put('/:datasetId/lines/:lineId', readDataset, isRest, permissions.middleware('updateLine', 'write'), asyncWrap(restDatasetsUtils.updateLine))
router.delete('/:datasetId/lines/:lineId', readDataset, isRest, permissions.middleware('deleteLine', 'write'), asyncWrap(restDatasetsUtils.deleteLine))
router.patch('/:datasetId/lines/:lineId', readDataset, isRest, permissions.middleware('patchLine', 'write'), asyncWrap(restDatasetsUtils.patchLine))
router.post('/:datasetId/_bulk_lines', readDataset, isRest, permissions.middleware('bulkLines', 'write'), asyncWrap(restDatasetsUtils.bulkLines))

// Set max-age
// Also compare last-modified and if-modified-since headers for cache revalidation
// only send data if the dataset was finalized since then
@@ -539,13 +569,17 @@ router.get('/:datasetId/convert', readDataset, permissions.middleware('downloadO

// Download the full dataset with extensions
router.get('/:datasetId/full', readDataset, permissions.middleware('downloadFullData', 'read'), asyncWrap(async (req, res, next) => {
const db = req.app.get('db')
if (req.dataset.isVirtual) req.dataset.descendants = await virtualDatasetsUtils.descendants(req.app.get('db'), req.dataset)
res.setHeader('Content-disposition', 'attachment; filename=' + req.dataset.file.name)
res.setHeader('Content-type', 'text/csv')
const relevantSchema = req.dataset.schema.filter(f => f.key.startsWith('_ext_') || !f.key.startsWith('_'))
let readStream
if (req.dataset.isRest) readStream = restDatasetsUtils.readStream(db, req.dataset)
else readStream = datasetUtils.readStream(req.dataset)
await pump(
datasetUtils.readStream(req.dataset),
extensions.preserveExtensionStream({ db: req.app.get('db'), esClient: req.app.get('es'), dataset: req.dataset }),
readStream,
extensions.preserveExtensionStream({ db, esClient: req.app.get('es'), dataset: req.dataset }),
new Transform({ transform(chunk, encoding, callback) {
const flatChunk = flatten(chunk)
callback(null, relevantSchema.map(field => flatChunk[field.key]))
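The new line routes above give REST datasets a conventional CRUD surface, with the actual handlers living in server/utils/rest-datasets.js. A rough round-trip sketch, assuming the same /api/v1 mount point and that a created line is returned with a generated _id (rest-datasets.js requires shortid, which suggests this, but it is not shown here):

const axios = require('axios')

async function lineRoundTrip (baseUrl, datasetId) {
  const lines = `${baseUrl}/api/v1/datasets/${datasetId}/lines`
  // Create a line, read it back, patch one field, then delete it.
  const created = (await axios.post(lines, { label: 'hello' })).data
  await axios.get(`${lines}/${created._id}`)
  await axios.patch(`${lines}/${created._id}`, { label: 'hello world' })
  await axios.delete(`${lines}/${created._id}`)
}
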
4 changes: 2 additions & 2 deletions server/routers/remote-services.js
@@ -256,7 +256,7 @@ router.use('/:remoteServiceId/proxy*', readService, (req, res, next) => { req.ap
try {
await nbLimiter.consume(ip, 1)
} catch (err) {
console.log('nbLimiter error', err)
// console.log('nbLimiter error', err)
return res.status(429).send('Trop de requêtes dans un interval restreint pour cette exposition de service.')
}
kbLimiter = kbLimiter || new RateLimiterMongo({
@@ -268,7 +268,7 @@ router.use('/:remoteServiceId/proxy*', readService, (req, res, next) => { req.ap
try {
await kbLimiter.consume(ip, 1)
} catch (err) {
console.log('kbLimiter error', err)
// console.log('kbLimiter error', err)
return res.status(429).send('Trop de traffic dans un interval restreint pour cette exposition de service.')
}

21 changes: 21 additions & 0 deletions server/utils/dataset.js
@@ -10,6 +10,7 @@ const dir = require('node-dir')
const axios = require('axios')
const fieldsSniffer = require('./fields-sniffer')
const geoUtils = require('./geo')
const restDatasetsUtils = require('./rest-datasets')

const baseTypes = new Set(['text/csv', 'application/geo+json'])

@@ -57,6 +58,8 @@ exports.lsFiles = async (dataset) => {

// Read the dataset file and get a stream of line items
exports.readStream = (dataset) => {
if (dataset.isRest) return restDatasetsUtils.readStream(dataset)

let parser, transformer
if (dataset.file.mimetype === 'text/csv') {
// use result from csv-sniffer to configure parser
@@ -170,8 +173,26 @@
exports.reindex = async (db, dataset) => {
const patch = { status: 'loaded' }
if (dataset.isVirtual) patch.status = 'indexed'
else if (dataset.isRest) patch.status = 'schematized'
else if (dataset.originalFile && !baseTypes.has(dataset.originalFile.mimetype)) patch.status = 'uploaded'
await db.collection('datasets').updateOne({ id: dataset.id }, { '$set': patch })
return (await db.collection('datasets')
.findOneAndUpdate({ id: dataset.id }, { '$set': patch }, { returnOriginal: false })).value
}

// Generate ids and try insertion until there is no conflict on id
exports.insertWithBaseId = async (db, dataset, baseId) => {
dataset.id = baseId
let insertOk = false
let i = 1
while (!insertOk) {
try {
await db.collection('datasets').insertOne(dataset)
insertOk = true
} catch (err) {
if (err.code !== 11000) throw err
i += 1
dataset.id = `${baseId}-${i}`
}
}
}
4 changes: 2 additions & 2 deletions server/utils/db.js
@@ -1,5 +1,5 @@
// TODO add ensureIndex instructions to init logic.

const debug = require('debug')('db')
const config = require('config')
const { MongoClient } = require('mongodb')

@@ -13,7 +13,7 @@ async function ensureIndex(db, collection, key, options) {

exports.connect = async () => {
let client
console.log('Connecting to mongodb ' + config.mongoUrl)
debug('Connecting to mongodb ' + config.mongoUrl)
try {
client = await MongoClient.connect(config.mongoUrl, { useNewUrlParser: true })
} catch (err) {