Skip to content

Commit

Permalink
add non-blocking async read method
Browse files Browse the repository at this point in the history
  • Loading branch information
Dane Springmeyer committed Jan 26, 2011
1 parent 7d39e5e commit 3770012
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 10 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name" : "zipfile",
"version" : "0.1.0",
"version" : "0.1.1",
"main" : "./lib/index.js",
"description" : "C++ library for handling zipfiles in node",
"keywords" : ["zipfile", "uncompress", "unzip", "zlib"],
Expand Down
174 changes: 167 additions & 7 deletions src/node_zipfile.cpp
Expand Up @@ -3,6 +3,7 @@
// stl
#include <sstream>
#include <vector>
//#include <iostream>
#include <cstring>

#include <node_buffer.h>
Expand All @@ -23,6 +24,7 @@ void ZipFile::Initialize(Handle<Object> target) {

// functions
NODE_SET_PROTOTYPE_METHOD(constructor, "readFileSync", readFileSync);
NODE_SET_PROTOTYPE_METHOD(constructor, "readFile", readFile);

// properties
constructor->InstanceTemplate()->SetAccessor(String::NewSymbol("count"), get_prop);
Expand Down Expand Up @@ -58,9 +60,9 @@ Handle<Value> ZipFile::New(const Arguments& args)
if ((za=zip_open(input_file.c_str(), 0, &err)) == NULL) {
zip_error_to_str(errstr, sizeof(errstr), err, errno);
std::stringstream s;
s << "cannot open file" << input_file << " error: " << errstr << "\n";
s << "cannot open file: " << input_file << " error: " << errstr << "\n";
return ThrowException(Exception::Error(
String::New(s.str().c_str())));
String::New(s.str().c_str())));
}

ZipFile* zf = new ZipFile(input_file);
Expand Down Expand Up @@ -102,7 +104,7 @@ Handle<Value> ZipFile::readFileSync(const Arguments& args)
return ThrowException(Exception::TypeError(
String::New("first argument must be a file name inside the zip")));

std::string input_file = TOSTR(args[0]);
std::string name = TOSTR(args[0]);

// TODO - enforce valid index
ZipFile* zf = ObjectWrap::Unwrap<ZipFile>(args.This());
Expand All @@ -115,21 +117,21 @@ Handle<Value> ZipFile::readFileSync(const Arguments& args)
for (i=0; i<num; i++) {
struct zip_stat st;
zip_stat_index(zf->archive_, i, 0, &st);
if (st.name == input_file) {
if (st.name == name) {
idx = i;
}
}

if (idx == -1) {
std::stringstream s;
s << "No file found by the name of: '" << input_file << "\n";
s << "No file found by the name of: '" << name << "\n";
return ThrowException(Exception::Error(String::New(s.str().c_str())));
}

if ((zf_ptr=zip_fopen_index(zf->archive_, idx, 0)) == NULL) {
zip_fclose(zf_ptr);
std::stringstream s;
s << "cannot open file #" << idx << " in " << input_file << ": archive error: " << zip_strerror(zf->archive_) << "\n";
s << "cannot open file #" << idx << " in " << name << ": archive error: " << zip_strerror(zf->archive_) << "\n";
return ThrowException(Exception::Error(String::New(s.str().c_str())));
}

Expand All @@ -146,7 +148,7 @@ Handle<Value> ZipFile::readFileSync(const Arguments& args)
if (result < 0) {
zip_fclose(zf_ptr);
std::stringstream s;
s << "error reading file #" << idx << " in " << input_file << ": archive error: " << zip_file_strerror(zf_ptr) << "\n";
s << "error reading file #" << idx << " in " << name << ": archive error: " << zip_file_strerror(zf_ptr) << "\n";
return ThrowException(Exception::Error(String::New(s.str().c_str())));
}

Expand All @@ -160,3 +162,161 @@ Handle<Value> ZipFile::readFileSync(const Arguments& args)
zip_fclose(zf_ptr);
return scope.Close(retbuf->handle_);
}

typedef struct {
ZipFile* zf;
struct zip *za;
std::string name;
bool error;
std::string error_name;
std::vector<unsigned char> data;
//Handle<Function> fn;
Persistent<Function> cb;
} closure_t;


Handle<Value> ZipFile::readFile(const Arguments& args)
{
HandleScope scope;

if (args.Length() < 2)
return ThrowException(Exception::TypeError(
String::New("requires two arguments, the name of a file and a callback")));

// first arg must be name
if(!args[0]->IsString())
return ThrowException(Exception::TypeError(
String::New("first argument must be a file name inside the zip")));

// last arg must be function callback
if (!args[args.Length()-1]->IsFunction())
return ThrowException(Exception::TypeError(
String::New("last argument must be a callback function")));

std::string name = TOSTR(args[0]);

ZipFile* zf = ObjectWrap::Unwrap<ZipFile>(args.This());

closure_t *closure = new closure_t();


// libzip is not threadsafe so we cannot use the zf->archive_
// instead we open a new zip archive for each thread
struct zip *za;
int err;
char errstr[1024];
if ((za=zip_open(zf->file_name_.c_str() , 0, &err)) == NULL) {
zip_error_to_str(errstr, sizeof(errstr), err, errno);
std::stringstream s;
s << "cannot open file: " << zf->file_name_ << " error: " << errstr << "\n";
zip_close(za);
return ThrowException(Exception::Error(
String::New(s.str().c_str())));
}

closure->zf = zf;
closure->za = za;
closure->error = false;
closure->name = name;
closure->cb = Persistent<Function>::New(Handle<Function>::Cast(args[args.Length()-1]));
eio_custom(EIO_ReadFile, EIO_PRI_DEFAULT, EIO_AfterReadFile, closure);
ev_ref(EV_DEFAULT_UC);
zf->Ref();
return Undefined();
}


