[import] Support importing from folder
rexxars committed Apr 9, 2018
1 parent c21ccd3 commit 97564ec
Showing 11 changed files with 185 additions and 87 deletions.
20 changes: 15 additions & 5 deletions packages/@sanity/import/src/import.js
@@ -9,10 +9,20 @@ const importers = {
fromArray
}

module.exports = (input, opts) => {
const options = validateOptions(input, opts)
module.exports = async (input, opts) => {
const options = await validateOptions(input, opts)

return Array.isArray(input)
? fromArray(input, options, importers)
: fromStream(input, options, importers)
if (typeof input.pipe === 'function') {
return fromStream(input, options, importers)
}

if (Array.isArray(input)) {
return fromArray(input, options, importers)
}

if (typeof input === 'string') {
return fromFolder(input, options, importers)
}

throw new Error('Stream does not seem to be a readable stream, an array or a path to a directory')
}
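
With this change the entry point dispatches on the input type: anything with a .pipe method is handled as a readable stream, arrays are imported directly, and a string is treated as a path to a directory and routed to fromFolder. A minimal usage sketch, assuming an existing ./my-export directory; the project ID, dataset and token below are placeholders, not values from this commit:

const sanityClient = require('@sanity/client')
const sanityImport = require('@sanity/import')

const client = sanityClient({
  projectId: 'myproject', // placeholder
  dataset: 'production', // placeholder
  token: process.env.SANITY_IMPORT_TOKEN, // placeholder
  useCdn: false
})

// A string input is now routed to fromFolder() and imported from that directory
sanityImport('./my-export', {client})
  .then(() => console.log('Import complete'))
  .catch(err => console.error('Import failed:', err.message))
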
44 changes: 41 additions & 3 deletions packages/@sanity/import/src/importFromStream.js
@@ -1,4 +1,5 @@
const os = require('os')
const fs = require('fs')
const path = require('path')
const miss = require('mississippi')
const gunzipMaybe = require('gunzip-maybe')
@@ -7,6 +8,7 @@ const isTar = require('is-tar')
const tar = require('tar-fs')
const globby = require('globby')
const debug = require('debug')('sanity:import:stream')
const {noop} = require('lodash')
const getJsonStreamer = require('./util/getJsonStreamer')

module.exports = (stream, options, importers) =>
@@ -17,7 +19,8 @@ module.exports = (stream, options, importers) =>
let isTarStream = false
let jsonDocuments

miss.pipe(stream, gunzipMaybe(), untarMaybe(), err => {
const uncompressStream = miss.pipeline(gunzipMaybe(), untarMaybe())
miss.pipe(stream, uncompressStream, err => {
if (err) {
reject(err)
return
@@ -39,8 +42,14 @@
}

debug('Stream is an ndjson file, streaming JSON')
const ndjsonStream = miss.pipeline(getJsonStreamer(), miss.concat(resolveNdjsonStream))
ndjsonStream.on('error', reject)
const jsonStreamer = getJsonStreamer()
const concatter = miss.concat(resolveNdjsonStream)
const ndjsonStream = miss.pipeline(jsonStreamer, concatter)
ndjsonStream.on('error', err => {
uncompressStream.emit('error', err)
destroy([uncompressStream, jsonStreamer, concatter, ndjsonStream])
reject(err)
})
return swap(null, ndjsonStream)
})
}
@@ -63,3 +72,32 @@ module.exports = (stream, options, importers) =>
resolve(importers.fromFolder(importBaseDir, {...options, deleteOnComplete: true}, importers))
}
})

function destroy(streams) {
streams.forEach(stream => {
if (isFS(stream)) {
// use close for fs streams to avoid fd leaks
stream.close(noop)
} else if (isRequest(stream)) {
// request.destroy just does .end() - .abort() is what we want
stream.abort()
} else if (isFn(stream.destroy)) {
stream.destroy()
}
})
}

function isFn(fn) {
return typeof fn === 'function'
}

function isFS(stream) {
return (
(stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) &&
isFn(stream.close)
)
}

function isRequest(stream) {
return stream.setHeader && isFn(stream.abort)
}
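
The new error path re-emits ndjson parse errors on the outer pipeline and then tears all streams down explicitly. The destroy() helper distinguishes three cases: fs streams are closed so their file descriptors are released, HTTP request streams are aborted (their destroy() only ends the request), and anything else falls back to destroy(). A simplified, self-contained sketch of that dispatch, with the request-stream case omitted:

const fs = require('fs')
const {PassThrough} = require('stream')

// Simplified variant of the destroy() dispatch shown above (request streams omitted)
function teardown(streams) {
  streams.forEach(stream => {
    if (stream instanceof fs.ReadStream || stream instanceof fs.WriteStream) {
      // close() releases the underlying file descriptor
      stream.close(() => {})
    } else if (typeof stream.destroy === 'function') {
      stream.destroy()
    }
  })
}

// Illustration only: tear down one fs stream and one plain transform stream
const fileStream = fs.createReadStream(__filename)
const passThrough = new PassThrough()
teardown([fileStream, passThrough])
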
1 change: 0 additions & 1 deletion packages/@sanity/import/src/uploadAssets.js
@@ -1,6 +1,5 @@
const basename = require('path').basename
const parseUrl = require('url').parse
const crypto = require('crypto')
const debug = require('debug')('sanity:import')
const pMap = require('p-map')
const progressStepper = require('./util/progressStepper')
38 changes: 20 additions & 18 deletions packages/@sanity/import/src/util/getHashedBufferForUri.js
@@ -3,28 +3,30 @@ const miss = require('mississippi')
const getUri = require('@rexxars/get-uri')

module.exports = function getHashedBufferForUri(uri) {
return new Promise(async (resolve, reject) => {
const stream = await getStream(uri)
const hash = crypto.createHash('sha1')
const chunks = []
return getStream(uri).then(
stream =>
new Promise((resolve, reject) => {
const hash = crypto.createHash('sha1')
const chunks = []

stream.on('data', chunk => {
chunks.push(chunk)
hash.update(chunk)
})
stream.on('data', chunk => {
chunks.push(chunk)
hash.update(chunk)
})

miss.finished(stream, err => {
if (err) {
reject(err)
return
}
miss.finished(stream, err => {
if (err) {
reject(err)
return
}

resolve({
buffer: Buffer.concat(chunks),
sha1hash: hash.digest('hex')
resolve({
buffer: Buffer.concat(chunks),
sha1hash: hash.digest('hex')
})
})
})
})
})
)
}

function getStream(uri) {
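
The refactor drops the async Promise executor in favour of chaining off getStream(uri) and only wrapping the event-based hashing in a new Promise. The async-executor form is a known trap: if the awaited call rejects, the exception is thrown inside the executor and the outer promise never settles, so the failure only surfaces as an unhandled rejection. A stand-alone sketch of the difference, with mightReject as a hypothetical stand-in for getStream:

// Hypothetical helper that fails, used only for illustration
const mightReject = () => Promise.reject(new Error('boom'))

// Anti-pattern: the rejection happens inside the async executor, so `broken`
// never settles and this .catch() is never reached (only an unhandled
// rejection warning is emitted)
const broken = new Promise(async resolve => {
  const value = await mightReject()
  resolve(value)
})
broken.catch(err => console.log('never logged', err.message))

// Chaining off the inner promise lets the error propagate normally
const fixed = mightReject().then(value => value)
fixed.catch(err => console.log('caught:', err.message))
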
3 changes: 2 additions & 1 deletion packages/@sanity/import/src/util/getJsonStreamer.js
@@ -21,7 +21,8 @@ module.exports = function getJsonStreamer() {

return doc
} catch (err) {
this.emit('error', new Error(getErrorMessage(err)))
const errorMessage = getErrorMessage(err)
this.emit('error', new Error(errorMessage))
}

return undefined
37 changes: 35 additions & 2 deletions packages/@sanity/import/src/validateOptions.js
@@ -1,3 +1,4 @@
const fse = require('fs-extra')
const noop = require('lodash/noop')
const defaults = require('lodash/defaults')

@@ -11,8 +12,10 @@ function validateOptions(input, opts) {
onProgress: noop
})

if (!input || (typeof input.pipe !== 'function' && !Array.isArray(input))) {
throw new Error('Stream does not seem to be a readable stream or an array')
if (!isValidInput(input)) {
throw new Error(
'Stream does not seem to be a readable stream, an array or a path to a directory'
)
}

if (!options.client) {
@@ -39,4 +42,34 @@
return options
}

function isValidInput(input) {
if (!input) {
return false
}

if (typeof input.pipe === 'function') {
return true
}

if (Array.isArray(input)) {
return true
}

if (typeof input === 'string' && isDirectory(input)) {
return true
}

return false
}

function isDirectory(path) {
try {
// eslint-disable-next-line no-sync
const stats = fse.statSync(path)
return stats.isDirectory()
} catch (err) {
return false
}
}

module.exports = validateOptions
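
Because the exported import function is now async and awaits validateOptions, invalid input surfaces as a rejected promise carrying the new, more descriptive error message, and a string that does not point to an existing directory simply fails validation instead of throwing from statSync. A quick sketch of the observable behaviour; the client configuration values are placeholders:

const sanityClient = require('@sanity/client')
const sanityImport = require('@sanity/import')

// Placeholder client configuration, for illustration only
const client = sanityClient({projectId: 'myproject', dataset: 'production', token: 'my-token', useCdn: false})

// A string that is not an existing directory is rejected during validation
sanityImport('/no/such/folder', {client}).catch(err => {
  // => Stream does not seem to be a readable stream, an array or a path to a directory
  console.error(err.message)
})
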
@@ -1,6 +1,6 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`employee creation 1`] = `
exports[`accepts a stream as source: employee creation 1`] = `
Object {
"mutations": Array [
Object {
@@ -21,7 +21,7 @@ }
}
`;

exports[`employee creation 2`] = `
exports[`accepts an array as source: employee creation 1`] = `
Object {
"mutations": Array [
Object {
100 changes: 50 additions & 50 deletions packages/@sanity/import/test/__snapshots__/uploadAssets.test.js.snap
@@ -1,6 +1,54 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`batch patching (batch #1) 1`] = `
exports[`fails if asset download fails 1`] = `
[Error: Error while fetching asset from "http://127.0.0.1:49999/img.gif":
connect ECONNREFUSED 127.0.0.1:49999]
`;

exports[`fails if asset lookup fails 1`] = `
[Error: Error while attempt to query Sanity API:
Some network err]
`;

exports[`will reuse an existing asset if it exists: single asset mutation 1`] = `
Object {
"mutations": Array [
Object {
"patch": Object {
"id": "movie_1",
"set": Object {
"metadata.poster._type": "someAssetI",
"metadata.poster.asset": Object {
"_ref": "someAssetId",
"_type": "reference",
},
},
},
},
],
}
`;

exports[`will upload asset that do not already exist: single create mutation 1`] = `
Object {
"mutations": Array [
Object {
"patch": Object {
"id": "movie_1",
"set": Object {
"metadata.poster._type": "newAssetI",
"metadata.poster.asset": Object {
"_ref": "newAssetId",
"_type": "reference",
},
},
},
},
],
}
`;

exports[`will upload once but batch patches: batch patching (batch #1) 1`] = `
Object {
"mutations": Array [
Object {
@@ -607,7 +655,7 @@ Object {
}
`;

exports[`batch patching (batch #2) 1`] = `
exports[`will upload once but batch patches: batch patching (batch #2) 1`] = `
Object {
"mutations": Array [
Object {
@@ -733,51 +781,3 @@ Object {
],
}
`;

exports[`fails if asset download fails 1`] = `
[Error: Error while fetching asset from "http://127.0.0.1:49999/img.gif":
connect ECONNREFUSED 127.0.0.1:49999]
`;

exports[`fails if asset lookup fails 1`] = `
[Error: Error while attempt to query Sanity API:
Some network err]
`;

exports[`single asset mutation 1`] = `
Object {
"mutations": Array [
Object {
"patch": Object {
"id": "movie_1",
"set": Object {
"metadata.poster._type": "someAssetI",
"metadata.poster.asset": Object {
"_ref": "someAssetId",
"_type": "reference",
},
},
},
},
],
}
`;

exports[`single create mutation 1`] = `
Object {
"mutations": Array [
Object {
"patch": Object {
"id": "movie_1",
"set": Object {
"metadata.poster._type": "newAssetI",
"metadata.poster.asset": Object {
"_ref": "newAssetId",
"_type": "reference",
},
},
},
},
],
}
`;
5 changes: 5 additions & 0 deletions packages/@sanity/import/test/helpers/helpers.js
@@ -2,6 +2,11 @@ const noop = require('lodash/noop')
const sanityClient = require('@sanity/client')
const {injectResponse} = require('get-it/middleware')

process.on('unhandledRejection', reason => {
// eslint-disable-next-line no-console
console.error('UNHANDLED REJECTION', reason)
})

const defaultClientOptions = {
projectId: 'foo',
dataset: 'bar',
