Skip to content

Commit

Permalink
Fixing NPE in BaseStreamingClient.java related to Faust streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Oct 24, 2013
1 parent e8f76ee commit 33295b8
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/java/voldemort/client/protocol/admin/BaseStreamingClient.java
Expand Up @@ -105,7 +105,7 @@ public class BaseStreamingClient {
private AdminClientConfig adminClientConfig;

String bootstrapURL;

boolean overWriteIfLatestTs;

// Data structures for the streaming maps from Pair<Store, Node Id> to
Expand Down Expand Up @@ -136,10 +136,12 @@ public BaseStreamingClient(StreamingClientConfig config) {
CHECKPOINT_COMMIT_SIZE = config.getBatchSize();
THROTTLE_QPS = config.getThrottleQPS();
this.overWriteIfLatestTs = config.isOverWriteIfLatestTs();

adminClientConfig = new AdminClientConfig();
adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig());
faultyNodes = new ArrayList<Integer>();
storeNames = new ArrayList<String>();
nodesToStream = new ArrayList<Node>();
}

public AdminClient getAdminClient() {
Expand Down Expand Up @@ -316,13 +318,11 @@ public synchronized void initStreamingSessions(List<String> stores,
entriesProcessed = 0;
newBatch = true;
isMultiSession = true;
storeNames = new ArrayList();
this.throttler = new EventThrottler(THROTTLE_QPS);

TimeUnit unit = TimeUnit.SECONDS;

Collection<Node> nodesInCluster = adminClient.getAdminClientCluster().getNodes();
nodesToStream = new ArrayList();

if(blackListedNodes != null && blackListedNodes.size() > 0) {

Expand Down Expand Up @@ -517,9 +517,8 @@ public synchronized void streamingPut(ByteArray key, Versioned<byte[]> value, St
.setVersioned(ProtoUtils.encodeVersioned(value))
.build();


VAdminProto.UpdatePartitionEntriesRequest.Builder updateRequest = null;

if(overWriteIfLatestTs) {
updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder()
.setStore(storeName)
Expand All @@ -530,7 +529,7 @@ public synchronized void streamingPut(ByteArray key, Versioned<byte[]> value, St
.setStore(storeName)
.setPartitionEntry(partitionEntry);
}

DataOutputStream outputStream = nodeIdStoreToOutputStreamRequest.get(new Pair(storeName,
node.getId()));
try {
Expand Down Expand Up @@ -656,6 +655,13 @@ private void commitToVoldemort(List<String> storeNamesToCommit) {
}

boolean hasError = false;
if(nodesToStream == null) {
if(logger.isDebugEnabled()) {
logger.debug("No nodes to stream to. Returning.");
}
return;
}

for(Node node: nodesToStream) {

for(String store: storeNamesToCommit) {
Expand Down

0 comments on commit 33295b8

Please sign in to comment.