Skip to content

Commit

Permalink
Merge pull request #152 from mediachain/yn-aleph-merge
Browse files Browse the repository at this point in the history
Merge from concat -> aleph nodes
  • Loading branch information
parkan committed Jan 11, 2017
2 parents 0ffc5de + fe46bd6 commit 1b8aa89
Show file tree
Hide file tree
Showing 16 changed files with 666 additions and 153 deletions.
86 changes: 86 additions & 0 deletions integration-test/merge_test.js
@@ -0,0 +1,86 @@
// @flow
/* eslint-env mocha */

const assert = require('assert')
const { describe, it, before, after } = require('mocha')
const uuid = require('node-uuid')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const TEST_NAMESPACE = 'scratch.merge-test'
const INVALID_STATEMENT_NAMESPACE = 'scratch.merge-test.invalid-stmt'

const seedObjects = [
{id: uuid.v4(), hello: 'world'},
{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), etc: 'and so on'}
]

describe('Merge (concat -> aleph)', () => {
let objectIds
let seedStatements
let concatClient
let concatPeerInfo

before(() => {
return concatNodeClient()
.then(_client => { concatClient = _client })
.then(() => concatNodePeerInfo())
.then(_pInfo => { concatPeerInfo = _pInfo })
.then(() => concatClient.setStatus('online'))
.then(() => concatClient.putData(...seedObjects))
.then(_objectIds => { objectIds = _objectIds })
.then(() => {
seedStatements = objectIds.map((object, idx) => ({
object,
refs: [`test:obj:${idx.toString()}`],
tags: ['test'],
deps: []
}))
return concatClient.publish({namespace: TEST_NAMESPACE}, ...seedStatements)
})
.then(() =>
// add a statement with a reference to a non-existent object
concatClient.publish({namespace: INVALID_STATEMENT_NAMESPACE}, {
object: 'QmNLftPEMzsadpbTsGaVP3haETYJb4GfnCgQiaFj5Red9G',
refs: ['test:invalid:ref'],
tags: [],
deps: []
}))
})

after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
.then(() => concatClient.delete(`DELETE FROM ${INVALID_STATEMENT_NAMESPACE}`))
)

it('merges statements from a concat node', () => {
let alephNode
return getTestNodeId().then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => alephNode.merge(concatPeerInfo, `SELECT * FROM ${TEST_NAMESPACE}`))
.then(results => {
assert.notEqual(results, null, 'merge did not return a result')
assert.equal(results.statementCount, seedStatements.length, 'aleph node merged an unexpected number of statements')
assert.equal(results.objectCount, objectIds.length, 'aleph node merged an unexpected number of objects')
})
})

it('returns counts + error message for partially successful merge', () => {
let alephNode
return getTestNodeId()
.then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => alephNode.merge(concatPeerInfo, `SELECT * FROM ${TEST_NAMESPACE}.* ORDER BY counter`))
.catch(err => {
assert.fail(err, 'no error', '', '!==')
})
.then(result => {
assert.notEqual(result, null, 'partially-successful merge should return a result')
assert(typeof result.error === 'string' && result.error.length > 0,
'partially successful merge should return an error message')
})
})
})
40 changes: 25 additions & 15 deletions integration-test/push_test.js
Expand Up @@ -2,53 +2,58 @@
/* eslint-env mocha */

const assert = require('assert')
const { describe, it, before } = require('mocha')
const { PromiseHash } = require('../src/common/util')
const { describe, it, before, after } = require('mocha')
const uuid = require('node-uuid')
const { promiseHash } = require('../src/common/util')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')
const { generatePublisherId } = require('../src/peer/identity')
const { makeSimpleStatement } = require('../src/metadata/statement')

const TEST_NAMESPACE = 'scratch.push-test'
const UNAUTHORIZED_NAMESPACE = 'scratch.unauthorized-push-test'

const seedObjects = [
{id: 'foo:1', foo: 'bar'},
{id: 'foo:2', foo: 'baz'}
{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), foo: 'baz'}
]

function seedStatementsToAleph (alephNode: AlephNode): Promise<Array<string>> {
return Promise.all(
seedObjects.map(obj =>
alephNode.ingestSimpleStatement('scratch.test', obj, { refs: [obj.id] })
alephNode.ingestSimpleStatement(TEST_NAMESPACE, obj, { refs: [obj.id] })
)
)
}

function seedUnauthorizedStatement (alephNode: AlephNode): Promise<string> {
const obj = {letMeIn: 'please'}
return alephNode.ingestSimpleStatement('members.only', obj, { refs: ['foo'] })
return alephNode.ingestSimpleStatement(UNAUTHORIZED_NAMESPACE, obj, { refs: ['foo'] })
}

function preparePartiallyValidStatements (alephNode: AlephNode, numValid: number): Promise<Array<Object>> {
return alephNode.putData({hello: 'world'})
.then(([object]) => {
const promises = []
for (let i = 0; i < numValid; i++) {
promises.push(alephNode.makeStatement('scratch.test', {simple: {
promises.push(makeSimpleStatement(alephNode.publisherId, TEST_NAMESPACE, {
object,
refs: [`test:${i.toString()}`],
deps: [],
tags: []
}}))
refs: [`test:${i.toString()}`]
},
alephNode.statementCounter))
}
// add a statement with an invalid object reference
promises.push(alephNode.makeStatement('scratch.test', {simple: {
promises.push(makeSimpleStatement(alephNode.publisherId, TEST_NAMESPACE, {
object: 'QmNLftPEMzsadpbTsGaVP3haETYJb4GfnCgQiaFj5Red9G', refs: [], deps: [], tags: []
}}))
}))
return Promise.all(promises)
})
}

