Skip to content

Commit

Permalink
Working kv store cli.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Oct 28, 2016
1 parent c2d4b9f commit 848fcd6
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 94 deletions.
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,58 @@ npm i upring-kv --save

## Usage

See [./example.js](./example.js) for exposing upring-kv over HTTP.
See [./bin.js](./bin.js) for exposing upring-kv over HTTP.
This file contains a small http API to get/put data into the
key-value store. Each URL equals to a given key.

To use is, follow these instructions. First, install some
dependencies:

```
npm i upring-kv pino baseswim -g
```

Then, we need to figure out what is our ip.

On Linux:

```sh
export MYIP=`ip addr show wlan0 | grep -Po 'inet \K[\d.]+'`
```

On Mac:

```sh
export MYIP=`ipconfig getifaddr en0`
```

The export phase needs to be done for every opened shell.

Then we can start our upring cluster. We will use a
[baseswim](http://npm.im/baseswim) node to simplify bootstrapping.

```sh
# on one shell
baseswim --host $MYIP --port 7979 | pino
# on another shell
upring-kv -p 3042 $MYIP:7979 | pino
# on another shell
upring-kv -p 3043 $MYIP:7979 | pino
# on another shell
upring-kv -p 3044 $MYIP:7979 | pino
```

Then we can query our key/value storage using basic curl.

```
curl -v localhost:3042
curl -X POST -d 'hello upring' localhost:3043
curl -v localhost:3044
# on another shell
curl localhost:3042?live=true # use SSE to send updates
# one more shell
curl -X POST -d 'by Matteo' localhost:3043
```

## API

Expand Down
144 changes: 144 additions & 0 deletions bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#! /usr/bin/env node
'use strict'

const UpRingKV = require('.')
const fs = require('fs')
const path = require('path')
const http = require('http')
const querystring = require('querystring')
const Transform = require('readable-stream').Transform
const pino = require('pino-http')
const pump = require('pump')
const args = require('minimist')(process.argv.slice(2), {
boolean: ['help', 'verbose'],
default: {
port: 0,
points: 100,
timeout: 200,
verbose: false
},
alias: {
port: 'p',
points: 'P',
help: 'h',
timeout: 't',
verbose: 'V'
}
})

if (args.help) {
console.log(fs.readFileSync(path.join(__dirname, 'help.txt')))
process.exit(1)
}

if (args.version) {
console.log('upring-kv', 'v' + require('./package').version)
process.exit(1)
}

const db = UpRingKV({
base: args._,
logLevel: args.verbose ? 'debug' : 'info',
hashring: {
replicaPoints: args.points,
joinTimeout: args.timeout
}
})

db.upring.on('up', function () {
console.log('to start a new peer, copy and paste the following in a new terminal:')
console.log('node example', this.whoami())

const logger = pino(db.upring.logger)

const server = http.createServer(function (req, res) {
logger(req, res)
switch (req.method) {
case 'PUT':
case 'POST':
handlePost(req, res)
break
case 'GET':
handleGet(req, res)
break
default:
res.statusCode = 404
res.end()
}
})

server.listen(args.port, function (err) {
if (err) {
throw err
}

console.log('server listening on', server.address())
})

function handleGet (req, res) {
const split = req.url.split('?')
const key = split[0]
const query = querystring.parse(split[1])
if (query.live) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
})
var transform = new Transform({
objectMode: true,
transform (chunk, enc, cb) {
this.push('data:' + JSON.stringify(chunk.value) + '\n\n')
cb()
}
})
pump(db.liveUpdates(key), transform, res)
return
} else {
db.get(key, function (err, data) {
if (err) {
res.statusCode = 500
res.end(err.message)
return
}

if (!data) {
res.statusCode = 404
res.end()
return
}

res.setHeader('Content-Type', data.contentType)
res.end(data.value)
})
}
}

function handlePost (req, res) {
var str = ''

req.on('data', function (chunk) {
str += chunk.toString()
})

req.on('error', function (err) {
res.statusCode = 500
res.end(err.message)
})

req.on('end', function () {
db.put(req.url, {
contentType: req.headers['content-type'],
value: str
}, function (err) {
if (err) {
res.statusCode = 500
res.end(err.message)
} else {
res.statusCode = 200
res.end()
}
})
})
}
})
86 changes: 0 additions & 86 deletions example.js

This file was deleted.

9 changes: 9 additions & 0 deletions help.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Usage: upring-kv [options] base..

Options:
-h/--help print this help
-p/--port PORT the HTTP port to listen to
-P/--points POINTS the number of points each peer has
-t/--timeout MS millis to wait the peer to join the base
-V/--verbose verbose mode on
-v/--version the version of upring-kv
2 changes: 1 addition & 1 deletion kv.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict'

const UpRing = require('upring')
const commands = require('./lib/commands')
const clone = require('clone')
const nes = require('never-ending-stream')
const commands = require('./lib/commands')
const ns = 'kv'

function UpRingKV (opts) {
Expand Down
18 changes: 12 additions & 6 deletions lib/commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ function load (kv) {
}

function sendData (peer, cb) {
cb = cb || bigError
if (typeof cb !== 'function') {
cb = bigError
}
upring.peerConn(peer).request(req, function (err) {
if (err) {
cb(err)
Expand All @@ -60,10 +62,13 @@ function load (kv) {
upring.add('ns:kv,cmd:get', function (req, reply) {
const value = pairs.get(req.key)
const key = req.key
const dest = upring._hashring.next(key)
if (value || !dest) {
req.skipList = req.skipList || []
req.skipList.push(upring.whoami())
const dest = upring._hashring.next(key, req.skipList)

if (value || !(dest && upring.allocatedToMe(key))) {
reply(null, { key, value })
} else if (upring.allocatedToMe(key)) {
} else {
logger.debug({ key }, 'checking if we are in the middle of a migration')
upring.peerConn(dest)
.request(req, function (err, res) {
Expand All @@ -85,10 +90,11 @@ function load (kv) {
}

const updates = new Readable({
objectMode: true,
read: function () {}
objectMode: true
})

updates._read = function () {}

eos(updates, function () {
array.splice(array.indexOf(updates), 1)
})
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"version": "0.1.0",
"description": "Key-Value store on top of UpRing",
"main": "kv.js",
"bin": {
"upring-kv": "./bin.js"
},
"scripts": {
"test": "standard | snazzy && tap test/*test.js",
"coverage": "tap --cov --coverage-report=html test/*.test.js",
Expand Down Expand Up @@ -37,7 +40,10 @@
"end-of-stream": "^1.1.0",
"flush-write-stream": "^1.0.0",
"level": "^1.4.0",
"minimist": "^1.2.0",
"never-ending-stream": "^2.0.0",
"pino-http": "^1.3.0",
"pump": "^1.0.1",
"readable-stream": "^2.1.5",
"upring": "^0.12.0"
}
Expand Down
Loading

0 comments on commit 848fcd6

Please sign in to comment.