Skip to content

Commit

Permalink
Batch insert into STAGING table...
Browse files Browse the repository at this point in the history
  • Loading branch information
bargenilesh committed Mar 28, 2017
1 parent ccb0a50 commit 974442e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
8 changes: 8 additions & 0 deletions modules/assignment/src/main/resources/assignment.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ http.proxy.host:${http.proxy.host}
http.proxy.port:${http.proxy.port}
ruleCache.executor.pool.size:5

#Specify the number of messages to be drained on the event of queue full (executor.max.queue.size).
#Default value is 10_000
drain.queue.by=10000

#Specify the batch size to be used to drain queue
#Default value is 1_000
drain.queue.batch.size=1000

#Flag to enable or disable assignments metadata cache.
metadata.cache.enabled:true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.lang3.tuple.Pair;

import javax.ws.rs.core.StreamingOutput;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -123,6 +124,15 @@ StreamingOutput getAssignmentStream(final Experiment.ID experimentID, final Cont
*/
void pushAssignmentToStaging(String type, String exception, String data);

/**
* Push assignment messages to staging in a BATCH
*
* @param type type of assignment to be staged
* @param exception Exception
* @param data Assignment messages to be pushed to staging
*/
void pushAssignmentsToStaging(String type, String exception, Collection<String> data);

/**
* Increments the bucket assignments counter up by 1 if countUp is true
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
*******************************************************************************/
package com.intuit.wasabi.repository.cassandra.accessor;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.annotations.Accessor;
import com.datastax.driver.mapping.annotations.Query;

import java.util.UUID;

/**
* Accessor interface
*/
@Accessor
public interface StagingAccessor {
@Query("insert into staging(time, type, exep , msg) values(now(), ?, ? , ?)")
ResultSet insertBy(String type, String exception, String message);

@Query("insert into staging(time, type, exep , msg) values(?, ?, ? , ?)")
BoundStatement batchInsertBy(UUID time, String type, String exception, String message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.intuit.wasabi.repository.cassandra.accessor.index.PageExperimentIndexAccessor;
import com.intuit.wasabi.repository.cassandra.pojo.export.UserAssignmentExport;
import com.intuit.wasabi.repository.cassandra.pojo.index.ExperimentUserByUserIdContextAppNameExperimentId;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -82,6 +83,7 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -681,6 +683,25 @@ public void pushAssignmentToStaging(String type, String exception, String data)
}
}

/**
* {@inheritDoc}
*/
public void pushAssignmentsToStaging(String type, String exception, Collection<String> data) {
try {
Session session = driver.getSession();
final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
final UUID timeUUID = UUIDGen.getTimeUUID();
data.forEach(message -> {
LOGGER.debug("message={}", message);
batchStatement.add(stagingAccessor.batchInsertBy(timeUUID, type, exception, message));
});
session.execute(batchStatement);
LOGGER.debug("Finished pushAssignmentsToStaging");
} catch (Exception e) {
LOGGER.error("Error occurred while pushAssignmentsToStaging", e);
}
}

@Override
public void updateBucketAssignmentCount(Experiment experiment, Assignment assignment, boolean countUp) {
Optional<Bucket.Label> labelOptional = Optional.ofNullable(assignment.getBucketLabel());
Expand Down

0 comments on commit 974442e

Please sign in to comment.