diff --git a/modules/assignment/src/main/resources/assignment.properties b/modules/assignment/src/main/resources/assignment.properties index e1d156044..57e84b23a 100644 --- a/modules/assignment/src/main/resources/assignment.properties +++ b/modules/assignment/src/main/resources/assignment.properties @@ -27,6 +27,14 @@ http.proxy.host:${http.proxy.host} http.proxy.port:${http.proxy.port} ruleCache.executor.pool.size:5 +#Specify the number of messages to be drained on the event of queue full (executor.max.queue.size). +#Default value is 10_000 +drain.queue.by=10000 + +#Specify the batch size to be used to drain queue +#Default value is 1_000 +drain.queue.batch.size=1000 + #Flag to enable or disable assignments metadata cache. metadata.cache.enabled:true diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/AssignmentsRepository.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/AssignmentsRepository.java index cb66bfed0..c38a2158b 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/AssignmentsRepository.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/AssignmentsRepository.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair; import javax.ws.rs.core.StreamingOutput; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -123,6 +124,15 @@ StreamingOutput getAssignmentStream(final Experiment.ID experimentID, final Cont */ void pushAssignmentToStaging(String type, String exception, String data); + /** + * Push assignment messages to staging in a BATCH + * + * @param type type of assignment to be staged + * @param exception Exception + * @param data Assignment messages to be pushed to staging + */ + void pushAssignmentsToStaging(String type, String exception, Collection data); + /** * Increments the bucket assignments counter up by 1 if countUp is true * diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java index d7db87582..a9b873f91 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java @@ -15,10 +15,13 @@ *******************************************************************************/ package com.intuit.wasabi.repository.cassandra.accessor; +import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.mapping.annotations.Accessor; import com.datastax.driver.mapping.annotations.Query; +import java.util.UUID; + /** * Accessor interface */ @@ -26,4 +29,8 @@ public interface StagingAccessor { @Query("insert into staging(time, type, exep , msg) values(now(), ?, ? , ?)") ResultSet insertBy(String type, String exception, String message); + + @Query("insert into staging(time, type, exep , msg) values(?, ?, ? , ?)") + BoundStatement batchInsertBy(UUID time, String type, String exception, String message); + } \ No newline at end of file diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java index d4d604614..ea1a611ab 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java @@ -65,6 +65,7 @@ import com.intuit.wasabi.repository.cassandra.accessor.index.PageExperimentIndexAccessor; import com.intuit.wasabi.repository.cassandra.pojo.export.UserAssignmentExport; import com.intuit.wasabi.repository.cassandra.pojo.index.ExperimentUserByUserIdContextAppNameExperimentId; +import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -82,6 +83,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -681,6 +683,25 @@ public void pushAssignmentToStaging(String type, String exception, String data) } } + /** + * {@inheritDoc} + */ + public void pushAssignmentsToStaging(String type, String exception, Collection data) { + try { + Session session = driver.getSession(); + final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); + final UUID timeUUID = UUIDGen.getTimeUUID(); + data.forEach(message -> { + LOGGER.debug("message={}", message); + batchStatement.add(stagingAccessor.batchInsertBy(timeUUID, type, exception, message)); + }); + session.execute(batchStatement); + LOGGER.debug("Finished pushAssignmentsToStaging"); + } catch (Exception e) { + LOGGER.error("Error occurred while pushAssignmentsToStaging", e); + } + } + @Override public void updateBucketAssignmentCount(Experiment experiment, Assignment assignment, boolean countUp) { Optional labelOptional = Optional.ofNullable(assignment.getBucketLabel());