int ZipFile::EIO_ReadFile(eio_req *req)
{
closure_t *closure = static_cast<closure_t *>(req->data);

struct zip_file *zf_ptr=NULL;

int idx = -1;
int num = zip_get_num_files(closure->za);
int i = 0;
for (i=0; i<num; i++) {
struct zip_stat st;
zip_stat_index(closure->za, i, 0, &st);
if (st.name == closure->name) {
idx = i;
}
}

if (idx == -1) {
std::stringstream s;
s << "No file found by the name of: '" << closure->name << "\n";
closure->error = true;
closure->error_name = s.str();

} else {

if ((zf_ptr = zip_fopen_index(closure->za, idx, 0)) == NULL) {
std::stringstream s;
s << "cannot open file #" << idx << " in "
<< closure->name << ": archive error: " << zip_strerror(closure->za) << "\n";
closure->error = true;
closure->error_name = s.str();

} else {

struct zip_stat st;
zip_stat_index(closure->za, idx, 0, &st);
closure->data.clear();
closure->data.resize( st.size );

int result = 0;
result = (int)zip_fread( zf_ptr, reinterpret_cast<void*> (&closure->data[0]), closure->data.size() );

if (result < 0) {
std::stringstream s;
s << "error reading file #" << idx << " in "
<< closure->name << ": archive error: " << zip_file_strerror(zf_ptr) << "\n";
closure->error = true;
closure->error_name = s.str();

}
}
}

//zip_close(closure->za);
zip_fclose(zf_ptr);
return 0;
}

int ZipFile::EIO_AfterReadFile(eio_req *req)
{
HandleScope scope;

closure_t *closure = static_cast<closure_t *>(req->data);
ev_unref(EV_DEFAULT_UC);

TryCatch try_catch;

if (closure->error) {
Local<Value> argv[1] = { Exception::Error(String::New(closure->error_name.c_str())) };
//Local<Value> argv[2] = { Exception::Error(String::New(closure->error_name.c_str())),
// Local<Value>::New(Null()) };
closure->cb->Call(Context::GetCurrent()->Global(), 2, argv);
} else {
#if NODE_VERSION_AT_LEAST(0,3,0)
node::Buffer *retbuf = Buffer::New((char *)&closure->data[0],closure->data.size());
#else
node::Buffer *retbuf = Buffer::New(closure->data.size());
std::memcpy(retbuf->data(), (char *)&closure->data[0], closure->data.size());
#endif
Local<Value> argv[2] = { Local<Value>::New(Null()), Local<Value>::New(retbuf->handle_) };
closure->cb->Call(Context::GetCurrent()->Global(), 2, argv);
}

if (try_catch.HasCaught()) {
FatalException(try_catch);
//try_catch.ReThrow();
}

closure->zf->Unref();
closure->cb.Dispose();
delete closure;
return 0;
}

8 changes: 7 additions & 1 deletion src/node_zipfile.hpp
Expand Up @@ -26,13 +26,19 @@ class ZipFile: public node::ObjectWrap {
static Handle<Value> get_prop(Local<String> property,
const AccessorInfo& info);

// Sync
static Handle<Value> readFileSync(const Arguments& args);

// Async
static Handle<Value> readFile(const Arguments& args);
static int EIO_ReadFile(eio_req *req);
static int EIO_AfterReadFile(eio_req *req);

ZipFile(std::string const& file_name);

private:
~ZipFile();
std::string const& file_name_;
std::string const file_name_;
struct zip *archive_;
};

Expand Down
31 changes: 30 additions & 1 deletion test.js
Expand Up @@ -49,8 +49,10 @@ function mkdirP (p, mode, f) {
});
};

// test writing with Sync reading method
// and sync node writing functions
zf.names.forEach(function(name) {
var uncompressed = path.join('/tmp',name);
var uncompressed = path.join('/tmp/sync',name);
var dirname = path.dirname(uncompressed);
mkdirP(dirname, 0755 , function(err) {
if (err && err.errno != constants.EEXIST) throw err;
Expand All @@ -63,4 +65,31 @@ zf.names.forEach(function(name) {
})
})


// test writing with Async reading method
// and async node writing functions
zf.names.forEach(function(name) {
var uncompressed = path.join('/tmp/async',name);
var dirname = path.dirname(uncompressed);
mkdirP(dirname, 0755 , function(err) {
if (err && err.errno != constants.EEXIST) throw err;
if (path.extname(name)) {
zf.readFile(name, function(err, buffer) {
if (err) throw err;
fs.open(uncompressed,'w',0755, function(err,fd){
if (err) throw err;
fs.write(fd, buffer, 0, buffer.length, null, function(err,written) {
if (err) throw err;
// written is number of bytes written
assert.ok(written > 0);
fs.close(fd, function(err) {
if (err) throw err;
});
});
});
});
}
})
})

console.log('All tests pass...');

0 comments on commit 3770012

Please sign in to comment.