
Initial rewrite

1 parent ba7c437 commit bf3a03375469922421d299cf97f62a78d092702e @horaci committed
Showing with 798 additions and 249 deletions.
  1. +4 −0 .gitignore
  2. +60 −0 conf/hdfs-site.xml
  3. +8 −0 conf/mapred-site.xml
  4. +1 −0 conf/slaves
  5. +34 −20 demo/demo.js
  6. +129 −0 node-hdfs.js
  7. +14 −0 package.json
  8. +11 −0 run
  9. +0 −226 src/hdfs.cc
  10. +522 −0 src/hdfs_bindings.cc
  11. +15 −3 wscript
4 .gitignore
@@ -1,2 +1,6 @@
build/
.lock-wscript
+hdfs_bindings.node
+hadoop_cluster.xml
+core-site.xml
+
60 conf/hdfs-site.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ <description>Default block replication.
+ The actual number of replications can be specified when the file is created.
+ The default is used if replication is not specified at create time.
+ </description>
+</property>
+
+<property>
+ <name>dfs.support.append</name>
+ <value>true</value>
+ <description>Allow appends to files.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.address</name>
+ <value>0.0.0.0:50012</value>
+ <description>
+ The address the datanode server will listen on.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.http.address</name>
+ <value>0.0.0.0:50079</value>
+ <description>
+ The datanode http server address and port.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.ipc.address</name>
+ <value>0.0.0.0:50022</value>
+ <description>
+ The datanode ipc server address and port.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
+<property>
+ <name>dfs.http.address</name>
+ <value>0.0.0.0:50072</value>
+ <description>
+ The address and the base port where the dfs namenode web ui will listen on.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
+</configuration>
8 conf/mapred-site.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+</configuration>
1 conf/slaves
@@ -0,0 +1 @@
+localhost
54 demo/demo.js
@@ -1,25 +1,39 @@
-var h = require('../build/default/hdfs.node');
-var hi = new h.Hdfs();
-var data = new Buffer("Hello, my name is Paul. This is an example of what to do.", encoding='utf8')
+var sys = require('sys')
+ , HDFS = require('../node-hdfs')
-var writtenBytes = hi.write("/tmp/testfile.txt", data, function(bytes) {
- console.log("Wrote file with " + bytes + " bytes.\n")
- console.log("About to start reading byes...")
-
- hi.read("/tmp/testfile.txt", function(data) {
- console.log("Data was: " + data)
- })
+var hdfs = new HDFS({host:"default", port:0});
- console.log("Finished asking to read byes...")
-})
-console.log("Finished outer write\n")
+var hdfs_file_path = "/tmp/test.txt"
+var local_out_path = "/tmp/test.out";
-// console.log("Wrote " + writtenBytes + " bytes")
+// Stat File
+hdfs.stat(hdfs_file_path, function(err, data) {
+ if(!err) {
+ console.log("File stat of '" + data.path + "' is: " + JSON.stringify(data));
+ // => {"type":"file","path":"/tmp/test","size":183,"replication":1,"block_size":33554432,"owner":"horaci","group":"wheel","permissions":420,"last_mod":1315326370,"last_access":0}
+ }
+})
+// Read file
+hdfs.read(hdfs_file_path, 1024*1024, function(reader) {
+ var readed = 0;
+ reader.on("open", function(handle) {
+ console.log("File " + hdfs_file_path + " opened.")
+ });
+ reader.on("data", function(data) {
+ readed += data.length;
+ console.log("readed " + data.length + " bytes (" + readed +")");
+ });
+ reader.on("end", function(err) {
+ if(!err) {
+ console.log("Finished reading data - Total readed: " + readed);
+ }
+ });
+})
-// hi.openForWriting("/tmp/tetfile.txt", function(f) {
-// f.write(buffer, function(bytes) {
-// console.log("I just wrote some bytes");
-// })
-// })
-//
+// Copy file to local fs (in parallel with previous read file)
+hdfs.copyToLocalPath(hdfs_file_path, local_out_path, function(err, readed) {
+ if(!err) {
+ console.log(readed + " bytes copied from " + hdfs_file_path + " to " + local_out_path);
+ }
+})
129 node-hdfs.js
@@ -0,0 +1,129 @@
+var sys = require('sys')
+ , fs = require('fs')
+ , EventEmitter = require('events').EventEmitter
+ , HDFSBindings = require('./hdfs_bindings')
+
+var HDFS = new HDFSBindings.Hdfs();
+
+var modes = {
+ O_RDONLY : 0x0000,
+ O_WRONLY : 0x0001,
+ O_RDWR : 0x0002,
+ O_APPEND : 0x0008,
+ O_CREAT : 0x0200,
+ O_TRUNC : 0x0400
+}
+
+module.exports = function(options) {
+ this.host = options.host || "default";
+ this.port = options.port || 0;
+ this.connected = false;
+
+ var self = this;
+
+ this.connect = function() {
+ if(!this.connected) {
+ HDFS.connect(self.host, self.port);
+ this.connected = true;
+ }
+ }
+
+ this.stat = function(path, cb) {
+ self.connect();
+ HDFS.stat(path, cb);
+ }
+
+ this.open = function(path, mode, cb) {
+ self.connect();
+ HDFS.open(path, mode, cb);
+ }
+
+ this.close = function(handle, cb) {
+ self.connect();
+ HDFS.close(handle, cb);
+ }
+
+ this.read = function(path, bufferSize, cb) {
+ if (!cb || typeof cb != "function") {
+ cb = bufferSize;
+ bufferSize = 1024*1024;
+ }
+
+ self.connect();
+ var reader = new HDFSReader(path, bufferSize);
+ if(cb) {
+ cb(reader);
+ } else {
+ return reader;
+ }
+ }
+
+ this.copyToLocalPath = function(srcPath, dstPath, options, cb) {
+ if (!cb || typeof cb != "function") {
+ cb = options;
+ options = {encoding: null, mode:0666, flags: 'w'};
+ }
+ var stream = fs.createWriteStream(dstPath, options);
+ var readed = 0;
+ var bufferSize = options.bufferSize || 1024*1024; // 1mb chunks by default
+
+ stream.once('open', function(fd) {
+ self.read(srcPath, bufferSize, function(rh) {
+ rh.on('data', function(data) {
+ stream.write(data);
+ readed += data.length;
+ });
+ rh.once('end', function(err) {
+ stream.end();
+ cb(err, readed);
+ });
+ })
+ });
+ }
+}
+
+
+var HDFSReader = function(path, bufferSize) {
+ var self = this;
+
+ this.handle = null;
+ this.offset = 0;
+ this.length = 0;
+ this.bufferSize = bufferSize || 1024*1024;
+
+ this.read = function() {
+ HDFS.read(self.handle, self.offset, self.bufferSize, function(data) {
+ if(!data || data.length == 0) {
+ self.end();
+ } else {
+ self.emit("data", data);
+ self.offset += data.length;
+ data.length < self.bufferSize ? self.end() : self.read();
+ }
+ });
+ };
+
+ this.end = function(err) {
+ if(self.handle) {
+ HDFS.close(self.handle, function() {
+ self.emit("end", err);
+ })
+ } else {
+ self.emit("end", err);
+ }
+ }
+
+ HDFS.open(path, modes.O_RDONLY, function(err, handle) {
+ if(err) {
+ self.end(err);
+ } else {
+ self.emit("open", handle);
+ self.handle = handle;
+ self.read();
+ }
+ });
+
+ EventEmitter.call(this);
+}
+
+sys.inherits(HDFSReader, EventEmitter);
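
The optional options argument to copyToLocalPath is not exercised by demo.js; a minimal sketch of that form, with illustrative values for the write-stream options and chunk size (not part of this commit):

// Sketch only: copyToLocalPath with an explicit options object.
// options is handed to fs.createWriteStream, and options.bufferSize
// (here 64KB, illustrative) sets the HDFS read chunk size.
var HDFS = require('./node-hdfs');
var hdfs = new HDFS({host: "default", port: 0});

hdfs.copyToLocalPath("/tmp/test.txt", "/tmp/test.out",
  {flags: 'w', mode: 0666, encoding: null, bufferSize: 64 * 1024},
  function(err, readed) {
    if (!err) console.log(readed + " bytes copied");
  });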
14 package.json
@@ -0,0 +1,14 @@
+{ "name": "node-hdfs",
+ "version": "0.0.1",
+ "author": "Forward",
+ "contributors": [
+ { "name": "Paul Ingles", "email": "paul@forward.co.uk"},
+ { "name": "Horaci Cuevas", "email": "horaci@forward.co.uk" }
+ ],
+ "description": "A node module for accessing Hadoop's file system (HDFS)",
+ "scripts": { "preinstall": "node-waf configure build" },
+ "main": "./node-hdfs",
+ "engines": { "node": ">0.4.11" },
+ "keywords": [ "hdfs", "hadoop", "fs", "libhdfs" ],
+ "repository": { "type": "git", "url": "http://github.com/forward/node-hdfs.git" }
+}
11 run
@@ -0,0 +1,11 @@
+export HADOOP_HOME=/usr/local/hadoop
+
+# Path to hadoop libs
+export CLASSPATH=$HADOOP_HOME/hadoop-core-0.20.2-cdh3u0.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
+
+# Add conf path where core-site.xml is
+export CLASSPATH=$CLASSPATH:./conf
+node-waf clean
+node-waf configure
+node-waf build
+node demo/demo.js
226 src/hdfs.cc
@@ -1,226 +0,0 @@
-/* This code is PUBLIC DOMAIN, and is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. See the accompanying
- * LICENSE file.
- */
-
-#include <v8.h>
-#include <node.h>
-#include <node_buffer.h>
-#include "../vendor/hdfs.h"
-
-using namespace node;
-using namespace v8;
-
-#define REQ_FUN_ARG(I, VAR) \
- if (args.Length() <= (I) || !args[I]->IsFunction()) \
- return ThrowException(Exception::TypeError( \
- String::New("Argument " #I " must be a function"))); \
- Local<Function> VAR = Local<Function>::Cast(args[I]);
-
-class HdfsClient: ObjectWrap
-{
-private:
- int m_count;
-public:
-
- static Persistent<FunctionTemplate> s_ct;
- static void Init(Handle<Object> target)
- {
- HandleScope scope;
-
- Local<FunctionTemplate> t = FunctionTemplate::New(New);
-
- s_ct = Persistent<FunctionTemplate>::New(t);
- s_ct->InstanceTemplate()->SetInternalFieldCount(1);
- s_ct->SetClassName(String::NewSymbol("Hdfs"));
-
- NODE_SET_PROTOTYPE_METHOD(s_ct, "write", Write);
- NODE_SET_PROTOTYPE_METHOD(s_ct, "read", Read);
-
- target->Set(String::NewSymbol("Hdfs"), s_ct->GetFunction());
- }
-
- HdfsClient() :
- m_count(0)
- {
- }
-
- ~HdfsClient()
- {
- }
-
- static Handle<Value> New(const Arguments& args)
- {
- HandleScope scope;
- HdfsClient* hw = new HdfsClient();
- hw->Wrap(args.This());
- return args.This();
- }
-
- struct hdfs_write_baton_t {
- HdfsClient *client;
- char* filePath;
- tSize writtenBytes;
- Persistent<Function> cb;
- Persistent<Object> buffer;
- };
-
- struct hdfs_read_baton_t {
- HdfsClient *client;
- char *filePath;
- Persistent<Function> cb;
- Persistent<Object> buffer;
- };
-
- static Handle<Value> Read(const Arguments& args)
- {
- HandleScope scope;
-
- REQ_FUN_ARG(1, cb);
-
- v8::String::Utf8Value pathStr(args[0]);
- char* readPath = new char[strlen(*pathStr) + 1];
- strcpy(readPath, *pathStr);
-
- HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
-
- hdfs_read_baton_t *baton = new hdfs_read_baton_t();
- baton->client = client;
- baton->cb = Persistent<Function>::New(cb);
- baton->filePath = readPath;
-
- client->Ref();
-
- eio_custom(eio_hdfs_read, EIO_PRI_DEFAULT, eio_after_hdfs_read, baton);
- uv_ref();
-
- return Undefined();
- }
-
- static int eio_hdfs_read(eio_req *req)
- {
- hdfs_read_baton_t *baton = static_cast<hdfs_read_baton_t*>(req->data);
- char* readPath = baton->filePath;
-
- hdfsFS fs = hdfsConnect("default", 0);
- hdfsFile readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
- int bytesAvailable = hdfsAvailable(fs, readFile);
- char *buf = new char[bytesAvailable + 1];
- memset(buf, 0, bytesAvailable + 1);
-
- int readBytes = hdfsRead(fs, readFile, (void*)buf, bytesAvailable);
-
- Buffer* buffer = Buffer::New(buf, bytesAvailable);
- baton->buffer = buffer->handle_;
-
- hdfsCloseFile(fs, readFile);
-
- return 0;
- }
-
- static int eio_after_hdfs_read(eio_req *req)
- {
- HandleScope scope;
- hdfs_read_baton_t *baton = static_cast<hdfs_read_baton_t*>(req->data);
- uv_unref();
- baton->client->Unref();
-
- Handle<Value> argv[1];
- argv[0] = Local<Value>::New(baton->buffer);
-
- TryCatch try_catch;
- baton->cb->Call(Context::GetCurrent()->Global(), 1, argv);
-
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
- }
-
- baton->cb.Dispose();
-
- delete baton;
- return 0;
- }
-
- static Handle<Value> Write(const Arguments& args)
- {
- HandleScope scope;
-
- REQ_FUN_ARG(2, cb);
-
- v8::String::Utf8Value pathStr(args[0]);
- char* writePath = (char *) malloc(strlen(*pathStr) + 1);
- strcpy(writePath, *pathStr);
-
- HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
-
- hdfs_write_baton_t *baton = new hdfs_write_baton_t();
- baton->client = client;
- baton->cb = Persistent<Function>::New(cb);
- baton->buffer = Persistent<Object>::New(args[1]->ToObject());
- baton->writtenBytes = 0;
- baton->filePath = writePath;
-
- client->Ref();
-
- eio_custom(eio_hdfs_write, EIO_PRI_DEFAULT, eio_after_hdfs_write, baton);
- uv_ref();
-
- return Undefined();
- }
-
- static int eio_hdfs_write(eio_req *req)
- {
- hdfs_write_baton_t *baton = static_cast<hdfs_write_baton_t*>(req->data);
- char* writePath = baton->filePath;
-
- hdfsFS fs = hdfsConnect("default", 0);
-
- char* bufData = Buffer::Data(baton->buffer);
- size_t bufLength = Buffer::Length(baton->buffer);
-
- hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
- tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)bufData, bufLength);
-
- hdfsFlush(fs, writeFile);
- hdfsCloseFile(fs, writeFile);
-
- baton->writtenBytes = num_written_bytes;
-
- return 0;
- }
-
- static int eio_after_hdfs_write(eio_req *req)
- {
- HandleScope scope;
- hdfs_write_baton_t *baton = static_cast<hdfs_write_baton_t*>(req->data);
- uv_unref();
- baton->client->Unref();
-
- Local<Value> argv[1];
- argv[0] = Integer::New(baton->writtenBytes);
-
- TryCatch try_catch;
-
- baton->cb->Call(Context::GetCurrent()->Global(), 1, argv);
-
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
- }
-
- baton->cb.Dispose();
-
- delete baton;
- return 0;
- }
-};
-
-Persistent<FunctionTemplate> HdfsClient::s_ct;
-
-extern "C" {
- static void init (Handle<Object> target)
- {
- HdfsClient::Init(target);
- }
-
- NODE_MODULE(hdfs, init);
-}
522 src/hdfs_bindings.cc
@@ -0,0 +1,522 @@
+/* This code is PUBLIC DOMAIN, and is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. See the accompanying
+ * LICENSE file.
+ */
+
+#include <v8.h>
+#include <node.h>
+#include <node_buffer.h>
+#include <node_object_wrap.h>
+#include <unistd.h>
+#include "../vendor/hdfs.h"
+
+using namespace node;
+using namespace v8;
+
+#define REQ_FUN_ARG(I, VAR) \
+ if (args.Length() <= (I) || !args[I]->IsFunction()) \
+ return ThrowException(Exception::TypeError( \
+ String::New("Argument " #I " must be a function"))); \
+ Local<Function> VAR = Local<Function>::Cast(args[I]);
+
+class HdfsClient : public ObjectWrap
+{
+private:
+ int m_count;
+ hdfsFS fs_;
+
+ int fh_count_;
+ hdfsFile_internal **fh_;
+public:
+
+ static Persistent<FunctionTemplate> s_ct;
+ static void Init(Handle<Object> target)
+ {
+ HandleScope scope;
+
+ Local<FunctionTemplate> t = FunctionTemplate::New(New);
+
+ s_ct = Persistent<FunctionTemplate>::New(t);
+ s_ct->InstanceTemplate()->SetInternalFieldCount(1);
+ s_ct->SetClassName(String::NewSymbol("HdfsBindings"));
+
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "connect", Connect);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "write", Write);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "read", Read);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "stat", Stat);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "open", Open);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "close", Close);
+
+ target->Set(String::NewSymbol("Hdfs"), s_ct->GetFunction());
+ }
+
+ HdfsClient()
+ {
+ m_count = 0;
+ fs_ = NULL;
+ fh_ = (hdfsFile_internal **) calloc(1024, sizeof(hdfsFile_internal *));
+ memset(fh_, 0, sizeof(fh_));
+ fh_count_ = 1024;
+ }
+
+ ~HdfsClient()
+ {
+ free(fh_);
+ }
+
+ static Handle<Value> New(const Arguments& args)
+ {
+ HandleScope scope;
+ HdfsClient* client = new HdfsClient();
+ client->Wrap(args.This());
+ return args.This();
+ }
+
+ struct hdfs_open_baton_t {
+ HdfsClient *client;
+ char *filePath;
+ Persistent<Function> cb;
+ hdfsFile_internal *fileHandle;
+ int flags;
+ };
+
+ struct hdfs_write_baton_t {
+ HdfsClient *client;
+ char* filePath;
+ tSize writtenBytes;
+ Persistent<Function> cb;
+ Persistent<Object> buffer;
+ Local<String> error;
+ hdfsFS fs;
+ };
+
+ struct hdfs_read_baton_t {
+ HdfsClient *client;
+ int fh;
+ int bufferSize;
+ int offset;
+ hdfsFile_internal *fileHandle;
+ Persistent<Function> cb;
+ char *buffer;
+ int readBytes;
+ };
+
+ struct hdfs_stat_baton_t {
+ HdfsClient *client;
+ char *filePath;
+ hdfsFileInfo *fileStat;
+ Persistent<Function> cb;
+ };
+
+ struct hdfs_close_baton_t {
+ HdfsClient *client;
+ int fh;
+ Persistent<Function> cb;
+ };
+
+
+ static Handle<Value> Connect(const Arguments &args)
+ {
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+ v8::String::Utf8Value hostStr(args[0]);
+ client->fs_ = hdfsConnectNewInstance(*hostStr, args[1]->Int32Value());
+ return Boolean::New(client->fs_ ? true : false);
+ }
+
+ static Handle<Value> Stat(const Arguments &args)
+ {
+ HandleScope scope;
+
+ REQ_FUN_ARG(1, cb);
+
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ v8::String::Utf8Value pathStr(args[0]);
+ char* statPath = new char[strlen(*pathStr) + 1];
+ strcpy(statPath, *pathStr);
+
+ hdfs_stat_baton_t *baton = new hdfs_stat_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->filePath = statPath;
+ baton->fileStat = NULL;
+
+ client->Ref();
+
+ eio_custom(eio_hdfs_stat, EIO_PRI_DEFAULT, eio_after_hdfs_stat, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_hdfs_stat(eio_req *req)
+ {
+ hdfs_stat_baton_t *baton = static_cast<hdfs_stat_baton_t*>(req->data);
+ baton->fileStat = hdfsGetPathInfo(baton->client->fs_, baton->filePath);
+ return 0;
+ }
+
+ static int eio_after_hdfs_stat(eio_req *req)
+ {
+ HandleScope scope;
+ hdfs_stat_baton_t *baton = static_cast<hdfs_stat_baton_t*>(req->data);
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ Handle<Value> argv[2];
+
+ if(baton->fileStat) {
+ Persistent<Object> statObject = Persistent<Object>::New(Object::New());;
+
+ char *path = baton->fileStat->mName;
+
+ char kind = (char)baton->fileStat->mKind;
+
+ statObject->Set(String::New("type"), String::New(kind == 'F' ? "file" : kind == 'D' ? "directory" : "other"));
+ statObject->Set(String::New("path"), String::New(path));
+ statObject->Set(String::New("size"), Integer::New(baton->fileStat->mSize));
+ statObject->Set(String::New("replication"), Integer::New(baton->fileStat->mReplication));
+ statObject->Set(String::New("block_size"), Integer::New(baton->fileStat->mBlockSize));
+ statObject->Set(String::New("owner"), String::New(baton->fileStat->mOwner));
+ statObject->Set(String::New("group"), String::New(baton->fileStat->mGroup));
+ statObject->Set(String::New("permissions"), Integer::New(baton->fileStat->mPermissions));
+ statObject->Set(String::New("last_mod"), Integer::New(baton->fileStat->mLastMod));
+ statObject->Set(String::New("last_access"), Integer::New(baton->fileStat->mLastAccess));
+
+ argv[0] = Local<Value>::New(Undefined());
+ argv[1] = Local<Value>::New(statObject);
+
+ hdfsFreeFileInfo(baton->fileStat, 1);
+ } else {
+ argv[0] = Local<Value>::New(String::New("File does not exist"));
+ argv[1] = Local<Value>::New(Undefined());
+ }
+
+ TryCatch try_catch;
+ baton->cb->Call(Context::GetCurrent()->Global(), 2, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+
+ delete baton;
+ return 0;
+ }
+
+
+ /**********************/
+ /* Open */
+ /**********************/
+ // open(char *path, int flags, callback)
+
+ static Handle<Value> Open(const Arguments &args)
+ {
+ HandleScope scope;
+ REQ_FUN_ARG(2, cb);
+
+ // get client
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ // Parse path
+ v8::String::Utf8Value pathStr(args[0]);
+ char* statPath = new char[strlen(*pathStr) + 1];
+ strcpy(statPath, *pathStr);
+
+ // Initialize baton
+ hdfs_open_baton_t *baton = new hdfs_open_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->filePath = statPath;
+ baton->fileHandle = NULL;
+ baton->flags = args[1]->Int32Value();
+
+ client->Ref();
+
+ // Call eio operation
+ eio_custom(eio_hdfs_open, EIO_PRI_DEFAULT, eio_after_hdfs_open, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_hdfs_open(eio_req *req)
+ {
+ hdfs_open_baton_t *baton = static_cast<hdfs_open_baton_t*>(req->data);
+ baton->fileHandle = hdfsOpenFile(baton->client->fs_, baton->filePath, baton->flags, 0, 0, 0);
+ return 0;
+ }
+
+ int CreateFileHandle(hdfsFile_internal *f)
+ {
+ // TODO: quick and dirty, totally inefficient!
+ int fh = 0;
+ while(fh_[fh]) fh++;
+ fh_[fh] = f;
+ return fh;
+ }
+
+ hdfsFile_internal *GetFileHandle(int fh)
+ {
+ return fh_[fh];
+ }
+
+ void RemoveFileHandle(int fh)
+ {
+ fh_[fh] = NULL;
+ }
+
+ static int eio_after_hdfs_open(eio_req *req)
+ {
+ HandleScope scope;
+ hdfs_open_baton_t *baton = static_cast<hdfs_open_baton_t*>(req->data);
+
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ Handle<Value> argv[2];
+
+ if(baton->fileHandle) {
+ // create entry on global files
+ int fh = baton->client->CreateFileHandle(baton->fileHandle);
+ argv[0] = Local<Value>::New(Undefined());
+ argv[1] = Local<Value>::New(Integer::New(fh));
+ } else {
+ argv[0] = Local<Value>::New(String::New("File does not exist"));
+ argv[1] = Local<Value>::New(Undefined());
+ }
+
+ TryCatch try_catch;
+ baton->cb->Call(Context::GetCurrent()->Global(), 2, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+
+ delete baton;
+ return 0;
+ }
+
+ /**********************/
+ /* Close */
+ /**********************/
+
+ static Handle<Value> Close(const Arguments &args)
+ {
+ HandleScope scope;
+ REQ_FUN_ARG(1, cb);
+
+ // get client
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ // Initialize baton
+ hdfs_close_baton_t *baton = new hdfs_close_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->fh = args[0]->Int32Value();
+
+ client->Ref();
+
+ // Call eio operation
+ eio_custom(eio_hdfs_close, EIO_PRI_DEFAULT, eio_after_hdfs_close, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_hdfs_close(eio_req *req)
+ {
+ hdfs_close_baton_t *baton = static_cast<hdfs_close_baton_t*>(req->data);
+ hdfsFile_internal *hdfsfile = baton->client->GetFileHandle(baton->fh);
+
+ if(hdfsfile) {
+ hdfsCloseFile(baton->client->fs_, hdfsfile);
+ baton->client->RemoveFileHandle(baton->fh);
+ }
+
+ return 0;
+ }
+
+ static int eio_after_hdfs_close(eio_req *req)
+ {
+ HandleScope scope;
+ hdfs_close_baton_t *baton = static_cast<hdfs_close_baton_t*>(req->data);
+
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ TryCatch try_catch;
+ baton->cb->Call(Context::GetCurrent()->Global(), 0, NULL);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+
+ delete baton;
+ return 0;
+ }
+
+ /**********************/
+ /* READ */
+ /**********************/
+
+ // handle, offset, bufferSize, callback
+ static Handle<Value> Read(const Arguments &args)
+ {
+ HandleScope scope;
+
+ REQ_FUN_ARG(3, cb);
+
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ int fh = args[0]->Int32Value();
+ hdfsFile_internal *fileHandle = client->GetFileHandle(fh);
+
+ if(!fileHandle) {
+ return ThrowException(Exception::TypeError(String::New("Invalid file handle")));
+ }
+
+ hdfs_read_baton_t *baton = new hdfs_read_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->fileHandle = fileHandle;
+ baton->offset = args[1]->Int32Value();
+ baton->bufferSize = args[2]->Int32Value();
+ baton->fh = fh;
+
+ client->Ref();
+
+ eio_custom(eio_hdfs_read, EIO_PRI_DEFAULT, eio_after_hdfs_read, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_hdfs_read(eio_req *req)
+ {
+ hdfs_read_baton_t *baton = static_cast<hdfs_read_baton_t*>(req->data);
+ baton->buffer = (char *) malloc(baton->bufferSize * sizeof(char));
+ baton->readBytes = hdfsPread(baton->client->fs_, baton->fileHandle, baton->offset, baton->buffer, baton->bufferSize);
+ return 0;
+ }
+
+ static int eio_after_hdfs_read(eio_req *req)
+ {
+ HandleScope scope;
+
+ hdfs_read_baton_t *baton = static_cast<hdfs_read_baton_t*>(req->data);
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ Handle<Value> argv[1];
+
+ Buffer *b = Buffer::New(baton->buffer, baton->readBytes);
+ argv[0] = Local<Value>::New(b->handle_);
+ free(baton->buffer);
+
+ TryCatch try_catch;
+ baton->cb->Call(Context::GetCurrent()->Global(), 1, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+ delete baton;
+ return 0;
+ }
+
+ static Handle<Value> Write(const Arguments& args)
+ {
+ HandleScope scope;
+
+ REQ_FUN_ARG(2, cb);
+
+ v8::String::Utf8Value pathStr(args[0]);
+ char* writePath = (char *) malloc(strlen(*pathStr) + 1);
+ strcpy(writePath, *pathStr);
+
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ hdfs_write_baton_t *baton = new hdfs_write_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->buffer = Persistent<Object>::New(args[1]->ToObject());
+ baton->writtenBytes = 0;
+ baton->filePath = writePath;
+
+ client->Ref();
+
+ eio_custom(eio_hdfs_write, EIO_PRI_DEFAULT, eio_after_hdfs_write, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_hdfs_write(eio_req *req)
+ {
+ hdfs_write_baton_t *baton = static_cast<hdfs_write_baton_t*>(req->data);
+ char* writePath = baton->filePath;
+
+ char* bufData = Buffer::Data(baton->buffer);
+ size_t bufLength = Buffer::Length(baton->buffer);
+
+ hdfsFile_internal *writeFile = hdfsOpenFile(baton->client->fs_, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
+ tSize num_written_bytes = hdfsWrite(baton->client->fs_, writeFile, (void*)bufData, bufLength);
+ hdfsFlush(baton->client->fs_, writeFile);
+ hdfsCloseFile(baton->client->fs_, writeFile);
+
+ baton->writtenBytes = num_written_bytes;
+
+ return 0;
+ }
+
+ static int eio_after_hdfs_write(eio_req *req)
+ {
+ HandleScope scope;
+ hdfs_write_baton_t *baton = static_cast<hdfs_write_baton_t*>(req->data);
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ Local<Value> argv[2];
+ if(baton->error == Undefined()) {
+ argv[0] = baton->error;
+ } else {
+ argv[0] = Local<Value>::New(Null());
+ argv[1] = Integer::New(baton->writtenBytes);
+ }
+
+ TryCatch try_catch;
+
+ baton->cb->Call(Context::GetCurrent()->Global(), 2, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+
+ delete baton;
+ return 0;
+ }
+};
+
+Persistent<FunctionTemplate> HdfsClient::s_ct;
+
+extern "C" {
+ static void init (Handle<Object> target)
+ {
+ // v8::ResourceConstraints rc;
+ // rc.set_stack_limit((uint32_t *)1);
+ // v8::SetResourceConstraints(&rc);
+
+ HdfsClient::Init(target);
+ }
+
+ NODE_MODULE(hdfs_bindings, init);
+}
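
node-hdfs.js drives these bindings through connect, open, read and close; a rough sketch of calling them directly, assuming the compiled addon is reachable as ./hdfs_bindings (path, offset and buffer size are illustrative):

// Sketch only: using the raw bindings the same way node-hdfs.js does.
var HDFSBindings = require('./hdfs_bindings');
var hdfs = new HDFSBindings.Hdfs();

hdfs.connect("default", 0);                       // synchronous, returns true on success
hdfs.open("/tmp/test.txt", 0x0000 /* O_RDONLY */, function(err, handle) {
  if (err) return console.log("open failed: " + err);
  // read(handle, offset, bufferSize, cb) passes a Buffer to the callback
  hdfs.read(handle, 0, 1024 * 1024, function(data) {
    console.log("read " + data.length + " bytes");
    hdfs.close(handle, function() { console.log("closed"); });
  });
});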
18 wscript
@@ -1,3 +1,8 @@
+import Options
+from os import unlink, symlink
+from os.path import exists, abspath
+import os
+
def set_options(opt):
opt.tool_options("compiler_cxx")
@@ -9,8 +14,15 @@ def configure(conf):
# conf.check_cfg(package='libnotifymm-1.0', args='--cflags --libs', uselib_store='LIBNOTIFYMM')
def build(bld):
- obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor /usr/lib/jvm/java-6-sun/include /usr/lib/jvm/java-6-sun/include/linux', linkflags=['-L/home/paul/src/hadoop-0.20.2-cdh3u0/c++/Linux-i386-32/lib', '-L/usr/lib/jvm/java-6-sun/jre/lib/i386/server', '-lhdfs'])
+ # obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor /usr/lib/jvm/java-6-sun/include /usr/lib/jvm/java-6-sun/include/linux', linkflags=['-L/home/paul/src/hadoop-0.20.2-cdh3u0/c++/Linux-i386-32/lib', '-L/usr/lib/jvm/java-6-sun/jre/lib/i386/server', '-lhdfs'])
+ obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor', linkflags=['-lhdfs'])
obj.cxxflags = ["-g", "-D_FILE_OFFSET_BITS=64", "-D_LARGEFILE_SOURCE", "-Wall"]
- obj.target = "hdfs"
- obj.source = "src/hdfs.cc"
+ obj.target = "hdfs_bindings"
+ obj.source = "src/hdfs_bindings.cc"
+def shutdown():
+ if Options.commands['clean']:
+ if exists('hdfs_bindings.node'): unlink('hdfs_bindings.node')
+ else:
+ if exists('build/default/hdfs_bindings.node') and not exists('hdfs_bindings.node'):
+ symlink('build/default/hdfs_bindings.node', 'hdfs_bindings.node')