describe('Push', () => {
let concatClient
let alephNode
let alephPeerIdB58
let publisherId
Expand All @@ -68,7 +73,12 @@ describe('Push', () => {
.then(() => seedUnauthorizedStatement(alephNode))
.then(_stmtId => { unauthorizedStatementId = _stmtId })
.then(() => concatNodeClient())
.then(concat => concat.authorize(alephPeerIdB58, ['scratch.*']))
.then(client => { concatClient = client })
.then(() => concatClient.authorize(alephPeerIdB58, [TEST_NAMESPACE]))
)

after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
)

it('pushes data to a concat node', () => {
Expand Down Expand Up @@ -96,7 +106,7 @@ describe('Push', () => {
it('returns counts + error message for partially successful push', () => {
const numValid = 10
return alephNode.start()
.then(() => PromiseHash({
.then(() => promiseHash({
pInfo: concatNodePeerInfo(),
statements: preparePartiallyValidStatements(alephNode, numValid)
}))
Expand Down
15 changes: 10 additions & 5 deletions integration-test/query_test.js
Expand Up @@ -8,28 +8,33 @@ const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const TEST_NAMESPACE = 'scratch.query-test'

const seedStatements = [
{object: 'QmF00123', tags: [], refs: [], deps: []},
{object: 'QmF00456', tags: ['foo'], refs: [], deps: []},
{object: 'QmFoo789', refs: ['bar'], tags: ['foo'], deps: []}
]

describe('Query', () => {
let concatClient
before(() =>
concatNodeClient()
.then(client => client.publish({namespace: 'foo.bar'}, ...seedStatements))
.then(client => { concatClient = client })
.then(() => concatClient.setStatus('online'))
.then(() => concatClient.publish({namespace: TEST_NAMESPACE}, ...seedStatements))
)

after(() => {
return concatNodeClient().then(client => client.delete('DELETE FROM foo.bar'))
})
after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
)

it('queries a remote concat node from aleph node', () => {
let alephNode
return getTestNodeId().then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => concatNodePeerInfo())
.then(concatInfo => alephNode.remoteQuery(concatInfo, 'SELECT * FROM foo.bar ORDER BY counter'))
.then(concatInfo => alephNode.remoteQuery(concatInfo, `SELECT * FROM ${TEST_NAMESPACE} ORDER BY counter`))
.then(results => {
assert(results != null && results.length > 0, 'query returned no results')

Expand Down
9 changes: 6 additions & 3 deletions integration-test/remote_data_test.js
Expand Up @@ -3,22 +3,25 @@

const assert = require('assert')
const { describe, it, before } = require('mocha')
const uuid = require('node-uuid')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const seedObjects = [
{foo: 'bar'},
{hello: 'world'}
{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), hello: 'world'}
]

describe('Remote Data Fetching', () => {
let dataIds = []
let concatClient

before(() => {
return concatNodeClient()
.then(client => client.putData(...seedObjects))
.then(client => { concatClient = client })
.then(() => concatClient.putData(...seedObjects))
.then(ids => { dataIds = ids })
})

Expand Down
3 changes: 3 additions & 0 deletions package.json
Expand Up @@ -64,7 +64,10 @@
"pull-abortable": "^4.1.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.2.1",
"pull-promise": "^2.0.0",
"pull-pushable": "https://registry.npmjs.org/pull-pushable/-/pull-pushable-2.0.1.tgz",
"pull-stream": "3.5.0",
"pull-window": "^2.1.4",
"sqlite3": "^3.1.8",
"thenify": "https://registry.npmjs.org/thenify/-/thenify-3.2.1.tgz",
"thenify-all": "^1.6.0",
Expand Down
43 changes: 1 addition & 42 deletions src/client/cli/util.js
Expand Up @@ -2,13 +2,11 @@

const { clone, set } = require('lodash')
const fs = require('fs')
const Multihash = require('multihashes')
const { JQ_PATH } = require('../../metadata/jqStream')
const childProcess = require('child_process')
const sshTunnel = require('tunnel-ssh')
const { RestClient } = require('../api')
import type { Writable } from 'stream'
import type { WriteStream } from 'tty'
const { println, printlnErr, isB58Multihash } = require('../../common/util')

function formatJSON (obj: ?mixed,
options: {color?: ?boolean, pretty?: boolean} = {}): string {
Expand Down Expand Up @@ -44,15 +42,6 @@ function pluralizeCount (count: number, word: string): string {
return count.toString() + ' ' + plural
}

function isB58Multihash (str: string): boolean {
try {
Multihash.fromB58String(str)
return true
} catch (err) {
return false
}
}

function setupSSHTunnel (config: Object): Promise<Object> {
return new Promise((resolve, reject) => {
sshTunnel(config, (err, server) => {
Expand Down Expand Up @@ -146,36 +135,6 @@ function subcommand<T: SubcommandGlobalOptions> (handler: (argv: T) => Promise<*
}
}

/**
* Print `output` to the `destination` stream and append a newline.
* @param output
* @param destination
*/
function writeln (output: string, destination: Writable | WriteStream) {
destination.write(output + '\n')
}

/**
* Print `output` to stdout and append a newline.
* Always use this instead of console.log for non-debug output!
* console.log keeps a strong reference to whatever you pass in,
* which can result in memory leaks for long-running processes.
* @param output
*/
function println (output: string) {
writeln(output, process.stdout)
}

/**
* Print `output` to stderr and append a newline.
* Use if you don't want console.error to keep a strong reference
* to whatever you pass in.
* @param output
*/
function printlnErr (output: string) {
writeln(output, process.stderr)
}

module.exports = {
println,
printlnErr,
Expand Down

0 comments on commit 1b8aa89

Please sign in to comment.