Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 335 lines (237 sloc) 7.642 kb
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
1 #include <node.h>
2 #include <node_buffer.h>
0c7bf81 @ry Abstract out HandleWrap class
ry authored
3 #include <handle_wrap.h>
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
4 #include <stream_wrap.h>
5 #include <req_wrap.h>
6
7
8 namespace node {
9
10
11 #define SLAB_SIZE (1024 * 1024)
12 #define MIN(a, b) ((a) < (b) ? (a) : (b))
13
14
15 using v8::Object;
16 using v8::Handle;
17 using v8::Local;
18 using v8::Persistent;
19 using v8::Value;
20 using v8::HandleScope;
21 using v8::FunctionTemplate;
22 using v8::String;
23 using v8::Function;
24 using v8::TryCatch;
25 using v8::Context;
26 using v8::Arguments;
27 using v8::Integer;
28
29
30 #define UNWRAP \
31 assert(!args.Holder().IsEmpty()); \
32 assert(args.Holder()->InternalFieldCount() > 0); \
33 StreamWrap* wrap = \
34 static_cast<StreamWrap*>(args.Holder()->GetPointerFromInternalField(0)); \
35 if (!wrap) { \
36 SetErrno(UV_EBADF); \
37 return scope.Close(Integer::New(-1)); \
38 }
39
40
41 typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
42 typedef class ReqWrap<uv_write_t> WriteWrap;
43
44
45 static size_t slab_used;
46 static uv_stream_t* handle_that_last_alloced;
0c7bf81 @ry Abstract out HandleWrap class
ry authored
47 static Persistent<String> slab_sym;
48 static Persistent<String> buffer_sym;
49 static Persistent<String> write_queue_size_sym;
50 static bool initialized;
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
51
52
53 void StreamWrap::Initialize(Handle<Object> target) {
54 if (initialized) {
55 return;
56 } else {
57 initialized = true;
58 }
59
60 HandleScope scope;
61
0c7bf81 @ry Abstract out HandleWrap class
ry authored
62 HandleWrap::Initialize(target);
63
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
64 slab_sym = Persistent<String>::New(String::NewSymbol("slab"));
65 buffer_sym = Persistent<String>::New(String::NewSymbol("buffer"));
66 write_queue_size_sym =
67 Persistent<String>::New(String::NewSymbol("writeQueueSize"));
68 }
69
70
0c7bf81 @ry Abstract out HandleWrap class
ry authored
71 StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
72 : HandleWrap(object, (uv_handle_t*)stream) {
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
73 stream_ = stream;
187fe27 stdio binding + javascript to enable process.stdin.listen()
Igor Zinkovsky authored
74 if (stream) {
75 stream->data = this;
76 }
77 }
78
79
80 void StreamWrap::SetHandle(uv_handle_t* h) {
81 HandleWrap::SetHandle(h);
82 stream_ = (uv_stream_t*)h;
83 stream_->data = this;
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
84 }
85
86
87 void StreamWrap::UpdateWriteQueueSize() {
6050af4 @ry net_uv: properly initialize writeQueueSize
ry authored
88 HandleScope scope;
1ae1489 @ry Abstract StreamWrap from TCPWrap
ry authored
89 object_->Set(write_queue_size_sym, Integer::New(stream_->write_queue_size));
90 }
91
92
93 Handle<Value> StreamWrap::ReadStart(const Arguments& args) {
94 HandleScope scope;
95
96 UNWRAP
97
98 int r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
99
100 // Error starting the tcp.
101 if (r) SetErrno(uv_last_error().code);
102
103 return scope.Close(Integer::New(r));
104 }
105
106
107 Handle<Value> StreamWrap::ReadStop(const Arguments& args) {
108 HandleScope scope;
109
110 UNWRAP
111
112 int r = uv_read_stop(wrap->stream_);
113
114 // Error starting the tcp.
115 if (r) SetErrno(uv_last_error().code);
116
117 return scope.Close(Integer::New(r));
118 }
119
120
121 inline char* StreamWrap::NewSlab(Handle<Object> global,
122 Handle<Object> wrap_obj) {
123 Buffer* b = Buffer::New(SLAB_SIZE);
124 global->SetHiddenValue(slab_sym, b->handle_);
125 assert(Buffer::Length(b) == SLAB_SIZE);
126 slab_used = 0;
127 wrap_obj->SetHiddenValue(slab_sym, b->handle_);
128 return Buffer::Data(b);
129 }
130
131
132 uv_buf_t StreamWrap::OnAlloc(uv_stream_t* handle, size_t suggested_size) {
133 HandleScope scope;
134
135 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
136 assert(wrap->stream_ == handle);
137
138 char* slab = NULL;
139
140 Handle<Object> global = Context::GetCurrent()->Global();
141 Local<Value> slab_v = global->GetHiddenValue(slab_sym);
142
143 if (slab_v.IsEmpty()) {
144 // No slab currently. Create a new one.
145 slab = NewSlab(global, wrap->object_);
146 } else {
147 // Use existing slab.
148 Local<Object> slab_obj = slab_v->ToObject();
149 slab = Buffer::Data(slab_obj);
150 assert(Buffer::Length(slab_obj) == SLAB_SIZE);
151 assert(SLAB_SIZE >= slab_used);
152
153 // If less than 64kb is remaining on the slab allocate a new one.
154 if (SLAB_SIZE - slab_used < 64 * 1024) {
155 slab = NewSlab(global, wrap->object_);
156 } else {
157 wrap->object_->SetHiddenValue(slab_sym, slab_obj);
158 }
159 }
160
161 uv_buf_t buf;
162 buf.base = slab + slab_used;
163 buf.len = MIN(SLAB_SIZE - slab_used, suggested_size);
164
165 wrap->slab_offset_ = slab_used;
166 slab_used += buf.len;
167
168 handle_that_last_alloced = handle;
169
170 return buf;
171 }
172
173 void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
174 HandleScope scope;
175
176 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
177
178 // We should not be getting this callback if someone as already called
179 // uv_close() on the handle.
180 assert(wrap->object_.IsEmpty() == false);
181
182 // Remove the reference to the slab to avoid memory leaks;
183 Local<Value> slab_v = wrap->object_->GetHiddenValue(slab_sym);
184 wrap->object_->SetHiddenValue(slab_sym, v8::Null());
185
186 if (nread < 0) {
187 // EOF or Error
188 if (handle_that_last_alloced == handle) {
189 slab_used -= buf.len;
190 }
191
192 SetErrno(uv_last_error().code);
193 MakeCallback(wrap->object_, "onread", 0, NULL);
194 return;
195 }
196
197 assert(nread <= buf.len);
198
199 if (handle_that_last_alloced == handle) {
200 slab_used -= (buf.len - nread);
201 }
202
203 if (nread > 0) {
204 Local<Value> argv[3] = {
205 slab_v,
206 Integer::New(wrap->slab_offset_),
207 Integer::New(nread)
208 };
209 MakeCallback(wrap->object_, "onread", 3, argv);
210 }
211 }
212
213
214 Handle<Value> StreamWrap::Write(const Arguments& args) {
215 HandleScope scope;
216
217 UNWRAP
218
219 // The first argument is a buffer.
220 assert(Buffer::HasInstance(args[0]));
221 Local<Object> buffer_obj = args[0]->ToObject();
222
223 size_t offset = 0;
224 size_t length = Buffer::Length(buffer_obj);
225
226 if (args.Length() > 1) {
227 offset = args[1]->IntegerValue();
228 }
229
230 if (args.Length() > 2) {
231 length = args[2]->IntegerValue();
232 }
233
234 WriteWrap* req_wrap = new WriteWrap();
235
236 req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
237
238 uv_buf_t buf;
239 buf.base = Buffer::Data(buffer_obj) + offset;
240 buf.len = length;
241
242 int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
243
244 req_wrap->Dispatched();
245
246 wrap->UpdateWriteQueueSize();
247
248 if (r) {
249 SetErrno(uv_last_error().code);
250 delete req_wrap;
251 return scope.Close(v8::Null());
252 } else {
253 return scope.Close(req_wrap->object_);
254 }
255 }
256
257
258 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
259 WriteWrap* req_wrap = (WriteWrap*) req->data;
260 StreamWrap* wrap = (StreamWrap*) req->handle->data;
261
262 HandleScope scope;
263
264 // The wrap and request objects should still be there.
265 assert(req_wrap->object_.IsEmpty() == false);
266 assert(wrap->object_.IsEmpty() == false);
267
268 if (status) {
269 SetErrno(uv_last_error().code);
270 }
271
272 wrap->UpdateWriteQueueSize();
273
274 Local<Value> argv[4] = {
275 Integer::New(status),
276 Local<Value>::New(wrap->object_),
277 Local<Value>::New(req_wrap->object_),
278 req_wrap->object_->GetHiddenValue(buffer_sym),
279 };
280
281 MakeCallback(req_wrap->object_, "oncomplete", 4, argv);
282
283 delete req_wrap;
284 }
285
286
287 Handle<Value> StreamWrap::Shutdown(const Arguments& args) {
288 HandleScope scope;
289
290 UNWRAP
291
292 ShutdownWrap* req_wrap = new ShutdownWrap();
293
294 int r = uv_shutdown(&req_wrap->req_, wrap->stream_, AfterShutdown);
295
296 req_wrap->Dispatched();
297
298 if (r) {
299 SetErrno(uv_last_error().code);
300 delete req_wrap;
301 return scope.Close(v8::Null());
302 } else {
303 return scope.Close(req_wrap->object_);
304 }
305 }
306
307
308 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
309 ReqWrap<uv_shutdown_t>* req_wrap = (ReqWrap<uv_shutdown_t>*) req->data;
310 StreamWrap* wrap = (StreamWrap*) req->handle->data;
311
312 // The wrap and request objects should still be there.
313 assert(req_wrap->object_.IsEmpty() == false);
314 assert(wrap->object_.IsEmpty() == false);
315
316 HandleScope scope;
317
318 if (status) {
319 SetErrno(uv_last_error().code);
320 }
321
322 Local<Value> argv[3] = {
323 Integer::New(status),
324 Local<Value>::New(wrap->object_),
325 Local<Value>::New(req_wrap->object_)
326 };
327
328 MakeCallback(req_wrap->object_, "oncomplete", 3, argv);
329
330 delete req_wrap;
331 }
332
333
334 }
Something went wrong with that request. Please try again.