Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge from concat -> aleph nodes #152

Merged
merged 12 commits into from Jan 11, 2017
86 changes: 86 additions & 0 deletions integration-test/merge_test.js
@@ -0,0 +1,86 @@
// @flow
/* eslint-env mocha */

const assert = require('assert')
const { describe, it, before, after } = require('mocha')
const uuid = require('node-uuid')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const TEST_NAMESPACE = 'scratch.merge-test'
const INVALID_STATEMENT_NAMESPACE = 'scratch.merge-test.invalid-stmt'

const seedObjects = [
{id: uuid.v4(), hello: 'world'},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, was there a specific reason for doing this? Are the tests noticeably faster this way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make sure that data from one test wouldn't "contaminate" the results for a subsequent test, since I'm asserting on e.g. the number of objects merged, and if I unthinkingly reuse the same {foo: "bar"} object in multiple tests, I might get the wrong result if the node already had it in the store from a previous test.

I tried doing a garbage collection pass on the datastore after each test, but you have to take the node offline first, and I was getting weird connection reset errors. So I figured I'd just make sure that all the data objects had a random component, so there's no chance of a duplicate object throwing things off.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, that makes sense

{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), etc: 'and so on'}
]

describe('Merge (concat -> aleph)', () => {
let objectIds
let seedStatements
let concatClient
let concatPeerInfo

before(() => {
return concatNodeClient()
.then(_client => { concatClient = _client })
.then(() => concatNodePeerInfo())
.then(_pInfo => { concatPeerInfo = _pInfo })
.then(() => concatClient.setStatus('online'))
.then(() => concatClient.putData(...seedObjects))
.then(_objectIds => { objectIds = _objectIds })
.then(() => {
seedStatements = objectIds.map((object, idx) => ({
object,
refs: [`test:obj:${idx.toString()}`],
tags: ['test'],
deps: []
}))
return concatClient.publish({namespace: TEST_NAMESPACE}, ...seedStatements)
})
.then(() =>
// add a statement with a reference to a non-existent object
concatClient.publish({namespace: INVALID_STATEMENT_NAMESPACE}, {
object: 'QmNLftPEMzsadpbTsGaVP3haETYJb4GfnCgQiaFj5Red9G',
refs: ['test:invalid:ref'],
tags: [],
deps: []
}))
})

after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
.then(() => concatClient.delete(`DELETE FROM ${INVALID_STATEMENT_NAMESPACE}`))
)

it('merges statements from a concat node', () => {
let alephNode
return getTestNodeId().then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => alephNode.merge(concatPeerInfo, `SELECT * FROM ${TEST_NAMESPACE}`))
.then(results => {
assert.notEqual(results, null, 'merge did not return a result')
assert.equal(results.statementCount, seedStatements.length, 'aleph node merged an unexpected number of statements')
assert.equal(results.objectCount, objectIds.length, 'aleph node merged an unexpected number of objects')
})
})

it('returns counts + error message for partially successful merge', () => {
let alephNode
return getTestNodeId()
.then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => alephNode.merge(concatPeerInfo, `SELECT * FROM ${TEST_NAMESPACE}.* ORDER BY counter`))
.catch(err => {
assert.fail(err, 'no error', '', '!==')
})
.then(result => {
assert.notEqual(result, null, 'partially-successful merge should return a result')
assert(typeof result.error === 'string' && result.error.length > 0,
'partially successful merge should return an error message')
})
})
})
40 changes: 25 additions & 15 deletions integration-test/push_test.js
Expand Up @@ -2,53 +2,58 @@
/* eslint-env mocha */

const assert = require('assert')
const { describe, it, before } = require('mocha')
const { PromiseHash } = require('../src/common/util')
const { describe, it, before, after } = require('mocha')
const uuid = require('node-uuid')
const { promiseHash } = require('../src/common/util')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')
const { generatePublisherId } = require('../src/peer/identity')
const { makeSimpleStatement } = require('../src/metadata/statement')

const TEST_NAMESPACE = 'scratch.push-test'
const UNAUTHORIZED_NAMESPACE = 'scratch.unauthorized-push-test'

