Skip to content
This repository

Changed the stream closing API to return a promise for the total length. #263

Closed
wants to merge 3 commits into from

2 participants

Mark S. Miller Kris Kowal
Mark S. Miller

The old closing protocol had a race condition. The stream appeared to be closed sometime after close was called, whether or not all the pre-close elements had already been gotten. This changes the indication of closedness to be a promise for the overall length of the queue. This enables clients to do some flow control, bounding the extent of their read-ahead, if they wish.

Kris Kowal
Owner

We are still working on this one. Not ready for merge.

Kris Kowal
Owner

Fixes #261

Kris Kowal
Owner

This has been obsoleted by #337. The central notion is that close and closed will no longer be necessary. Instead Queue will merely be an asynchronous transport for iterations as in ES6 generators. The iterations will carry information like {value} or {value, done: true}, where the latter implies that the stream has closed.

Kris Kowal kriskowal closed this July 02, 2013
Domenic Denicola domenic referenced this pull request July 09, 2013
Closed

Race condition in Queue #261

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
27  queue.js
@@ -4,7 +4,9 @@ var Q = require("./q");
4 4
 module.exports = Queue;
5 5
 function Queue() {
6 6
     var ends = Q.defer();
7  
-    var closed = Q.defer();
  7
+    var numPut = 0;
  8
+    var numGotten = 0;
  9
+    var total = Q.defer();
8 10
     return {
9 11
         put: function (value) {
10 12
             var next = Q.defer();
@@ -13,23 +15,22 @@ function Queue() {
13 15
                 tail: next.promise
14 16
             });
15 17
             ends.resolve = next.resolve;
  18
+            numPut++;
16 19
         },
17 20
         get: function () {
18 21
             var result = ends.promise.get("head");
19 22
             ends.promise = ends.promise.get("tail");
20  
-            return result.fail(function (error) {
21  
-                closed.resolve(error);
22  
-                throw error;
23  
-            });
  23
+            numGotten++;
  24
+            return result;
  25
+        },
  26
+        close: function (reason) {
  27
+            ends.reject(reason);
  28
+            total.resolve(numPut);
24 29
         },
25  
-        closed: closed.promise,
26  
-        close: function (error) {
27  
-            error = error || new Error("Can't get value from closed queue");
28  
-            var end = {head: Q.reject(error)};
29  
-            end.tail = end;
30  
-            ends.resolve(end);
31  
-            return closed.promise;
  30
+        getLength: function() {
  31
+            return total.promise.then(function(tot) {
  32
+                return tot - numGotten;
  33
+            });
32 34
         }
33 35
     };
34 36
 }
35  
-
40  spec/queue-spec.js
@@ -116,7 +116,7 @@ describe("queue", function () {
116 116
             })
117 117
         })
118 118
         .then(function () {
119  
-            queue.close();
  119
+            queue.close(new Error("foo"));
120 120
         })
121 121
         .done();
122 122
 
@@ -127,6 +127,11 @@ describe("queue", function () {
127 127
             });
128 128
         })
129 129
         .then(function () {
  130
+            return queue.getLength().then(function(len) {
  131
+                expect(len).toBe(1);
  132
+            });
  133
+        })
  134
+        .then(function () {
130 135
             return queue.get()
131 136
             .then(function (value) {
132 137
                 expect(value).toBe(2);
@@ -142,39 +147,16 @@ describe("queue", function () {
142 147
             expect(error.message).toBe("Can't get value from closed queue");
143 148
             return queue.get();
144 149
         })
145  
-        .catch(function (error) {
146  
-            expect(error.message).toBe("Can't get value from closed queue");
147  
-        })
148 150
         .then(function () {
149  
-            return queue.closed;
150  
-        })
151  
-        .then(function (error) {
152  
-            expect(error.message).toBe("Can't get value from closed queue");
153  
-        })
154  
-    });
155  
-
156  
-    it("should close with alternate error", function () {
157  
-
158  
-        var queue = Queue();
159  
-        queue.close(new Error("Alternate reason"));
160  
-
161  
-        return Q.try(function () {
162  
-            return queue.get();
163  
-        })
164  
-        .catch(function (error) {
165  
-            expect(error.message).toBe("Alternate reason");
166  
-            return queue.get();
  151
+            return queue.getLength().then(function(len) {
  152
+                expect(len).toBe(0);
  153
+            });
167 154
         })
168 155
         .catch(function (error) {
169  
-            expect(error.message).toBe("Alternate reason");
170  
-        })
171  
-        .then(function () {
172  
-            return queue.closed;
  156
+            expect(error.message).toBe("Can't get value from closed queue");
173 157
         })
174 158
         .then(function (error) {
175  
-            expect(error.message).toBe("Alternate reason");
  159
+            expect(error.message).toBe("Can't get value from closed queue");
176 160
         })
177 161
     });
178  
-
179 162
 });
180  
-
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.