Skip to content

Commit

Permalink
Merge main_loop_callback into event_buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Stéphan Kochen committed Mar 3, 2015
1 parent 6a7a05f commit 1451287
Showing 1 changed file with 58 additions and 142 deletions.
200 changes: 58 additions & 142 deletions include/p1stream.h
Expand Up @@ -93,29 +93,9 @@ class threaded_loop : public lockable_mutex {
bool wait(uint64_t timeout);
};

// Wrap uv_async with std::function.
class main_loop_callback {
private:
uv_async_t async;
std::function<void ()> fn;

static void async_cb(uv_async_t* handle);

public:
void init(std::function<void ()> fn_);
void destroy();

void send();
};


// ----- Event handling -----

struct event;
class event_buffer;
struct event_buffer_copy;
class buffer_slicer;

// Threads send events to the main thread, which can contain data or log
// messages. Events are queued in a fixed size buffer, then copy'd out by
// the main thread to be emitted one by one.
Expand All @@ -132,78 +112,73 @@ struct event {
inline uint32_t pad();
// Total size of struct + data + padding.
inline uint32_t total_size();
// Return a pointer past this event, accounting for 32-bit alignment.
event *next();
};

// Helper that is used to create buffer slices.
class buffer_slicer {
private:
v8::Isolate *isolate_;
v8::Local<v8::Object> buffer_;
v8::Local<v8::Value> buffer_proto_;
v8::Local<v8::String> length_sym_;
v8::Local<v8::String> parent_sym_;

public:
buffer_slicer(v8::Local<v8::Object> buffer);

// Returns a new buffer object for the given data and length, which should
// be part of the original buffer memory area.
v8::Local<v8::Object> slice(char *data, uint32_t length);
};

// Signature for function that creates a JS object from an event.
typedef v8::Local<v8::Value> (*event_transform)(v8::Isolate *isolate, event &ev, buffer_slicer &slicer);

// Event buffer containing consecutive events.
//
// All access to instances of this class should be within a lock. After
// calling one of the emit*() methods, the user should somehow signal the
// main thread to do a copy_out()
class event_buffer final {
private:
uv_async_t async_;

int size_;
int used_;
int stalled_;
char *data_;

lockable *lock_;
event_transform transform_;
v8::Isolate *isolate_;
v8::Persistent<v8::Context> context_;
v8::Persistent<v8::Function> callback_;

void signal_main_thread();
static void async_cb(uv_async_t *handle);

public:
event_buffer(int size = 65536);
event_buffer(
lockable *lock = nullptr, event_transform transform = nullptr,
int size = 4096 // 4 KiB default event buffer
);
~event_buffer();

// The following emit methods all return NULL if the buffer stalled.
// Set the callback to call for events.
void set_callback(v8::Handle<v8::Context> context, v8::Handle<v8::Function> callback);
// Flush buffered events to the callback. Usually happens automatically,
// but useful to call before destruction.
void flush();

// Emit an empty event.
event *emit(uint32_t id);
// The following emit methods all return nullptr if the buffer stalled.
// They should be called with the lock held.

// Emit an event with optional data. Caller must fill the event data.
event *emit(uint32_t id, int size = 0);
// Emit an event with a formatted string.
event *emitv(uint32_t id, const char *format, va_list ap);
event *emitf(uint32_t id, const char *format, ...)
__attribute__((format(printf, 3, 4)));
// Emit an event with raw data. Caller must fill the returned event data.
event *emitd(uint32_t id, int size);

// Create a copy of the buffer data and clear the buffer itself.
event_buffer_copy *copy_out();
};

// Contains a copy of an event_buffer.
struct event_buffer_copy {
int size;
int stalled;
char data[0];

// Call a JS function for each event in the buffer copy. Call this after
// releasing the lock that was holding the original event_buffer.
//
// The transform function should create a JS value for the given event
// data. The function is not called for standard events defined below.
//
// Calling this method transfers ownership of the entire struct, as it will
// be tied to a Buffer object.
void callback(
v8::Isolate *isolate, v8::Handle<v8::Object> recv, v8::Handle<v8::Function> fn,
v8::Local<v8::Value> (*transform)(v8::Isolate *isolate, event &ev, buffer_slicer &slicer)
);
};