const seedObjects = [
{id: 'foo:1', foo: 'bar'},
{id: 'foo:2', foo: 'baz'}
{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), foo: 'baz'}
]

function seedStatementsToAleph (alephNode: AlephNode): Promise<Array<string>> {
return Promise.all(
seedObjects.map(obj =>
alephNode.ingestSimpleStatement('scratch.test', obj, { refs: [obj.id] })
alephNode.ingestSimpleStatement(TEST_NAMESPACE, obj, { refs: [obj.id] })
)
)
}

function seedUnauthorizedStatement (alephNode: AlephNode): Promise<string> {
const obj = {letMeIn: 'please'}
return alephNode.ingestSimpleStatement('members.only', obj, { refs: ['foo'] })
return alephNode.ingestSimpleStatement(UNAUTHORIZED_NAMESPACE, obj, { refs: ['foo'] })
}

function preparePartiallyValidStatements (alephNode: AlephNode, numValid: number): Promise<Array<Object>> {
return alephNode.putData({hello: 'world'})
.then(([object]) => {
const promises = []
for (let i = 0; i < numValid; i++) {
promises.push(alephNode.makeStatement('scratch.test', {simple: {
promises.push(makeSimpleStatement(alephNode.publisherId, TEST_NAMESPACE, {
object,
refs: [`test:${i.toString()}`],
deps: [],
tags: []
}}))
refs: [`test:${i.toString()}`]
},
alephNode.statementCounter))
}
// add a statement with an invalid object reference
promises.push(alephNode.makeStatement('scratch.test', {simple: {
promises.push(makeSimpleStatement(alephNode.publisherId, TEST_NAMESPACE, {
object: 'QmNLftPEMzsadpbTsGaVP3haETYJb4GfnCgQiaFj5Red9G', refs: [], deps: [], tags: []
}}))
}))
return Promise.all(promises)
})
}

