Skip to content
This repository
Browse code

fix for /db and /db/_changes with sub-lounges

  • Loading branch information...
commit 53a02a3f273145356cab1c58f3de9e77ef16adf3 1 parent 05d76f8
Randall Leeds authored November 01, 2010
2  smartproxy/setup.py
@@ -17,7 +17,7 @@
17 17
 
18 18
 py_modules = ["smartproxy.proxy", "smartproxy.fetcher", "smartproxy.reducer", "smartproxy.streaming", "smartproxy.changes", "smartproxy.lrucache"]
19 19
 
20  
-setup( version = '1.3.7',
  20
+setup( version = '1.3.10',
21 21
 	   name = 'lounge-smartproxy',
22 22
 	   author='meebo',
23 23
 	   author_email='shaun@meebo-inc.com',
59  smartproxy/smartproxy/proxy.py
@@ -370,30 +370,32 @@ def cache_pred(x):
370 370
 
371 371
 	def render_changes(self, request):
372 372
 		database = request.path[1:].split('/', 1)[0]
373  
-		shards = self.conf_data.shards(database)
374  
-		rep_lists = self.conf_data.shardmap
  373
+		shards = dict(map(lambda s:
  374
+						  (self.conf_data.get_index_from_shard(s), s),
  375
+						  self.conf_data.shards(database)))
375 376
 
376 377
 		feed = request.args.get('feed',['nofeed'])[-1]
377 378
 		continuous = (feed == 'continuous')
378 379
 
379 380
 		since = None
380  
-		if 'since' in request.args:
  381
+		if 'since' in request.args and request.args['since'][-1] != '0':
381 382
 			since = changes.decode_seq(request.args['since'][-1])
382 383
 			if True in itertools.imap(
383  
-				lambda s, rl: str(s) not in since # missing shard
384  
-				or False not in	itertools.imap(lambda r:
385  
-					str(r) not in since[str(s)],
386  
-					rl), # no recognized replicas
387  
-				itertools.count(), rep_lists):
  384
+				lambda s: str(s) not in since # missing shard
  385
+				or True not in itertools.imap(lambda r:
  386
+					str(r) in since[str(s)],
  387
+					self.conf_data.shardmap[s]), # no recognized replicas
  388
+				shards.iterkeys()):
388 389
 				request.setResponseCode(http.BAD_REQUEST)
389  
-				request.write('{"error":"bad request",' +
390  
-							  '"reason":' +
391  
-							  '"missing shard or bad replicas in since"}')
  390
+				return ('{"error":"bad request",'
  391
+					    '"reason":'
  392
+					    '"missing shard or bad replicas in since"}')
392 393
 			del request.args['since']
393 394
 		else:
394  
-			since = dict(map(lambda n: (str(n),
395  
-						    {str(self.conf_data.shardmap[n][0]): 0}),
396  
-					 xrange(len(shards))))
  395
+			since = dict(
  396
+				map(lambda n: (str(n),
  397
+							   {str(self.conf_data.shardmap[n][0]): 0}),
  398
+					shards))
397 399
 
398 400
 		heartbeat = None
399 401
 		if 'heartbeat' in request.args and continuous:
@@ -760,8 +762,11 @@ def get_db(self, request):
760 762
 
761 763
 		# fold function to reduce the sharded results
762 764
 		def fold_results_fun(acc, result):
763  
-			result, shard_idx = result             #packed by DeferredList
764  
-			result, node_idx, factory = result     #packed by getPageFromAny
  765
+			result, idx = result                   #packed by DeferredList
  766
+			result, rep_id, factory = result       #packed by getPageFromAny
  767
+			shard, rep_idx = rep_id
  768
+			shard_idx = self.conf_data.get_index_from_shard(shard)
  769
+			node_idx = self.conf_data.shardmap[shard_idx][rep_idx]
765 770
 			result = cjson.decode(result)
766 771
 			acc['doc_count'] += result['doc_count']
767 772
 			acc['doc_del_count'] += result['doc_del_count']
@@ -811,20 +816,18 @@ def handle_error(reason):
811 816
 		# construct a DeferredList of the deferred sub-requests
812 817
 		# fetches shard results from any replica of each shard
813 818
 		# if any shard fails completely the whole thing fails fast
814  
-		nodes = self.conf_data.nodelist
815 819
 		deferred = defer.DeferredList(
816 820
 			# map over all the shards and get a deferred that handles fail-over
817  
-			map(lambda s, rl: getPageFromAny(
818  
-					# create the upstream descriptions by mapping over the replica list
819  
-					itertools.imap(lambda r:
820  
-						       (r,    # upstream identifier
821  
-							"http://%s:%s/%s%d" #url
822  
-							% (nodes[r][0], nodes[r][1], db_name, s),
823  
-							[],   # factory args
824  
-							{}),  # factor kwargs
825  
-						       rl)),
826  
-					xrange(len(self.conf_data.shardmap)),
827  
-					self.conf_data.shardmap),
  821
+			map(lambda shard: getPageFromAny(
  822
+				# create the upstream descriptions
  823
+				itertools.imap(
  824
+					lambda rep_idx, rep_uri:
  825
+					((shard, rep_idx),    # upstream identifier
  826
+					 rep_uri,             # url
  827
+					 [],                  # factory args
  828
+					 {}),                 # factor kwargs
  829
+					*zip(*enumerate(self.conf_data.nodes(shard))))),
  830
+				self.conf_data.shards(db_name)),
828 831
 			fireOnOneErrback=1,
829 832
 			consumeErrors=1).addCallbacks(finish_request, handle_error)
830 833
 

0 notes on commit 53a02a3

Please sign in to comment.
Something went wrong with that request. Please try again.