From 33295b84963c4b262eac4ea1a13f103d1cb902bc Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Wed, 23 Oct 2013 17:39:46 -0700 Subject: [PATCH] Fixing NPE in BaseStreamingClient.java related to Faust streaming --- .../protocol/admin/BaseStreamingClient.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/java/voldemort/client/protocol/admin/BaseStreamingClient.java b/src/java/voldemort/client/protocol/admin/BaseStreamingClient.java index aaaa48aa13..0669433e8b 100644 --- a/src/java/voldemort/client/protocol/admin/BaseStreamingClient.java +++ b/src/java/voldemort/client/protocol/admin/BaseStreamingClient.java @@ -105,7 +105,7 @@ public class BaseStreamingClient { private AdminClientConfig adminClientConfig; String bootstrapURL; - + boolean overWriteIfLatestTs; // Data structures for the streaming maps from Pair to @@ -136,10 +136,12 @@ public BaseStreamingClient(StreamingClientConfig config) { CHECKPOINT_COMMIT_SIZE = config.getBatchSize(); THROTTLE_QPS = config.getThrottleQPS(); this.overWriteIfLatestTs = config.isOverWriteIfLatestTs(); - + adminClientConfig = new AdminClientConfig(); adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig()); faultyNodes = new ArrayList(); + storeNames = new ArrayList(); + nodesToStream = new ArrayList(); } public AdminClient getAdminClient() { @@ -316,13 +318,11 @@ public synchronized void initStreamingSessions(List stores, entriesProcessed = 0; newBatch = true; isMultiSession = true; - storeNames = new ArrayList(); this.throttler = new EventThrottler(THROTTLE_QPS); TimeUnit unit = TimeUnit.SECONDS; Collection nodesInCluster = adminClient.getAdminClientCluster().getNodes(); - nodesToStream = new ArrayList(); if(blackListedNodes != null && blackListedNodes.size() > 0) { @@ -517,9 +517,8 @@ public synchronized void streamingPut(ByteArray key, Versioned value, St .setVersioned(ProtoUtils.encodeVersioned(value)) .build(); - VAdminProto.UpdatePartitionEntriesRequest.Builder updateRequest = null; - + if(overWriteIfLatestTs) { updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder() .setStore(storeName) @@ -530,7 +529,7 @@ public synchronized void streamingPut(ByteArray key, Versioned value, St .setStore(storeName) .setPartitionEntry(partitionEntry); } - + DataOutputStream outputStream = nodeIdStoreToOutputStreamRequest.get(new Pair(storeName, node.getId())); try { @@ -656,6 +655,13 @@ private void commitToVoldemort(List storeNamesToCommit) { } boolean hasError = false; + if(nodesToStream == null) { + if(logger.isDebugEnabled()) { + logger.debug("No nodes to stream to. Returning."); + } + return; + } + for(Node node: nodesToStream) { for(String store: storeNamesToCommit) {