Feature/hourly counts #274

Open
wants to merge 38 commits into base: develop

Conversation

maxBruckhaus
Contributor

@maxBruckhaus maxBruckhaus commented Aug 10, 2017

This pull request is for the write path of the traffic management feature. I created a new counter table for hourly bucket counts. There is an accessor for the table which increments the counts. I added code to populate assignment stats into a concurrent hashmap. I have unit tests which ensure that I am incrementing the counts in the hashmap correctly. I have a scheduled executor service which runs every 60 minutes, using the writeCounts method to write the hourly counts to Cassandra.

Bruckhaus added 27 commits July 27, 2017 23:41

@Inject
public AssignmentsHourlyAggregatorTask(AssignmentStats assignmentStats){
// TODO: Figure out how to pass the correct AssignmentStats instance from CassandraAssignmentsRepository
Contributor Author

I'm in the process of fixing my scheduled executor service

Member

@mooxter Ack on this one

@@ -175,6 +180,30 @@ private void bindMetadataCache(final Properties properties) {
}
}

private void bindAssignmentsHourlyAggregation() {
//This is the assignment aggregation interval.
Integer assignmentAggregationIntervalInMinutes = 60;
Member

@mooxter Would it make sense to extract & configure the actual value in a properties (assignment.properties?) file, instead of hardcoding it in code? This is in case you'd need to update it (at run-time?) in the future
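A minimal sketch of the suggestion above, assuming the interval is read from a `java.util.Properties` object. The class name `AggregationIntervalConfig` and the 60-minute fallback are hypothetical; the key matches the `bucket.count.aggregation.interval` entry that appears in the properties diff further down this review.

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR: resolves the aggregation interval
// from configuration instead of a hardcoded literal.
final class AggregationIntervalConfig {
    static final String INTERVAL_KEY = "bucket.count.aggregation.interval";
    // Assumed fallback when the key is missing or blank.
    static final int DEFAULT_INTERVAL_MINUTES = 60;

    private AggregationIntervalConfig() {}

    static int intervalInMinutes(Properties properties) {
        String raw = properties.getProperty(INTERVAL_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_INTERVAL_MINUTES;
        }
        int minutes = Integer.parseInt(raw.trim());
        if (minutes <= 0) {
            throw new IllegalArgumentException(
                    INTERVAL_KEY + " must be positive, got " + minutes);
        }
        return minutes;
    }
}
```

With this in place, a test run could set the property to 2 while production keeps 60, without touching the module code.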

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("AssignmentHourlyAggregatorTask-%d").setDaemon(true).build();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);

//Bind Scheduled Executor Service
Member

@mooxter Awesome job with inline comments! Very helpful, thanks 👍


import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
Member

@mooxter Please don't use .* imports, as this brings all (MANY 😄 ) the class names under java.util into scope at compile time and hides which ones you actually use. Please import the specific classes you're using instead. There's a shortcut for this in your IDE (IntelliJ?).

Contributor Author

@iizrailevsky Ack, I'll remove usage of the .*

}
}

public void incrementCount(Experiment experiment, Assignment assignment) {
Member

@mooxter Please add method JavaDoc to include what the method does, input & output parameters.

Contributor Author

@iizrailevsky Ack, will add JavaDoc

Bucket.Label bucketLabel = labelOptional.orElseGet(() -> NULL_LABEL);
AtomicInteger oldCount = hourMap.get(ExpBucket.getKey(id, bucketLabel));
if (oldCount == null) {
synchronized (lock) {
Member

@mooxter Mind adding a comment to describe why you need to synchronize here? What's the specific reason?

Contributor Author

@iizrailevsky This is to ensure that multiple threads do not change the oldCount value at the same time, skewing the final value. Without this, oldCount would be initialized to 1 twice instead of properly being set to 1 and then incrementing to 2. I'll add a comment to clarify.
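For comparison, here is a sketch of one way to get the same guarantee without an explicit lock. It is not the PR's code: it assumes a plain `String` key and relies on `ConcurrentHashMap.computeIfAbsent`, which creates the counter atomically, so two threads can never both initialize it. The class name `HourlyCounter` and the `hammer` helper are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the per-hour count map discussed above.
final class HourlyCounter {
    private final ConcurrentMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // computeIfAbsent inserts the AtomicInteger at most once per key,
    // and incrementAndGet is itself atomic.
    int increment(String key) {
        return counts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    int get(String key) {
        AtomicInteger count = counts.get(key);
        return count == null ? 0 : count.get();
    }

    // Small stress helper: several threads increment the same key concurrently.
    static int hammer(int threads, int perThread) {
        HourlyCounter counter = new HourlyCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment("exp-1|control");
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get("exp-1|control");
    }
}
```

If no increment is lost, `hammer(threads, perThread)` returns exactly `threads * perThread`.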

}
}

public int getCount(Experiment experiment, Bucket.Label bucketLabel, int assignmentHour) {
Member

@mooxter Same here - please add method JavaDoc for all the methods in the class

Contributor Author

@iizrailevsky Ack, will add

for (int i = 0; i < count; i++){
hourlyBucketCountAccessor.incrementCountBy(experimentID, day, bucketLabel, assignmentHour);
}
System.out.println(pair.getKey() + " = " + pair.getValue());
Member

@mooxter Please use LOGGER.debug(...) (check other classes), instead of System.out.println(...)

Contributor Author

@iizrailevsky This was mainly for me to see if my code was working properly, I forgot to take this out before committing. I'll use LOGGER.debug(...) in the future though 👍

hourlyCountMap.put(assignmentHour, new ConcurrentHashMap<>());
}

public Date getLastCompletedHour(long time) {
Member

@mooxter These utility methods should be declared private, as you only use them in this class. Or maybe create another class (AssignmentStatsUtil.java?) and move these methods there?

}

public UUID getExpUUID(Map.Entry pair){
String expIDString = pair.getKey().toString().substring(0, UUID_LENGTH);
Member

@mooxter You might need to add error checking here to make sure the pair param is not null?

}

public String getBucketLabel(Map.Entry pair){
return pair.getKey().toString().substring(UUID_LENGTH);
Member

@mooxter Same here - error checking...



class ExpBucket {
private Experiment.ID expID;
Member

@mooxter These should probably be final as well, as they shouldn't change, right?

}

public static String getKey(Experiment.ID expID, Bucket.Label bucket){
// TODO: Instead of concatenating two strings, redefine hashcode and equals methods.
Member

@mooxter 👍 on the TODO

Experiment experiment = mock(Experiment.class);
Assignment assignment = mock(Assignment.class);

String dateInString = "2016-11-08 16";
Member

@mooxter Mind adding a few inline comments to describe what you're testing?

Member

@iizrailevsky iizrailevsky left a comment

@mooxter Looks good overall! Some comments, questions, and requests. Once resolved and TODO's are followed up on, & tests pass -> 👍 :shipit:

//Bind Scheduled Executor Service
bind(ScheduledExecutorService.class).annotatedWith(named(ASSIGNMENTS_HOURLY_AGGREGATOR_SERVICE)).to(AssignmentCountScheduledExecutorService.class).in(Singleton.class);
//This is the assignment aggregation interval.
Integer assignmentAggregationIntervalInMinutes = 2;
Contributor Author

This will be changed to 60 minutes for hourly aggregation, but it is 2 at the moment for testing purposes.

import static com.google.inject.name.Names.named;
import static com.intuit.autumn.utils.PropertyFactory.create;
import static com.intuit.autumn.utils.PropertyFactory.getProperty;
import static java.lang.Integer.parseInt;
import static org.slf4j.LoggerFactory.getLogger;

public class CassandraRepositoryModule extends AbstractModule {
public String ASSIGNMENTS_HOURLY_AGGREGATOR_SERVICE = "AssignmentsHourlyAggregatorService";
public String ASSIGNMENTS_AGGREGATOR_INTERVAL = "AssignmentsAggregatorInterval";
public String ASSIGNMENTS_HOURLY_AGGREGATOR_TASK = "AssignmentHourlyAggregatorTask";
Contributor

@mooxter it would be great to define all the related constants in AssignmentStatsAnnotations...

Contributor Author

Turns out I had the constants defined in AssignmentStatsAnnotations and in CassandraRepositoryModule. Thanks for catching this, I just fixed it 👍

public interface HourlyBucketCountAccessor {
@Query("UPDATE hourly_bucket_counts SET hourly_bucket_count = hourly_bucket_count + 1 " +
"WHERE experiment_id = ? and day = ? and bucket_label = ? and event_time_hour = ? ")
ResultSet incrementCountBy(UUID experimentId, String day, String bucketLabel, int eventTimeHour);
Contributor

@mooxter Major: it would be great to increment counter by given 'N' number rather than just 1

Contributor Author

Done.
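The point of incrementing by N can be sketched in memory, under stated assumptions: `CounterSink` is a hypothetical stand-in for the Cassandra accessor, and a real implementation would presumably bind the delta into a query of the form `SET hourly_bucket_count = hourly_bucket_count + ?`. The flush issues one write per key instead of looping N times.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the accessor; not the PR's interface.
interface CounterSink {
    void incrementBy(String key, long delta);
}

final class CountFlusher {
    private CountFlusher() {}

    // Drains each in-memory counter and writes its value with a single call.
    // getAndSet(0) takes the current count and resets it in one atomic step,
    // so increments arriving during the flush are kept for the next run.
    static int flush(Map<String, AtomicInteger> counts, CounterSink sink) {
        int writes = 0;
        for (Map.Entry<String, AtomicInteger> entry : counts.entrySet()) {
            int delta = entry.getValue().getAndSet(0);
            if (delta > 0) {
                sink.incrementBy(entry.getKey(), delta);
                writes++;
            }
        }
        return writes;
    }
}
```

Compared with the loop in the earlier diff, a bucket with a count of 5,000 costs one write rather than 5,000.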


@Override
public void run() {
assignmentStats.writeCounts();
Contributor

@mooxter Major: To make this scheduled executor service robust, wrap the entire run() implementation in a try-catch block. If an exception escapes run(), the executor suppresses all subsequent executions, so catching it keeps the scheduled task alive.

Contributor

@mooxter Also, it would be great to add an INFO log here when this executor service starts and finishes, including the time taken, to track how the service is working... similar to ScheduledCacheRefreshService...

Contributor Author

Done.
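A rough sketch of what the two requests in this thread could look like together, not the code that was actually committed. `GuardedTask` and `runsDespiteFailures` are hypothetical names, and `java.util.logging` stands in for the project's LOGGER. The wrapper catches anything the delegate throws and logs start, finish, and elapsed time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: keeps a periodic task scheduled even when a run fails.
final class GuardedTask implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(GuardedTask.class.getName());
    private final Runnable delegate;

    GuardedTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        long startNanos = System.nanoTime();
        LOGGER.info("Aggregation run started");
        try {
            delegate.run();
        } catch (Exception e) {
            // Swallowing here is deliberate: an escaped exception would cancel future runs.
            LOGGER.log(Level.SEVERE, "Aggregation run failed; the next run stays scheduled", e);
        } finally {
            long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            LOGGER.info("Aggregation run finished in " + tookMs + " ms");
        }
    }

    // Demo: schedules a task that always throws and counts how often it still runs.
    static int runsDespiteFailures() {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable alwaysFails = () -> {
            runs.incrementAndGet();
            threeRuns.countDown();
            throw new IllegalStateException("simulated write failure");
        };
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(new GuardedTask(alwaysFails), 0, 10, TimeUnit.MILLISECONDS);
            threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

Without the wrapper, the same failing task would run once and never again, because the executor cancels a periodic task after its first uncaught exception.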

if (oldCount == null) {
synchronized (lock) { // oldCount would be initialized to 1 twice w/o this
oldCount = hourMap.get(key);
if (oldCount == null) { // double-checked locking
Contributor

@mooxter 👍 for double-checked locking

* @param experiment
* @param assignment
*/
public void incrementCount(Experiment experiment, Assignment assignment) {
Contributor

@mooxter it would be great to add execution flow debug logging with some inline comments of what is being done...

Contributor Author

Done.

String day = AssignmentStatsUtil.getDayString(completedHour);

for (String key : hourlyCountMap.get(assignmentHour).keySet()){
String experimentID = key.substring(0, UUID_LENGTH);
Contributor

@mooxter it would be better to use 'ExpBucket' as the key for the second-level map so that you don't have to deal with the parsing logic here... E.g. Map<Integer, Map<ExpBucket, AtomicInteger>> hourlyCountMap - instead of - Map<Integer, Map<String, AtomicInteger>> hourlyCountMap

Contributor Author

Done.

* @param date the date containing the last completed hour
* @return int representing an hour of the day
*/
static int getHour(Date date) {
Contributor

@mooxter Major: SimpleDateFormat is not thread-safe, so concurrent threads can race here and corrupt the data...

Contributor Author

I switched from SimpleDateFormat to DateTimeFormatter and replaced Date with DateTime. Both come from Joda-Time, whose formatters and parsers are thread-safe.
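To illustrate the thread-safety point, here is a sketch that uses the JDK's `java.time` package rather than the Joda-Time classes the author chose; `java.time.format.DateTimeFormatter` is likewise immutable and safe to share across threads. The class `HourBuckets`, the UTC time zone, and the reading of "last completed hour" as the full hour before the current one are all assumptions for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical utility; names and time zone are not taken from the PR.
final class HourBuckets {
    // Immutable, so one shared instance can serve every thread.
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private HourBuckets() {}

    // Truncates to the start of the current hour, then steps back one hour.
    static ZonedDateTime lastCompletedHour(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneOffset.UTC)
                .truncatedTo(ChronoUnit.HOURS)
                .minusHours(1);
    }

    static int hourOfDay(ZonedDateTime time) {
        return time.getHour();
    }

    static String dayString(ZonedDateTime time) {
        return DAY_FORMAT.format(time);
    }
}
```

None of these methods keeps mutable state, which is what removes the race that a shared SimpleDateFormat instance would introduce.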

import com.intuit.wasabi.experimentobjects.Bucket;


class ExpBucket {
Contributor

@mooxter Minor: This class could use a clearer name, maybe ExpBucket -> ExperimentBucketKey

Contributor

@mooxter Major: This class needs to override the equals() & hashCode() methods, and probably add a Builder as well...

Contributor Author

Done.
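A sketch of the kind of key class these comments describe, assuming `UUID` and `String` as stand-ins for the project's `Experiment.ID` and `Bucket.Label` types. It uses the `ExperimentBucketKey` name floated above but is not the committed class. With value-based `equals()` and `hashCode()`, two keys built from the same experiment and bucket find the same map entry, so no string concatenation or substring parsing is needed.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical immutable map key; field types are simplified stand-ins.
final class ExperimentBucketKey {
    private final UUID experimentId;
    private final String bucketLabel;

    ExperimentBucketKey(UUID experimentId, String bucketLabel) {
        this.experimentId = Objects.requireNonNull(experimentId, "experimentId");
        this.bucketLabel = Objects.requireNonNull(bucketLabel, "bucketLabel");
    }

    UUID experimentId() {
        return experimentId;
    }

    String bucketLabel() {
        return bucketLabel;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ExperimentBucketKey)) {
            return false;
        }
        ExperimentBucketKey that = (ExperimentBucketKey) other;
        return experimentId.equals(that.experimentId) && bucketLabel.equals(that.bucketLabel);
    }

    // Must agree with equals(): equal keys always produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(experimentId, bucketLabel);
    }

    @Override
    public String toString() {
        return experimentId + "/" + bucketLabel;
    }
}
```

Making both fields final also covers the earlier request that the key's fields should not change after construction.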

@@ -0,0 +1,23 @@
package com.intuit.wasabi.repository.cassandra.impl;
Contributor

@mooxter Needs a copyright statement here...

Contributor Author

Done.

@@ -19,3 +19,4 @@ export.pool.size:5
default.time.format:${default.time.format}
cassandra.mutagen.root.resource.path:${cassandra.mutagen.root.resource.path}
mysql.mutagen.root.resource.path:${mysql.mutagen.root.resource.path}
bucket.count.aggregation.interval:60
Contributor

@mooxter here value for bucket.count.aggregation.interval can be taken from pom.xml - similar to other configuration keys...

Contributor Author

Done.

3 participants