describe('Push', () => {
let concatClient
let alephNode
let alephPeerIdB58
let publisherId
Expand All @@ -68,7 +73,12 @@ describe('Push', () => {
.then(() => seedUnauthorizedStatement(alephNode))
.then(_stmtId => { unauthorizedStatementId = _stmtId })
.then(() => concatNodeClient())
.then(concat => concat.authorize(alephPeerIdB58, ['scratch.*']))
.then(client => { concatClient = client })
.then(() => concatClient.authorize(alephPeerIdB58, [TEST_NAMESPACE]))
)

after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
)

it('pushes data to a concat node', () => {
Expand Down Expand Up @@ -96,7 +106,7 @@ describe('Push', () => {
it('returns counts + error message for partially successful push', () => {
const numValid = 10
return alephNode.start()
.then(() => PromiseHash({
.then(() => promiseHash({
pInfo: concatNodePeerInfo(),
statements: preparePartiallyValidStatements(alephNode, numValid)
}))
Expand Down
15 changes: 10 additions & 5 deletions integration-test/query_test.js
Expand Up @@ -8,28 +8,33 @@ const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const TEST_NAMESPACE = 'scratch.query-test'

const seedStatements = [
{object: 'QmF00123', tags: [], refs: [], deps: []},
{object: 'QmF00456', tags: ['foo'], refs: [], deps: []},
{object: 'QmFoo789', refs: ['bar'], tags: ['foo'], deps: []}
]

describe('Query', () => {
let concatClient
before(() =>
concatNodeClient()
.then(client => client.publish({namespace: 'foo.bar'}, ...seedStatements))
.then(client => { concatClient = client })
.then(() => concatClient.setStatus('online'))
.then(() => concatClient.publish({namespace: TEST_NAMESPACE}, ...seedStatements))
)

after(() => {
return concatNodeClient().then(client => client.delete('DELETE FROM foo.bar'))
})
after(() =>
concatClient.delete(`DELETE FROM ${TEST_NAMESPACE}`)
)

it('queries a remote concat node from aleph node', () => {
let alephNode
return getTestNodeId().then(peerId => { alephNode = new AlephNode({ peerId }) })
.then(() => alephNode.start())
.then(() => concatNodePeerInfo())
.then(concatInfo => alephNode.remoteQuery(concatInfo, 'SELECT * FROM foo.bar ORDER BY counter'))
.then(concatInfo => alephNode.remoteQuery(concatInfo, `SELECT * FROM ${TEST_NAMESPACE} ORDER BY counter`))
.then(results => {
assert(results != null && results.length > 0, 'query returned no results')

Expand Down
9 changes: 6 additions & 3 deletions integration-test/remote_data_test.js
Expand Up @@ -3,22 +3,25 @@

const assert = require('assert')
const { describe, it, before } = require('mocha')
const uuid = require('node-uuid')

const { getTestNodeId } = require('../test/util')
const { MediachainNode: AlephNode } = require('../src/peer/node')
const { concatNodeClient, concatNodePeerInfo } = require('./util')

const seedObjects = [
{foo: 'bar'},
{hello: 'world'}
{id: uuid.v4(), foo: 'bar'},
{id: uuid.v4(), hello: 'world'}
]

describe('Remote Data Fetching', () => {
let dataIds = []
let concatClient

before(() => {
return concatNodeClient()
.then(client => client.putData(...seedObjects))
.then(client => { concatClient = client })
.then(() => concatClient.putData(...seedObjects))
.then(ids => { dataIds = ids })
})

Expand Down
3 changes: 3 additions & 0 deletions package.json
Expand Up @@ -64,7 +64,10 @@
"pull-abortable": "^4.1.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.2.1",
"pull-promise": "^2.0.0",
"pull-pushable": "https://registry.npmjs.org/pull-pushable/-/pull-pushable-2.0.1.tgz",
"pull-stream": "3.5.0",
"pull-window": "^2.1.4",
"sqlite3": "^3.1.8",
"thenify": "https://registry.npmjs.org/thenify/-/thenify-3.2.1.tgz",
"thenify-all": "^1.6.0",
Expand Down
43 changes: 1 addition & 42 deletions src/client/cli/util.js
Expand Up @@ -2,13 +2,11 @@

const { clone, set } = require('lodash')
const fs = require('fs')
const Multihash = require('multihashes')
const { JQ_PATH } = require('../../metadata/jqStream')
const childProcess = require('child_process')
const sshTunnel = require('tunnel-ssh')
const { RestClient } = require('../api')
import type { Writable } from 'stream'
import type { WriteStream } from 'tty'
const { println, printlnErr, isB58Multihash } = require('../../common/util')

function formatJSON (obj: ?mixed,
options: {color?: ?boolean, pretty?: boolean} = {}): string {
Expand Down Expand Up @@ -44,15 +42,6 @@ function pluralizeCount (count: number, word: string): string {
return count.toString() + ' ' + plural
}

function isB58Multihash (str: string): boolean {
try {
Multihash.fromB58String(str)
return true
} catch (err) {
return false
}
}

function setupSSHTunnel (config: Object): Promise<Object> {
return new Promise((resolve, reject) => {
sshTunnel(config, (err, server) => {
Expand Down Expand Up @@ -146,36 +135,6 @@ function subcommand<T: SubcommandGlobalOptions> (handler: (argv: T) => Promise<*
}
}

/**
* Print `output` to the `destination` stream and append a newline.
* @param output
* @param destination
*/
function writeln (output: string, destination: Writable | WriteStream) {
destination.write(output + '\n')
}

/**
* Print `output` to stdout and append a newline.
* Always use this instead of console.log for non-debug output!
* console.log keeps a strong reference to whatever you pass in,
* which can result in memory leaks for long-running processes.
* @param output
*/
function println (output: string) {
writeln(output, process.stdout)
}

/**
* Print `output` to stderr and append a newline.
* Use if you don't want console.error to keep a strong reference
* to whatever you pass in.
* @param output
*/
function printlnErr (output: string) {
writeln(output, process.stderr)
}

module.exports = {
println,
printlnErr,
Expand Down