Permalink
Browse files

Refactor query API to use EventEmitter with pause/resume

  • Loading branch information...
1 parent 89d59e8 commit bc338451e38af34dcfcd9b75af767381873e8e8d @wdavidw committed Nov 21, 2011
View
115 README.md
@@ -1,53 +1,75 @@
-# Thrift Hive - Hive client using the Apache Thrift RPC system.
+# Thrift Hive - Hive client with multi versions support and a streaming API.
-The project export the Hive API throught Thrift. Multiple versions of hive are
-supported.
+The project export the [Hive API][1] using [Apache Thrift RPC system][2]. It
+support multiple versions and a readable stream API.
-The only function added to the generated Thrift code is `hive.createClient`. It
-take an `options` object as its argument and return an object with the following
-properties:
+## Hive Client
+
+We've added a function `hive.createClient` to simplify coding. However, you
+are free to use the raw Thrift API. The client take an `options` object as its
+argument andexpose an `execute` and a `query` methods.
+
+Available options
+- `version`
+ default to '0.7.1-cdh3u2'
+- `server`
+ default to '127.0.0.1'
+- `port`
+ default to 10000
+- `timeout`
+ default to 1000 milliseconds
+
+Available API
- `client`
- A reference to the hive client returned by `thrift.createClient`
+ A reference to the thrift client returned by `thrift.createClient`
- `connection`
- A reference to the hive connection returned by `thrift.createConnection`
+ A reference to the thrift connection returned by `thrift.createConnection`
- `end([callback])`
Close the Thrift connection
- `execute(query, [callback])`
Execute a query
-- `query(query, [size], [callback])`
+- `query(query, [size])`
Execute a query and return its results as an array of arrays (rows and
- columns)
+ columns). The size argument is optional and indicate the number of row to
+ return on each fetch.
-## Hive connection: sugar example
-
-```javascript
- var hive = require('thrift-hive');
- // Client connection
- var client = hive.createClient({
- version: '0.7.1-cdh3u2'
- server: '127.0.0.1'
- port: 10000
- timeout: 1000
- });
- // Execute query
- client.query('show databases', function(err, databases){
- assert.ifError(err);
- console.log(databases);
- client.end();
- });
+```coffeescript
+ hive = require 'thrift-hive'
+ # Client connection
+ client = hive.createClient
+ version: '0.7.1-cdh3u2'
+ server: '127.0.0.1'
+ port: 10000
+ timeout: 1000
+ # Execute
+ client.execute 'USE default', (err) ->
+ console.log err.message if err
+ client.end()
```
-## Hive connection: raw example
+
+## Hive Query
+
+The `client.query` function return an object similar to the
+[Readable Stream API][3]. At the moment, we have only implemented `pause` and
+`resume`.
+
+
+
+## Raw versus sugar API
+
+Here's an exemple using the raw API
```javascript
- var thrift = require('thrift');
- var transport = require('thrift/transport');
- var ThriftHive = require('thrift-hive/0.7.1-cdh3u2/ThriftHive');
- // Client connection
+ var assert = require('assert');
+ var thrift = require('thrift');
+ var transport = require('thrift/lib/thrift/transport');
+ var ThriftHive = require('../lib/0.7.1-cdh3u2/ThriftHive');
+ // Client connection
var options = {transport: transport.TBufferedTransport, timeout: 1000};
var connection = thrift.createConnection('127.0.0.1', 10000, options);
var client = thrift.createClient(ThriftHive, connection);
- // Execute with fetchAll
+ // Execute query
client.execute('show databases', function(err){
assert.ifError(err);
client.fetchAll(function(err, databases){
@@ -58,3 +80,30 @@ properties:
});
```
+Here's an exemple using our sugar API
+
+```javascript
+ var assert = require('assert');
+ var hive = require('thrift-hive');
+ // Client connection
+ var client = hive.createClient({
+ version: '0.7.1-cdh3u2',
+ server: '127.0.0.1',
+ port: 10000,
+ timeout: 1000
+ });
+ // Execute query
+ client.query('show databases')
+ .on('row', function(database){
+ console.log(database);
+ })
+ .on('end', function(err){
+ assert.ifError(err);
+ client.end();
+ });
+```
+
+
+[1]: http://hive.apache.org "Apache Hive"
+[2]: http://thrift.apache.org "Apache Thrift"
+[3]: http://nodejs.org/docs/v0.6.2/api/streams.html#readable_Stream "Readable Stream API"
View
@@ -1,6 +1,7 @@
thrift = require 'thrift'
transport = require 'thrift/lib/thrift/transport'
+EventEmitter = require('events').EventEmitter
module.exports.createClient = (options = {}) ->
options.version ?= '0.7.1-cdh3u2'
@@ -17,18 +18,43 @@ module.exports.createClient = (options = {}) ->
execute: (query, callback) ->
client.execute query, (err) ->
callback err if callback
- query: (query, size, callback) ->
- if arguments.length is 2 and typeof callback is 'function'
+ query: (query, size) ->
+ if arguments.length is 2 and typeof size is 'function'
callback = size
size = -1
client.execute query, (err) ->
- callback err if err and callback
- if size is -1
- client.fetchAll (err, data) ->
- data = data.map (row) -> row.split '\t'
- callback err, data if callback
+ if err
+ emitter.emit 'error', err if emitter.listeners('error').length
+ emitter.emit 'end', err
+ return
+ fetch()
+ closed = false
+ emitter = new EventEmitter
+ emitter.paused = 0
+ emitter.pause = ->
+ @paused++
+ emitter.resume = ->
+ @paused--
+ fetch() if @paused is 0
+ handle = (err, rows) =>
+ if err
+ closed = true
+ emitter.emit 'error', err
+ emitter.emit 'end', err
+ return
+ rows = rows.map (row) -> row.split '\t'
+ for row in rows
+ emitter.emit 'row', row
+ if rows.length is size
+ fetch()
else
- client.fetchN size, (err, data) ->
- data = data.map (row) -> row.split '\t'
- callback err, data if callback
+ closed = true
+ emitter.emit 'success', err
+ emitter.emit 'end'
+ fetch = ->
+ return if emitter.paused or closed
+ if size
+ then client.fetchN size, handle
+ else client.fetchAll handle
+ emitter
View
@@ -0,0 +1,13 @@
+#!/usr/bin/env node
+
+ hive = require 'thrift-hive'
+ # Client connection
+ client = hive.createClient
+ version: '0.7.1-cdh3u2'
+ server: '127.0.0.1'
+ port: 10000
+ timeout: 1000
+ # Execute
+ client.execute 'USE default', (err) ->
+ console.log err.message if err
+ client.end()
View
@@ -0,0 +1,20 @@
+#!/usr/bin/env coffee
+
+ assert = require 'assert'
+ hive = require "#{__dirname}/.."
+
+ client = hive.createClient()
+
+ client.execute 'use test_database', (err) ->
+ assert.ifError err
+ query = client.query('select * from test_table limit 10', 10)
+ .on 'row', (row) ->
+ query.pause()
+ setTimeout ->
+ console.log row
+ query.resume()
+ , 100
+ .on 'end', (err) ->
+ console.log err.message if err
+ client.end()
+
View
@@ -1,16 +0,0 @@
-#!/usr/bin/env coffee
-
- assert = require 'assert'
- hive = require '../../'
-
- client = hive.createClient
- version: '0.7.1-cdh3u2'
- server: '127.0.0.1'
- port: 10000
- timeout: 1000
-
- client.execute 'use my_db', (err) ->
- client.query 'select * from my_table limit 10', (err, results) ->
- assert.ifError(err)
- console.log(results)
- client.end()
View
@@ -1,16 +0,0 @@
-#!/usr/bin/env coffee
-
- assert = require 'assert'
- hive = require '../../'
-
- client = hive.createClient
- version: '0.7.1-cdh3u2'
- server: '127.0.0.1'
- port: 10000
- timeout: 1000
-
- client.execute 'use my_db', (err) ->
- client.query 'select * from my_table', 10, (err, results) ->
- assert.ifError(err)
- console.log(results)
- client.end()
@@ -1,6 +1,5 @@
#!/usr/bin/env node
- // Client conenction
var assert = require('assert');
var thrift = require('thrift');
var transport = require('thrift/lib/thrift/transport');
@@ -9,7 +8,7 @@
var options = {transport: transport.TBufferedTransport, timeout: 1000};
var connection = thrift.createConnection('127.0.0.1', 10000, options);
var client = thrift.createClient(ThriftHive, connection);
- // Execute with fetchAll
+ // Execute query
client.execute('show databases', function(err){
assert.ifError(err);
client.fetchAll(function(err, databases){
@@ -1,7 +1,7 @@
#!/usr/bin/env node
var assert = require('assert');
- var hive = require('..');
+ var hive = require('thrift-hive');
// Client connection
var client = hive.createClient({
version: '0.7.1-cdh3u2',
@@ -10,8 +10,11 @@
timeout: 1000
});
// Execute query
- client.query('show databases', function(err, databases){
+ client.query('show databases')
+ .on('row', function(database){
+ console.log(database);
+ })
+ .on('end', function(err){
assert.ifError(err);
- console.log(databases);
client.end();
});
Oops, something went wrong.

0 comments on commit bc33845

Please sign in to comment.