From bf3a03375469922421d299cf97f62a78d092702e Mon Sep 17 00:00:00 2001
From: Horaci Cuevas <horaci@forward.co.uk>
Date: Wed, 7 Sep 2011 19:28:28 +0100
Subject: [PATCH] Initial rewrite

---
 .gitignore           |    4 +
 conf/hdfs-site.xml   |   60 +++++
 conf/mapred-site.xml |    8 +
 conf/slaves          |    1 +
 demo/demo.js         |   54 +++--
 node-hdfs.js         |  129 ++++++++++
 package.json         |   14 ++
 run                  |   11 +
 src/hdfs.cc          |  226 -------------------
 src/hdfs_bindings.cc |  522 +++++++++++++++++++++++++++++++++++++++++++
 wscript              |   18 +-
 11 files changed, 798 insertions(+), 249 deletions(-)
 create mode 100644 conf/hdfs-site.xml
 create mode 100644 conf/mapred-site.xml
 create mode 100644 conf/slaves
 create mode 100644 node-hdfs.js
 create mode 100644 package.json
 create mode 100755 run
 delete mode 100644 src/hdfs.cc
 create mode 100644 src/hdfs_bindings.cc

diff --git a/.gitignore b/.gitignore
index a179a9f..35aac6d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,6 @@
 build/
 .lock-wscript
+hdfs_bindings.node
+hadoop_cluster.xml
+core-site.xml
+
diff --git a/conf/hdfs-site.xml b/conf/hdfs-site.xml
new file mode 100644
index 0000000..768882b
--- /dev/null
+++ b/conf/hdfs-site.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>dfs.replication</name>
+    <value>1</value>
+    <description>Default block replication.
+      The actual number of replications can be specified when the file is created.
+      The default is used if replication is not specified in create time.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.support.append</name>
+    <value>true</value>
+    <description>Allow appends to files.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.address</name>
+    <value>0.0.0.0:50012</value>
+    <description>
+      The address where the datanode server will listen to.
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.http.address</name>
+    <value>0.0.0.0:50079</value>
+    <description>
+      The datanode http server address and port.
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.ipc.address</name>
+    <value>0.0.0.0:50022</value>
+    <description>
+      The datanode ipc server address and port.
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.http.address</name>
+    <value>0.0.0.0:50072</value>
+    <description>
+      The address and the base port where the dfs namenode web ui will listen on.
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+</configuration>
diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
new file mode 100644
index 0000000..970c8fe
--- /dev/null
+++ b/conf/mapred-site.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+</configuration>
diff --git a/conf/slaves b/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/demo/demo.js b/demo/demo.js
index ad231a4..4a3713d 100644
--- a/demo/demo.js
+++ b/demo/demo.js
@@ -1,25 +1,39 @@
-var h = require('../build/default/hdfs.node');
-var hi = new h.Hdfs();
-var data = new Buffer("Hello, my name is Paul. This is an example of what to do.", encoding='utf8')
+var sys = require('sys')
+  , HDFS = require('../node-hdfs')
 
-var writtenBytes = hi.write("/tmp/testfile.txt", data, function(bytes) {
-  console.log("Wrote file with " + bytes + " bytes.\n")
-  console.log("About to start reading byes...")
-
-  hi.read("/tmp/testfile.txt", function(data) {
-    console.log("Data was: " + data)
-  })
+var hdfs = new HDFS({host:"default", port:0});
 
-  console.log("Finished asking to read byes...")
-})
-console.log("Finished outer write\n")
+var hdfs_file_path = "/tmp/test.txt"
+var local_out_path = "/tmp/test.out";
 
-// console.log("Wrote " + writtenBytes + " bytes")
+// Stat File
+hdfs.stat(hdfs_file_path, function(err, data) {
+  if(!err) {
+    console.log("File stat of '" + data.path + "' is: " + JSON.stringify(data));
+    // => {"type":"file","path":"/tmp/test","size":183,"replication":1,"block_size":33554432,"owner":"horaci","group":"wheel","permissions":420,"last_mod":1315326370,"last_access":0}
+  }
+})
 
+// Read file
+hdfs.read(hdfs_file_path, 1024*1024, function(reader) {
+  var bytesRead = 0;
+  reader.on("open", function(handle) {
+    console.log("File " + hdfs_file_path + " opened.")
+  });
+  reader.on("data", function(data) {
+    bytesRead += data.length;
+    console.log("read " + data.length + " bytes (" + bytesRead + ")");
+  });
+  reader.on("end", function(err) {
+    if(!err) {
+      console.log("Finished reading data - total read: " + bytesRead);
+    }
+  });
+})
 
-// hi.openForWriting("/tmp/tetfile.txt", function(f) {
-//   f.write(buffer, function(bytes) {
-//     console.log("I just wrote some bytes");
-//   })
-// })
-//
+// Copy file to local fs (in parallel with previous read file)
+hdfs.copyToLocalPath(hdfs_file_path, local_out_path, function(err, bytesRead) {
+  if(!err) {
+    console.log(bytesRead + " bytes copied from " + hdfs_file_path + " to " + local_out_path);
+  }
+})
diff --git a/node-hdfs.js b/node-hdfs.js
new file mode 100644
index 0000000..f8c3652
--- /dev/null
+++ b/node-hdfs.js
@@ -0,0 +1,129 @@
+var sys = require('sys')
+  , fs = require('fs')
+  , EventEmitter = require('events').EventEmitter
+  , HDFSBindings = require('./hdfs_bindings')
+
+var HDFS = new HDFSBindings.Hdfs();
+
+var modes = {
+  O_RDONLY : 0x0000,
+  O_WRONLY : 0x0001,
+  O_RDWR   : 0x0002,
+  O_APPEND : 0x0008,
+  O_CREAT  : 0x0200,
+  O_TRUNC  : 0x0400
+}
+
+module.exports = function(options) {
+  this.host = options.host || "default";
+  this.port = options.port || 0;
+  this.connected = false;
+
+  var self = this;
+
+  this.connect = function() {
+    if(!this.connected) {
+      HDFS.connect(self.host, self.port);
+      this.connected = true;
+    }
+  }
+
+  this.stat = function(path, cb) {
+    self.connect();
+    HDFS.stat(path, cb);
+  }
+
+  this.open = function(path, mode, cb) {
+    self.connect();
+    HDFS.open(path, mode, cb);
+  }
+
+  this.close = function(handle, cb) {
+    self.connect();
+    HDFS.close(handle, cb);
+  }
+
+  this.read = function(path, bufferSize, cb) {
+    if (!cb || typeof cb != "function") {
+      cb = bufferSize;
+      bufferSize = 1024*1024;
+    }
+
+    self.connect();
+    var reader = new HDFSReader(path, bufferSize);
+    if(cb) {
+      cb(reader);
+    } else {
+      return reader;
+    }
+  }
+
+  this.copyToLocalPath = function(srcPath, dstPath, options, cb) {
+    if (!cb || typeof cb != "function") {
+      cb = options;
+      options = {encoding: null, mode: 0666, flags: 'w'};
+    }
+    var stream = fs.createWriteStream(dstPath, options);
+    var bytesRead = 0;
+    var bufferSize = options.bufferSize || 1024*1024; // 1mb chunks by default
+
+    stream.once('open', function(fd) {
+      self.read(srcPath, bufferSize, function(rh) {
+        rh.on('data', function(data) {
+          stream.write(data);
+          bytesRead += data.length;
+        });
+        rh.once('end', function(err) {
+          stream.end();
+          cb(err, bytesRead);
+        });
+      })
+    });
+  }
+}
+
+
+var HDFSReader = function(path, bufferSize) {
+  var self = this;
+
+  this.handle = null;
+  this.offset = 0;
+  this.length = 0;
+  this.bufferSize = bufferSize || 1024*1024;
+
+  this.read = function() {
+    HDFS.read(self.handle, self.offset, self.bufferSize, function(data) {
+      if(!data || data.length == 0) {
+        self.end();
+      } else {
+        self.emit("data", data);
+        self.offset += data.length;
+        data.length < self.bufferSize ? self.end() : self.read();
+      }
+    });
+  };
+
+  this.end = function(err) {
+    if(self.handle) {
+      HDFS.close(self.handle, function() {
+        self.emit("end", err);
+      })
+    } else {
+      self.emit("end", err);
+    }
+  }
+
+  HDFS.open(path, modes.O_RDONLY, function(err, handle) {
+    if(err) {
+      self.end(err);
+    } else {
+      self.emit("open", handle);
+      self.handle = handle;
+      self.read();
+    }
+  });
+
+  EventEmitter.call(this);
+}
+
+sys.inherits(HDFSReader, EventEmitter);
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..bb77fe5
--- /dev/null
+++ b/package.json
@@ -0,0 +1,14 @@
+{ "name": "node-hdfs",
+  "version": "0.0.1",
+  "author": "Forward",
+  "contributors": [
+    { "name": "Paul Ingles", "email": "paul@forward.co.uk"},
+    { "name": "Horaci Cuevas", "email": "horaci@forward.co.uk" }
+  ],
+  "description": "A node module for accessing Hadoop's file system (HDFS)",
+  "scripts": { "preinstall": "node-waf configure build" },
+  "main": "./node-hdfs",
+  "engines": { "node": ">0.4.11" },
+  "keywords": [ "hdfs", "hadoop", "fs", "libhdfs" ],
+  "repository": { "type": "git", "url": "http://github.com/forward/node-hdfs.git" }
+}
\ No newline at end of file
diff --git a/run b/run
new file mode 100755
index 0000000..b1fd8e9
--- /dev/null
+++ b/run
@@ -0,0 +1,11 @@
+export HADOOP_HOME=/usr/local/hadoop
+
+# Path to hadoop libs
+export CLASSPATH=$HADOOP_HOME/hadoop-core-0.20.2-cdh3u0.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
+
+# Add conf path where core-site.xml is
+export CLASSPATH=$CLASSPATH:./conf
+node-waf clean
+node-waf configure
+node-waf build
+node demo/demo.js
\ No newline at end of file
diff --git a/src/hdfs.cc b/src/hdfs.cc
deleted file mode 100644
index bf2edfa..0000000
--- a/src/hdfs.cc
+++ /dev/null
@@ -1,226 +0,0 @@
-/* This code is PUBLIC DOMAIN, and is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. See the accompanying
- * LICENSE file.
- */ - -#include -#include -#include -#include "../vendor/hdfs.h" - -using namespace node; -using namespace v8; - -#define REQ_FUN_ARG(I, VAR) \ - if (args.Length() <= (I) || !args[I]->IsFunction()) \ - return ThrowException(Exception::TypeError( \ - String::New("Argument " #I " must be a function"))); \ - Local VAR = Local::Cast(args[I]); - -class HdfsClient: ObjectWrap -{ -private: - int m_count; -public: - - static Persistent s_ct; - static void Init(Handle target) - { - HandleScope scope; - - Local t = FunctionTemplate::New(New); - - s_ct = Persistent::New(t); - s_ct->InstanceTemplate()->SetInternalFieldCount(1); - s_ct->SetClassName(String::NewSymbol("Hdfs")); - - NODE_SET_PROTOTYPE_METHOD(s_ct, "write", Write); - NODE_SET_PROTOTYPE_METHOD(s_ct, "read", Read); - - target->Set(String::NewSymbol("Hdfs"), s_ct->GetFunction()); - } - - HdfsClient() : - m_count(0) - { - } - - ~HdfsClient() - { - } - - static Handle New(const Arguments& args) - { - HandleScope scope; - HdfsClient* hw = new HdfsClient(); - hw->Wrap(args.This()); - return args.This(); - } - - struct hdfs_write_baton_t { - HdfsClient *client; - char* filePath; - tSize writtenBytes; - Persistent cb; - Persistent buffer; - }; - - struct hdfs_read_baton_t { - HdfsClient *client; - char *filePath; - Persistent cb; - Persistent buffer; - }; - - static Handle Read(const Arguments& args) - { - HandleScope scope; - - REQ_FUN_ARG(1, cb); - - v8::String::Utf8Value pathStr(args[0]); - char* readPath = new char[strlen(*pathStr) + 1]; - strcpy(readPath, *pathStr); - - HdfsClient* client = ObjectWrap::Unwrap(args.This()); - - hdfs_read_baton_t *baton = new hdfs_read_baton_t(); - baton->client = client; - baton->cb = Persistent::New(cb); - baton->filePath = readPath; - - client->Ref(); - - eio_custom(eio_hdfs_read, EIO_PRI_DEFAULT, eio_after_hdfs_read, baton); - uv_ref(); - - return Undefined(); - } - - static int eio_hdfs_read(eio_req *req) - { - hdfs_read_baton_t *baton = static_cast(req->data); - char* readPath = baton->filePath; - - hdfsFS fs = hdfsConnect("default", 0); - hdfsFile readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0); - int bytesAvailable = hdfsAvailable(fs, readFile); - char *buf = new char[bytesAvailable + 1]; - memset(buf, 0, bytesAvailable + 1); - - int readBytes = hdfsRead(fs, readFile, (void*)buf, bytesAvailable); - - Buffer* buffer = Buffer::New(buf, bytesAvailable); - baton->buffer = buffer->handle_; - - hdfsCloseFile(fs, readFile); - - return 0; - } - - static int eio_after_hdfs_read(eio_req *req) - { - HandleScope scope; - hdfs_read_baton_t *baton = static_cast(req->data); - uv_unref(); - baton->client->Unref(); - - Handle argv[1]; - argv[0] = Local::New(baton->buffer); - - TryCatch try_catch; - baton->cb->Call(Context::GetCurrent()->Global(), 1, argv); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - - baton->cb.Dispose(); - - delete baton; - return 0; - } - - static Handle Write(const Arguments& args) - { - HandleScope scope; - - REQ_FUN_ARG(2, cb); - - v8::String::Utf8Value pathStr(args[0]); - char* writePath = (char *) malloc(strlen(*pathStr) + 1); - strcpy(writePath, *pathStr); - - HdfsClient* client = ObjectWrap::Unwrap(args.This()); - - hdfs_write_baton_t *baton = new hdfs_write_baton_t(); - baton->client = client; - baton->cb = Persistent::New(cb); - baton->buffer = Persistent::New(args[1]->ToObject()); - baton->writtenBytes = 0; - baton->filePath = writePath; - - client->Ref(); - - eio_custom(eio_hdfs_write, EIO_PRI_DEFAULT, eio_after_hdfs_write, baton); - uv_ref(); - - return 
Undefined(); - } - - static int eio_hdfs_write(eio_req *req) - { - hdfs_write_baton_t *baton = static_cast(req->data); - char* writePath = baton->filePath; - - hdfsFS fs = hdfsConnect("default", 0); - - char* bufData = Buffer::Data(baton->buffer); - size_t bufLength = Buffer::Length(baton->buffer); - - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); - tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)bufData, bufLength); - - hdfsFlush(fs, writeFile); - hdfsCloseFile(fs, writeFile); - - baton->writtenBytes = num_written_bytes; - - return 0; - } - - static int eio_after_hdfs_write(eio_req *req) - { - HandleScope scope; - hdfs_write_baton_t *baton = static_cast(req->data); - uv_unref(); - baton->client->Unref(); - - Local argv[1]; - argv[0] = Integer::New(baton->writtenBytes); - - TryCatch try_catch; - - baton->cb->Call(Context::GetCurrent()->Global(), 1, argv); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - - baton->cb.Dispose(); - - delete baton; - return 0; - } -}; - -Persistent HdfsClient::s_ct; - -extern "C" { - static void init (Handle target) - { - HdfsClient::Init(target); - } - - NODE_MODULE(hdfs, init); -} diff --git a/src/hdfs_bindings.cc b/src/hdfs_bindings.cc new file mode 100644 index 0000000..80d2625 --- /dev/null +++ b/src/hdfs_bindings.cc @@ -0,0 +1,522 @@ +/* This code is PUBLIC DOMAIN, and is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. See the accompanying + * LICENSE file. + */ + +#include +#include +#include +#include +#include +#include "../vendor/hdfs.h" + +using namespace node; +using namespace v8; + +#define REQ_FUN_ARG(I, VAR) \ + if (args.Length() <= (I) || !args[I]->IsFunction()) \ + return ThrowException(Exception::TypeError( \ + String::New("Argument " #I " must be a function"))); \ + Local VAR = Local::Cast(args[I]); + +class HdfsClient : public ObjectWrap +{ +private: + int m_count; + hdfsFS fs_; + + int fh_count_; + hdfsFile_internal **fh_; +public: + + static Persistent s_ct; + static void Init(Handle target) + { + HandleScope scope; + + Local t = FunctionTemplate::New(New); + + s_ct = Persistent::New(t); + s_ct->InstanceTemplate()->SetInternalFieldCount(1); + s_ct->SetClassName(String::NewSymbol("HdfsBindings")); + + NODE_SET_PROTOTYPE_METHOD(s_ct, "connect", Connect); + NODE_SET_PROTOTYPE_METHOD(s_ct, "write", Write); + NODE_SET_PROTOTYPE_METHOD(s_ct, "read", Read); + NODE_SET_PROTOTYPE_METHOD(s_ct, "stat", Stat); + NODE_SET_PROTOTYPE_METHOD(s_ct, "open", Open); + NODE_SET_PROTOTYPE_METHOD(s_ct, "close", Close); + + target->Set(String::NewSymbol("Hdfs"), s_ct->GetFunction()); + } + + HdfsClient() + { + m_count = 0; + fs_ = NULL; + fh_ = (hdfsFile_internal **) calloc(1024, sizeof(hdfsFile_internal *)); + memset(fh_, 0, sizeof(fh_)); + fh_count_ = 1024; + } + + ~HdfsClient() + { + free(fh_); + } + + static Handle New(const Arguments& args) + { + HandleScope scope; + HdfsClient* client = new HdfsClient(); + client->Wrap(args.This()); + return args.This(); + } + + struct hdfs_open_baton_t { + HdfsClient *client; + char *filePath; + Persistent cb; + hdfsFile_internal *fileHandle; + int flags; + }; + + struct hdfs_write_baton_t { + HdfsClient *client; + char* filePath; + tSize writtenBytes; + Persistent cb; + Persistent buffer; + Local error; + hdfsFS fs; + }; + + struct hdfs_read_baton_t { + HdfsClient *client; + int fh; + int bufferSize; + int offset; + hdfsFile_internal *fileHandle; + Persistent cb; + char *buffer; + int readBytes; + }; + + struct hdfs_stat_baton_t 
{ + HdfsClient *client; + char *filePath; + hdfsFileInfo *fileStat; + Persistent cb; + }; + + struct hdfs_close_baton_t { + HdfsClient *client; + int fh; + Persistent cb; + }; + + + static Handle Connect(const Arguments &args) + { + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + v8::String::Utf8Value hostStr(args[0]); + client->fs_ = hdfsConnectNewInstance(*hostStr, args[1]->Int32Value()); + return Boolean::New(client->fs_ ? true : false); + } + + static Handle Stat(const Arguments &args) + { + HandleScope scope; + + REQ_FUN_ARG(1, cb); + + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + + v8::String::Utf8Value pathStr(args[0]); + char* statPath = new char[strlen(*pathStr) + 1]; + strcpy(statPath, *pathStr); + + hdfs_stat_baton_t *baton = new hdfs_stat_baton_t(); + baton->client = client; + baton->cb = Persistent::New(cb); + baton->filePath = statPath; + baton->fileStat = NULL; + + client->Ref(); + + eio_custom(eio_hdfs_stat, EIO_PRI_DEFAULT, eio_after_hdfs_stat, baton); + ev_ref(EV_DEFAULT_UC); + + return Undefined(); + } + + static int eio_hdfs_stat(eio_req *req) + { + hdfs_stat_baton_t *baton = static_cast(req->data); + baton->fileStat = hdfsGetPathInfo(baton->client->fs_, baton->filePath); + return 0; + } + + static int eio_after_hdfs_stat(eio_req *req) + { + HandleScope scope; + hdfs_stat_baton_t *baton = static_cast(req->data); + ev_unref(EV_DEFAULT_UC); + baton->client->Unref(); + + Handle argv[2]; + + if(baton->fileStat) { + Persistent statObject = Persistent::New(Object::New());; + + char *path = baton->fileStat->mName; + + char kind = (char)baton->fileStat->mKind; + + statObject->Set(String::New("type"), String::New(kind == 'F' ? "file" : kind == 'D' ? "directory" : "other")); + statObject->Set(String::New("path"), String::New(path)); + statObject->Set(String::New("size"), Integer::New(baton->fileStat->mSize)); + statObject->Set(String::New("replication"), Integer::New(baton->fileStat->mReplication)); + statObject->Set(String::New("block_size"), Integer::New(baton->fileStat->mBlockSize)); + statObject->Set(String::New("owner"), String::New(baton->fileStat->mOwner)); + statObject->Set(String::New("group"), String::New(baton->fileStat->mGroup)); + statObject->Set(String::New("permissions"), Integer::New(baton->fileStat->mPermissions)); + statObject->Set(String::New("last_mod"), Integer::New(baton->fileStat->mLastMod)); + statObject->Set(String::New("last_access"), Integer::New(baton->fileStat->mLastAccess)); + + argv[0] = Local::New(Undefined()); + argv[1] = Local::New(statObject); + + hdfsFreeFileInfo(baton->fileStat, 1); + } else { + argv[0] = Local::New(String::New("File does not exist")); + argv[1] = Local::New(Undefined()); + } + + TryCatch try_catch; + baton->cb->Call(Context::GetCurrent()->Global(), 2, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + baton->cb.Dispose(); + + delete baton; + return 0; + } + + + /**********************/ + /* Open */ + /**********************/ + // open(char *path, int flags, callback) + + static Handle Open(const Arguments &args) + { + HandleScope scope; + REQ_FUN_ARG(2, cb); + + // get client + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + + // Parse path + v8::String::Utf8Value pathStr(args[0]); + char* statPath = new char[strlen(*pathStr) + 1]; + strcpy(statPath, *pathStr); + + // Initialize baton + hdfs_open_baton_t *baton = new hdfs_open_baton_t(); + baton->client = client; + baton->cb = Persistent::New(cb); + baton->filePath = statPath; + baton->fileHandle = NULL; + baton->flags = 
args[1]->Int32Value(); + + client->Ref(); + + // Call eio operation + eio_custom(eio_hdfs_open, EIO_PRI_DEFAULT, eio_after_hdfs_open, baton); + ev_ref(EV_DEFAULT_UC); + + return Undefined(); + } + + static int eio_hdfs_open(eio_req *req) + { + hdfs_open_baton_t *baton = static_cast(req->data); + baton->fileHandle = hdfsOpenFile(baton->client->fs_, baton->filePath, baton->flags, 0, 0, 0); + return 0; + } + + int CreateFileHandle(hdfsFile_internal *f) + { + // TODO: quick and dirty, totally inefficient! + int fh = 0; + while(fh_[fh]) fh++; + fh_[fh] = f; + return fh; + } + + hdfsFile_internal *GetFileHandle(int fh) + { + return fh_[fh]; + } + + void RemoveFileHandle(int fh) + { + fh_[fh] = NULL; + } + + static int eio_after_hdfs_open(eio_req *req) + { + HandleScope scope; + hdfs_open_baton_t *baton = static_cast(req->data); + + ev_unref(EV_DEFAULT_UC); + baton->client->Unref(); + + Handle argv[2]; + + if(baton->fileHandle) { + // create entry on global files + int fh = baton->client->CreateFileHandle(baton->fileHandle); + argv[0] = Local::New(Undefined()); + argv[1] = Local::New(Integer::New(fh)); + } else { + argv[0] = Local::New(String::New("File does not exist")); + argv[1] = Local::New(Undefined()); + } + + TryCatch try_catch; + baton->cb->Call(Context::GetCurrent()->Global(), 2, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + baton->cb.Dispose(); + + delete baton; + return 0; + } + + /**********************/ + /* Close */ + /**********************/ + + static Handle Close(const Arguments &args) + { + HandleScope scope; + REQ_FUN_ARG(1, cb); + + // get client + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + + // Initialize baton + hdfs_close_baton_t *baton = new hdfs_close_baton_t(); + baton->client = client; + baton->cb = Persistent::New(cb); + baton->fh = args[0]->Int32Value(); + + client->Ref(); + + // Call eio operation + eio_custom(eio_hdfs_close, EIO_PRI_DEFAULT, eio_after_hdfs_close, baton); + ev_ref(EV_DEFAULT_UC); + + return Undefined(); + } + + static int eio_hdfs_close(eio_req *req) + { + hdfs_close_baton_t *baton = static_cast(req->data); + hdfsFile_internal *hdfsfile = baton->client->GetFileHandle(baton->fh); + + if(hdfsfile) { + hdfsCloseFile(baton->client->fs_, hdfsfile); + baton->client->RemoveFileHandle(baton->fh); + } + + return 0; + } + + static int eio_after_hdfs_close(eio_req *req) + { + HandleScope scope; + hdfs_close_baton_t *baton = static_cast(req->data); + + ev_unref(EV_DEFAULT_UC); + baton->client->Unref(); + + TryCatch try_catch; + baton->cb->Call(Context::GetCurrent()->Global(), 0, NULL); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + baton->cb.Dispose(); + + delete baton; + return 0; + } + + /**********************/ + /* READ */ + /**********************/ + + // handle, offset, bufferSize, callback + static Handle Read(const Arguments &args) + { + HandleScope scope; + + REQ_FUN_ARG(3, cb); + + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + + int fh = args[0]->Int32Value(); + hdfsFile_internal *fileHandle = client->GetFileHandle(fh); + + if(!fileHandle) { + return ThrowException(Exception::TypeError(String::New("Invalid file handle"))); + } + + hdfs_read_baton_t *baton = new hdfs_read_baton_t(); + baton->client = client; + baton->cb = Persistent::New(cb); + baton->fileHandle = fileHandle; + baton->offset = args[1]->Int32Value(); + baton->bufferSize = args[2]->Int32Value(); + baton->fh = fh; + + client->Ref(); + + eio_custom(eio_hdfs_read, EIO_PRI_DEFAULT, eio_after_hdfs_read, baton); + 
ev_ref(EV_DEFAULT_UC); + + return Undefined(); + } + + static int eio_hdfs_read(eio_req *req) + { + hdfs_read_baton_t *baton = static_cast(req->data); + baton->buffer = (char *) malloc(baton->bufferSize * sizeof(char)); + baton->readBytes = hdfsPread(baton->client->fs_, baton->fileHandle, baton->offset, baton->buffer, baton->bufferSize); + return 0; + } + + static int eio_after_hdfs_read(eio_req *req) + { + HandleScope scope; + + hdfs_read_baton_t *baton = static_cast(req->data); + ev_unref(EV_DEFAULT_UC); + baton->client->Unref(); + + Handle argv[1]; + + Buffer *b = Buffer::New(baton->buffer, baton->readBytes); + argv[0] = Local::New(b->handle_); + free(baton->buffer); + + TryCatch try_catch; + baton->cb->Call(Context::GetCurrent()->Global(), 1, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + baton->cb.Dispose(); + delete baton; + return 0; + } + + static Handle Write(const Arguments& args) + { + HandleScope scope; + + REQ_FUN_ARG(2, cb); + + v8::String::Utf8Value pathStr(args[0]); + char* writePath = (char *) malloc(strlen(*pathStr) + 1); + strcpy(writePath, *pathStr); + + HdfsClient* client = ObjectWrap::Unwrap(args.This()); + + hdfs_write_baton_t *baton = new hdfs_write_baton_t(); + baton->client = client; + baton->cb = Persistent::New(cb); + baton->buffer = Persistent::New(args[1]->ToObject()); + baton->writtenBytes = 0; + baton->filePath = writePath; + + client->Ref(); + + eio_custom(eio_hdfs_write, EIO_PRI_DEFAULT, eio_after_hdfs_write, baton); + ev_ref(EV_DEFAULT_UC); + + return Undefined(); + } + + static int eio_hdfs_write(eio_req *req) + { + hdfs_write_baton_t *baton = static_cast(req->data); + char* writePath = baton->filePath; + + char* bufData = Buffer::Data(baton->buffer); + size_t bufLength = Buffer::Length(baton->buffer); + + hdfsFile_internal *writeFile = hdfsOpenFile(baton->client->fs_, writePath, O_WRONLY|O_CREAT, 0, 0, 0); + tSize num_written_bytes = hdfsWrite(baton->client->fs_, writeFile, (void*)bufData, bufLength); + hdfsFlush(baton->client->fs_, writeFile); + hdfsCloseFile(baton->client->fs_, writeFile); + + baton->writtenBytes = num_written_bytes; + + return 0; + } + + static int eio_after_hdfs_write(eio_req *req) + { + HandleScope scope; + hdfs_write_baton_t *baton = static_cast(req->data); + ev_unref(EV_DEFAULT_UC); + baton->client->Unref(); + + Local argv[2]; + if(baton->error == Undefined()) { + argv[0] = baton->error; + } else { + argv[0] = Local::New(Null()); + argv[1] = Integer::New(baton->writtenBytes); + } + + TryCatch try_catch; + + baton->cb->Call(Context::GetCurrent()->Global(), 2, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + baton->cb.Dispose(); + + delete baton; + return 0; + } +}; + +Persistent HdfsClient::s_ct; + +extern "C" { + static void init (Handle target) + { + // v8::ResourceConstraints rc; + // rc.set_stack_limit((uint32_t *)1); + // v8::SetResourceConstraints(&rc); + + HdfsClient::Init(target); + } + + NODE_MODULE(hdfs_bindings, init); +} diff --git a/wscript b/wscript index 941e376..ef8f3fa 100644 --- a/wscript +++ b/wscript @@ -1,3 +1,8 @@ +import Options +from os import unlink, symlink +from os.path import exists, abspath +import os + def set_options(opt): opt.tool_options("compiler_cxx") @@ -9,8 +14,15 @@ def configure(conf): # conf.check_cfg(package='libnotifymm-1.0', args='--cflags --libs', uselib_store='LIBNOTIFYMM') def build(bld): - obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor /usr/lib/jvm/java-6-sun/include 
/usr/lib/jvm/java-6-sun/include/linux', linkflags=['-L/home/paul/src/hadoop-0.20.2-cdh3u0/c++/Linux-i386-32/lib', '-L/usr/lib/jvm/java-6-sun/jre/lib/i386/server', '-lhdfs'])
+  # obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor /usr/lib/jvm/java-6-sun/include /usr/lib/jvm/java-6-sun/include/linux', linkflags=['-L/home/paul/src/hadoop-0.20.2-cdh3u0/c++/Linux-i386-32/lib', '-L/usr/lib/jvm/java-6-sun/jre/lib/i386/server', '-lhdfs'])
+  obj = bld.new_task_gen("cxx", "shlib", "node_addon", includes='./src ./vendor', linkflags=['-lhdfs'])
   obj.cxxflags = ["-g", "-D_FILE_OFFSET_BITS=64", "-D_LARGEFILE_SOURCE", "-Wall"]
-  obj.target = "hdfs"
-  obj.source = "src/hdfs.cc"
+  obj.target = "hdfs_bindings"
+  obj.source = "src/hdfs_bindings.cc"
+
+def shutdown():
+  if Options.commands['clean']:
+    if exists('hdfs_bindings.node'): unlink('hdfs_bindings.node')
+  else:
+    if exists('build/default/hdfs_bindings.node') and not exists('hdfs_bindings.node'):
+      symlink('build/default/hdfs_bindings.node', 'hdfs_bindings.node')
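
Usage note: demo/demo.js only exercises the high-level wrapper, so below is a minimal sketch of the lower-level, handle-based flow that HDFSReader drives internally through the new bindings. It assumes the addon has been built and symlinked as ./hdfs_bindings.node (the run script does that) and that /tmp/test.txt already exists in HDFS; the 64-byte read size is an arbitrary value picked for illustration.

  var HDFSBindings = require('./hdfs_bindings');
  var hdfs = new HDFSBindings.Hdfs();

  var O_RDONLY = 0x0000; // same mode table as node-hdfs.js

  // "default" with port 0 makes libhdfs pick up the namenode from core-site.xml on the CLASSPATH
  hdfs.connect("default", 0);

  hdfs.open("/tmp/test.txt", O_RDONLY, function(err, handle) {
    if(err) return console.log("open failed: " + err);

    // read 64 bytes starting at offset 0; the callback receives a Buffer
    hdfs.read(handle, 0, 64, function(data) {
      console.log("first chunk: " + data.toString());
      hdfs.close(handle, function() {
        console.log("closed");
      });
    });
  });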