Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Add message framing

  • Loading branch information...
commit 9fa807680232ee69d894610ceafe542522336afe 1 parent 9798fd4
Erik Rigtorp authored November 15, 2010
2  Makefile
@@ -8,5 +8,5 @@ all: $(TARGETS)
8 8
 $(TARGETS): include/nmq.hpp
9 9
 
10 10
 clean:
11  
-	rm -f *~
  11
+	rm -f *~ core
12 12
 	rm -f $(TARGETS)
5  TODO
@@ -2,5 +2,6 @@
2 2
 * Monitoring tool
3 3
   * Monitor message throughput
4 4
   * Ping nodes to check if they are running
5  
-* Message framing, ie send message length
6  
-* Unit tests
  5
+* Unit tests
  6
+* Maybe allow zero length messages
  7
+* Magic and versioning of mmap file
52  include/nmq.hpp
@@ -45,7 +45,7 @@ class context_t {
45 45
     unsigned int nodes;
46 46
     unsigned int rings;
47 47
     unsigned int size;
48  
-    unsigned int msg_size;
  48
+    size_t msg_size;
49 49
   };
50 50
   
51 51
   typedef volatile unsigned int vo_uint; 
@@ -54,8 +54,8 @@ class context_t {
54 54
   struct ring {
55 55
     
56 56
     unsigned int _size;
57  
-    unsigned int _msg_size;
58  
-    unsigned int _offset;
  57
+    size_t _msg_size;
  58
+    size_t _offset;
59 59
     
60 60
     // R/W access by the reader
61 61
     // R/O access by the writer
@@ -97,7 +97,7 @@ class context_t {
97 97
     header_->nodes = nodes;
98 98
     header_->rings = n_rings;
99 99
     header_->size = real_size - 1;
100  
-    header_->msg_size = msg_size;
  100
+    header_->msg_size = msg_size + sizeof(size_t);
101 101
 
102 102
     for (unsigned int i = 0; i < header_->rings; i++) {
103 103
       ring_[i]._size = real_size - 1;
@@ -133,7 +133,7 @@ class context_t {
133 133
   }
134 134
   
135 135
   void print() {
136  
-    printf("nodes: %u, size: %u, msgsz: %u\n", header_->nodes, header_->size, header_->msg_size);
  136
+    printf("nodes: %u, size: %u, msgsz: %lu\n", header_->nodes, header_->size, header_->msg_size);
137 137
     for (unsigned int i = 0; i < header_->rings; i++) {
138 138
       printf("%3i: %10u %10u\n", i, ring_[i]._head, ring_[i]._tail);
139 139
     }
@@ -167,7 +167,7 @@ class context_t {
167 167
   }
168 168
 
169 169
   bool send(ring *ring, const void *msg, size_t size) {
170  
-    assert(size <= ring->_msg_size);
  170
+    assert(size <= (ring->_msg_size - sizeof(size_t)));
171 171
 
172 172
     unsigned int h = (ring->_head - 1) & ring->_size;
173 173
     unsigned int t = ring->_tail;
@@ -175,7 +175,8 @@ class context_t {
175 175
       return false;
176 176
 
177 177
     char *d = &data_[ring_->_offset + t*ring->_msg_size];
178  
-    memcpy(d, msg, size);
  178
+    memcpy(d, &size, sizeof(size));
  179
+    memcpy(d + sizeof(size), msg, size);
179 180
     
180 181
     // Barrier is needed to make sure that item is updated 
181 182
     // before it's made available to the reader
@@ -196,14 +197,19 @@ class context_t {
196 197
     return send(ring, msg, size);
197 198
   }
198 199
 
199  
-  bool recv(ring *ring, void *msg, size_t size) {
  200
+  bool recv(ring *ring, void *msg, size_t *size) {
200 201
     unsigned int t = ring->_tail;
201 202
     unsigned int h = ring->_head;
202 203
     if (h == t)
203 204
       return false;
  205
+    
  206
+    char *d = &data_[ring_->_offset + h*ring->_msg_size];
204 207
 
205  
-    void *d = &data_[ring_->_offset + h*ring->_msg_size];
206  
-    memcpy(msg, d, size);
  208
+    size_t recv_size;
  209
+    memcpy(&recv_size, d, sizeof(size_t));
  210
+    assert(recv_size >= *size && "buffer too small");
  211
+    *size = recv_size;
  212
+    memcpy(msg, d + sizeof(size_t), recv_size);
207 213
 
208 214
     // Barrier is needed to make sure that we finished reading the item
209 215
     // before moving the head
@@ -213,37 +219,35 @@ class context_t {
213 219
     return true;
214 220
   }
215 221
 
216  
-  bool recv(unsigned int from, unsigned int to, void *msg, size_t size) {
  222
+  bool recv(unsigned int from, unsigned int to, void *msg, size_t *size) {
217 223
     ring *ring = get_ring(from, to);
218 224
     while (!recv(ring, msg, size)) bones::cpu::__relax();
219 225
     return true;
220 226
   }
221 227
 
222  
-  bool recvnb(unsigned int from, unsigned int to, void *s, int size) {
  228
+  bool recvnb(unsigned int from, unsigned int to, void *s, size_t *size) {
223 229
     return recv(get_ring(from, to), s, size);
224 230
   }
225 231
   
226  
-  bool recv(unsigned int to, void *msg, size_t size) {
  232
+  bool recv(unsigned int to, void *msg, size_t *size) {
227 233
     // TODO "fair" receiving
228 234
     while (true) {
229 235
       for (unsigned int i = 0; i < header_->nodes; i++) {
230  
-        if (to != i && recvnb(i, to, msg, size) != -1) return true;
  236
+        if (to != i && recvnb(i, to, msg, size)) return true;
231 237
       }
232 238
       bones::cpu::__relax();
233 239
     }
234 240
     return false;
235 241
   }
236 242
 
237  
-  bool recvnb(unsigned int to, void *msg, size_t size) {
  243
+  ssize_t recvnb(unsigned int to, void *msg, size_t *size) {
238 244
     // TODO "fair" receiving
239 245
     for (unsigned int i = 0; i < header_->nodes; i++) {
240  
-      if (to != i && recvnb(i, to, msg, size) != -1) return true;
  246
+      if (to != i && recvnb(i, to, msg, size)) return true;
241 247
     }
242 248
     return false;
243 249
   }
244 250
 
245  
-
246  
-
247 251
   std::string fname_;
248 252
   void *p_;
249 253
   header *header_;
@@ -267,27 +271,27 @@ class node_t {
267 271
     //assert(node < context_->nodes());
268 272
   }
269 273
 
270  
-  bool send(unsigned int to, const void *msg, int size) {
  274
+  bool send(unsigned int to, const void *msg, size_t size) {
271 275
     return context_->send(node_, to, msg, size);
272 276
   }
273 277
 
274  
-  bool sendnb(unsigned int to, const void *msg, int size) {
  278
+  bool sendnb(unsigned int to, const void *msg, size_t size) {
275 279
     return context_->sendnb(node_, to, msg, size);
276 280
   }
277 281
 
278  
-  bool recv(unsigned int from, void *msg, int size) {
  282
+  bool recv(unsigned int from, void *msg, size_t *size) {
279 283
     return context_->recv(from, node_, msg, size);
280 284
   }
281 285
 
282  
-  bool recvnb(unsigned int from, void *msg, int size) {
  286
+  bool recvnb(unsigned int from, void *msg, size_t *size) {
283 287
     return context_->recvnb(from, node_, msg, size);
284 288
   }
285 289
 
286  
-  bool recv(void *msg, int size) {
  290
+  bool recv(void *msg, size_t *size) {
287 291
     return context_->recv(node_, msg, size);
288 292
   }
289 293
 
290  
-  bool recvnb(void *msg, int size) {
  294
+  bool recvnb(void *msg, size_t *size) {
291 295
     return context_->recvnb(node_, msg, size);
292 296
   }
293 297
 
4  local_lat.cpp
@@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
31 31
     return 1;
32 32
   }
33 33
   const char* queue = argv[1];
34  
-  int message_size = atoi(argv [2]);
  34
+  size_t message_size = atoi(argv [2]);
35 35
   int roundtrip_count = atoi(argv [3]);
36 36
 
37 37
   nmq::context_t context(queue);
@@ -49,7 +49,7 @@ int main(int argc, char* argv[]) {
49 49
   
50 50
   nmq::node_t node(context, 0);
51 51
   for (int i = 0; i != roundtrip_count; i++) {
52  
-    node.recv(1, s, message_size);
  52
+    node.recv(1, s, &message_size);
53 53
     node.send(1, s, message_size);
54 54
   }
55 55
 }
4  local_thr.cpp
@@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
31 31
     return 1;
32 32
   }
33 33
   const char* queue = argv[1];
34  
-  int message_size = atoi(argv [2]);
  34
+  size_t message_size = atoi(argv [2]);
35 35
   int roundtrip_count = atoi(argv [3]);
36 36
 
37 37
   nmq::context_t context(queue);
@@ -49,6 +49,6 @@ int main(int argc, char* argv[]) {
49 49
   
50 50
   nmq::node_t node(context, 0);
51 51
   for (int i = 0; i != roundtrip_count; i++) {
52  
-    node.recv(1, s, message_size);
  52
+    node.recv(1, s, &message_size);
53 53
   }
54 54
 }
4  remote_lat.cpp
@@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
31 31
     return 1;
32 32
   }
33 33
   const char* queue = argv[1];
34  
-  int message_size = atoi(argv [2]);
  34
+  size_t message_size = atoi(argv [2]);
35 35
   int roundtrip_count = atoi(argv [3]);
36 36
 
37 37
   nmq::context_t context(queue);
@@ -52,7 +52,7 @@ int main(int argc, char* argv[]) {
52 52
   gettimeofday(&start, NULL);
53 53
   for (int i = 0; i != roundtrip_count; i++) {
54 54
     node.send(0, s, message_size);
55  
-    node.recv(0, s, message_size);
  55
+    node.recv(0, s, &message_size);
56 56
   }
57 57
   gettimeofday(&stop, NULL);
58 58
   long delta = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
2  remote_thr.cpp
@@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
31 31
     return 1;
32 32
   }
33 33
   const char* queue = argv[1];
34  
-  int message_size = atoi(argv [2]);
  34
+  size_t message_size = atoi(argv [2]);
35 35
   long roundtrip_count = atoi(argv [3]);
36 36
 
37 37
   nmq::context_t context(queue);

0 notes on commit 9fa8076

Please sign in to comment.
Something went wrong with that request. Please try again.