// Helper that creates buffer slices. The transform function from
// event_buffer_copy::callback receives a stack allocated instance of this as
// a parameter.
class buffer_slicer {
private:
v8::Isolate *isolate_;
v8::Local<v8::Object> buffer_;
v8::Local<v8::Value> buffer_proto_;
v8::Local<v8::String> length_sym_;
v8::Local<v8::String> parent_sym_;

public:
buffer_slicer(v8::Local<v8::Object> buffer);

// Returns a new buffer object for the given data and length, which should
// be part of the original buffer memory area.
v8::Local<v8::Object> slice(char *data, uint32_t length);
};

// Standard event IDs.
Expand Down Expand Up @@ -448,26 +423,6 @@ inline bool threaded_loop::wait(uint64_t timeout)
return uv_cond_timedwait(&cond, &mutex, timeout) == 0;
}

inline void main_loop_callback::init(std::function<void ()> fn_)
{
fn = fn_;
auto loop = uv_default_loop();
if (uv_async_init(loop, &async, async_cb))
abort();
}

inline void main_loop_callback::destroy()
{
uv_close((uv_handle_t *) &async, NULL);
async.loop = NULL;
}

inline void main_loop_callback::send()
{
if (uv_async_send(&async))
abort();
}

inline video_mixer::video_mixer()
{
}
Expand Down Expand Up @@ -555,35 +510,28 @@ inline uint32_t event::total_size()
return sizeof(event) + size + pad();
}

inline event *event::next()
{
return (event *) (data + size + pad());
}

inline event_buffer::event_buffer(int size)
: size_(size), used_(0), stalled_(0)
{
data_ = new char[size_];
}

inline event_buffer::~event_buffer()
inline void event_buffer::signal_main_thread()
{
delete[] data_;
if (used_ == 0 && stalled_ == 0) {
if (uv_async_send(&async_))
abort();
}
}

inline event *event_buffer::emit(uint32_t id)
inline event *event_buffer::emit(uint32_t id, int size)
{
const int data_space = size_ - used_ - sizeof(event);
event * const ev = (event *) (data_ + used_);
signal_main_thread();

if (data_space < 0) {
if (size > data_space) {
stalled_++;
return NULL;
return nullptr;
}
else {
ev->id = id;
ev->size = 0;
used_ += sizeof(event);
used_ += ev->total_size();
return ev;
}
}
Expand All @@ -592,14 +540,15 @@ inline event *event_buffer::emitv(uint32_t id, const char *format, va_list ap)
{
const int data_space = size_ - used_ - sizeof(event);
event * const ev = (event *) (data_ + used_);
signal_main_thread();

int size = vsnprintf(ev->data, data_space, format, ap);
if (size < 0) {
return NULL;
return nullptr;
}
else if (size > data_space - 1) {
stalled_++;
return NULL;
return nullptr;
}
else {
ev->id = id;
Expand All @@ -618,39 +567,6 @@ inline event *event_buffer::emitf(uint32_t id, const char *format, ...)
return ev;
}

inline event *event_buffer::emitd(uint32_t id, int size)
{
const int data_space = size_ - used_ - sizeof(event);
event * const ev = (event *) (data_ + used_);

if (size > data_space) {
stalled_++;
return NULL;
}
else {
ev->id = id;
ev->size = size;
used_ += ev->total_size();
return ev;
}
}

inline event_buffer_copy *event_buffer::copy_out()
{
if (used_ == 0)
return NULL;

auto *ret = (event_buffer_copy *) new char[sizeof(event_buffer_copy) + used_];
memcpy(ret->data, data_, used_);
ret->size = used_;
ret->stalled = stalled_;

used_ = 0;
stalled_ = 0;

return ret;
}


} // namespace p1stream

Expand Down

0 comments on commit 1451287

Please sign in to comment.