Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datachannel fix for sending arrayBuffers #170

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ language: node_js
node_js:
- "0.10"
- "0.11"
- "0.12"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was pulled in from another patch, so you can revert it here.


before_install:
- sudo apt-get update -qq
Expand Down
5 changes: 5 additions & 0 deletions lib/datachannel.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ function RTCDataChannel(internalDC) {
return this.RTCDataStates[state];
}
},
'bufferedAmount': {
get: function getReadyState() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getReadyState is the wrong name for this function

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was pulled in from another patch, so you can revert the implementation for bufferedAmount. If you update your local copy it will be there.

return internalDC.bufferedAmount;
}
},
'binaryType': {
get: function getBinaryType() {
var type = internalDC.binaryType;
Expand Down
190 changes: 131 additions & 59 deletions src/datachannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,57 +156,77 @@ void DataChannel::Run(uv_async_t* handle, int status)
uv_mutex_unlock(&self->lock);

TRACE_U("evt.type", evt.type);
if(DataChannel::ERROR & evt.type)
{
DataChannel::ErrorEvent* data = static_cast<DataChannel::ErrorEvent*>(evt.data);
v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onerror")));
v8::Local<v8::Value> argv[1];
argv[0] = v8::Exception::Error(NanNew(data->msg));
NanMakeCallback(dc, callback, 1, argv);
} else if(DataChannel::STATE & evt.type)
{
StateEvent* data = static_cast<StateEvent*>(evt.data);
v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onstatechange")));
v8::Local<v8::Value> argv[1];
v8::Local<v8::Integer> state = NanNew<v8::Integer>((data->state));
argv[0] = state;
NanMakeCallback(dc, callback, 1, argv);

if(webrtc::DataChannelInterface::kClosed == self->_jingleDataChannel->state()) {
do_shutdown = true;
}
} else if(DataChannel::MESSAGE & evt.type)
{
MessageEvent* data = static_cast<MessageEvent*>(evt.data);
v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onmessage")));

v8::Local<v8::Value> argv[1];

if(data->binary) {

switch (evt.type) {
case DataChannel::ERROR:
{
DataChannel::ErrorEvent* data = static_cast<DataChannel::ErrorEvent*>(evt.data);
v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onerror")));

v8::Local<v8::Value> argv[1];

argv[0] = v8::Exception::Error(NanNew(data->msg));

NanMakeCallback(dc, callback, 1, argv);
}

break;
case DataChannel::STATE:
{
StateEvent* data = static_cast<StateEvent*>(evt.data);

v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onstatechange")));
v8::Local<v8::Value> argv[1];
v8::Local<v8::Integer> state = NanNew<v8::Integer>((data->state));

argv[0] = state;
NanMakeCallback(dc, callback, 1, argv);

if(webrtc::DataChannelInterface::kClosed == self->_jingleDataChannel->state()) {
do_shutdown = true;
}
}

break;
case DataChannel::MESSAGE:
{
MessageEvent* data = static_cast<MessageEvent*>(evt.data);
v8::Local<v8::Function> callback = v8::Local<v8::Function>::Cast(dc->Get(NanNew("onmessage")));

v8::Local<v8::Value> argv[1];

if (data->binary) {
#if NODE_MODULE_VERSION > 0x000B
v8::Local<v8::ArrayBuffer> array = v8::ArrayBuffer::New(
v8::Isolate::GetCurrent(), data->message, data->size);
v8::Local<v8::ArrayBuffer> array = v8::ArrayBuffer::New(
v8::Isolate::GetCurrent(), data->message, data->size);
#else
v8::Local<v8::Object> array = NanNew(ArrayBufferConstructor)->NewInstance();
array->SetIndexedPropertiesToExternalArrayData(
data->message, v8::kExternalByteArray, data->size);
array->ForceSet(NanNew("byteLength"), NanNew<v8::Integer>(data->size));
#endif
NanMakeWeakPersistent(array, data, &MessageWeakCallback);

argv[0] = array;
NanMakeCallback(dc, callback, 1, argv);
} else {
v8::Local<v8::String> str = NanNew(data->message, data->size);

// cleanup message event
delete[] data->message;
data->message = NULL;
delete data;

argv[0] = str;
NanMakeCallback(dc, callback, 1, argv);
}
v8::Local<v8::Object> array = NanNew(ArrayBufferConstructor)->NewInstance();
array->SetIndexedPropertiesToExternalArrayData(
data->message, v8::kExternalByteArray, data->size);
array->ForceSet(NanNew("byteLength"), NanNew<v8::Integer>(data->size));
#endif
NanMakeWeakPersistent(array, data, &MessageWeakCallback);

argv[0] = array;
NanMakeCallback(dc, callback, 1, argv);
} else {
v8::Local<v8::String> str = NanNew(data->message, data->size);

// cleanup message event
delete[] data->message;
data->message = NULL;
delete data;

argv[0] = str;
NanMakeCallback(dc, callback, 1, argv);
}
}

break;
default:
NanThrowTypeError("Unknown WEBRTC Datachannel Event");

break;
}
}

Expand Down Expand Up @@ -239,21 +259,60 @@ NAN_METHOD(DataChannel::Send) {

DataChannel* self = ObjectWrap::Unwrap<DataChannel>( args.This() );

bool isBinary = false;
rtc::Buffer buffer;

if(args[0]->IsString()) {
v8::Local<v8::String> str = v8::Local<v8::String>::Cast(args[0]);
std::string data = *v8::String::Utf8Value(str);

webrtc::DataBuffer buffer(data);
self->_jingleDataChannel->Send(buffer);
const char *data = *v8::String::Utf8Value(str);

buffer = rtc::Buffer(data, str->Length());
} else {
v8::Local<v8::Object> arraybuffer = v8::Local<v8::Object>::Cast(args[0]);
void* data = arraybuffer->GetIndexedPropertiesExternalArrayData();
uint32_t data_len = arraybuffer->GetIndexedPropertiesExternalArrayDataLength();
isBinary = true;

#if NODE_MINOR_VERSION >= 12
if (args[0]->IsArrayBuffer() || args[0]->IsTypedArray()) {
v8::Local<v8::ArrayBuffer> arraybuffer;

if (args[0]->IsArrayBuffer()) {
arraybuffer = v8::Local<v8::ArrayBuffer>::Cast(args[0]);
} else {
v8::Local<v8::ArrayBufferView> view = v8::Local<v8::ArrayBufferView>::Cast(args[0]);
arraybuffer = view->Buffer();
}

if (!arraybuffer.IsEmpty()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should allow sending an empty buffer, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will send empty buffers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is a test on the handle. Ignore.

v8::ArrayBuffer::Contents content = arraybuffer->Externalize();

rtc::Buffer buffer(data, data_len);
webrtc::DataBuffer data_buffer(buffer, true);
self->_jingleDataChannel->Send(data_buffer);
buffer = rtc::Buffer(content.Data(), content.ByteLength());

arraybuffer->Neuter();
delete[] static_cast<char*>(content.Data());
} else {
NanThrowTypeError("Invalid argument");
}
} else {
NanThrowTypeError("Invalid argument");
}
#else
if (args[0]->IsObject()) {
v8::Local<v8::Object> arraybuffer = v8::Local<v8::Object>::Cast(args[0]);
void* data = arraybuffer->GetIndexedPropertiesExternalArrayData();
int data_len = arraybuffer->GetIndexedPropertiesExternalArrayDataLength();

if (data_len >= 0) {
buffer = rtc::Buffer(data, data_len);
} else {
NanThrowTypeError("Invalid object argument. WRTC only supports ArrayBufferView");
}
} else {
NanThrowTypeError("Invalid argument");
}
#endif
}

webrtc::DataBuffer data_buffer(buffer, isBinary);
self->_jingleDataChannel->Send(data_buffer);

TRACE_END;
NanReturnUndefined();
Expand Down Expand Up @@ -316,6 +375,18 @@ NAN_GETTER(DataChannel::GetBinaryType) {
NanReturnValue(NanNew<v8::Number>(static_cast<uint32_t>(self->_binaryType)));
}

NAN_GETTER(DataChannel::BufferedAmount) {
TRACE_CALL;
NanScope();

DataChannel* self = node::ObjectWrap::Unwrap<DataChannel>( args.Holder() );

uint64_t bufferSize = self->_jingleDataChannel->buffered_amount();

TRACE_END;
NanReturnValue(NanNew<v8::Number>(static_cast<uint32_t>(bufferSize)));
}

NAN_SETTER(DataChannel::SetBinaryType) {
TRACE_CALL;

Expand Down Expand Up @@ -344,6 +415,7 @@ void DataChannel::Init( v8::Handle<v8::Object> exports ) {
tpl->InstanceTemplate()->SetAccessor(NanNew("label"), GetLabel, ReadOnly);
tpl->InstanceTemplate()->SetAccessor(NanNew("binaryType"), GetBinaryType, SetBinaryType);
tpl->InstanceTemplate()->SetAccessor(NanNew("readyState"), GetReadyState, ReadOnly);
tpl->InstanceTemplate()->SetAccessor(NanNew("bufferedAmount"), BufferedAmount, ReadOnly);

NanAssignPersistent(constructor, tpl->GetFunction());
exports->Set( NanNew("DataChannel"), tpl->GetFunction() );
Expand Down
7 changes: 4 additions & 3 deletions src/datachannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class DataChannel
};

enum AsyncEventType {
MESSAGE = 0x1 << 0, // 1
ERROR = 0x1 << 1, // 2
STATE = 0x1 << 2, // 4
MESSAGE = 1,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were these changed?

ERROR,
STATE,
};

enum BinaryType {
Expand Down Expand Up @@ -95,6 +95,7 @@ class DataChannel
static NAN_GETTER(GetBinaryType);
static NAN_GETTER(GetReadyState);
static NAN_SETTER(SetBinaryType);
static NAN_GETTER(BufferedAmount);
static NAN_SETTER(ReadOnly);

void QueueEvent(DataChannel::AsyncEventType type, void* data);
Expand Down
Loading