Skip to content

Commit

Permalink
KAA-772: Notify futures for all record in a bucket instead of the las…
Browse files Browse the repository at this point in the history
…t added. Rename some API in data collection feature.
  • Loading branch information
Denis Kimcherenko committed Jan 12, 2016
1 parent d1a5f82 commit 325f6c2
Show file tree
Hide file tree
Showing 25 changed files with 345 additions and 235 deletions.
Expand Up @@ -124,10 +124,10 @@ public LogStorageStatus getStatus() {
} }


@Override @Override
public LogBlock getRecordBlock() { public LogBucket getNextBucket() {
synchronized (database) { synchronized (database) {
Log.d(TAG, "Creating a new record block"); Log.d(TAG, "Creating a new record block");
LogBlock logBlock = null; LogBucket logBlock = null;
Cursor cursor = null; Cursor cursor = null;
List<LogRecord> logRecords = new LinkedList<>(); List<LogRecord> logRecords = new LinkedList<>();
int bucketId = 0; int bucketId = 0;
Expand Down Expand Up @@ -160,17 +160,17 @@ public LogBlock getRecordBlock() {


if (!logRecords.isEmpty()) { if (!logRecords.isEmpty()) {
updateBucketState(bucketId); updateBucketState(bucketId);
logBlock = new LogBlock(bucketId, logRecords); logBlock = new LogBucket(bucketId, logRecords);


long logBlockSize = maxBucketSize - leftBucketSize; long logBlockSize = maxBucketSize - leftBucketSize;
unmarkedConsumedSize -= logBlockSize; unmarkedConsumedSize -= logBlockSize;
unmarkedRecordCount -= logRecords.size(); unmarkedRecordCount -= logRecords.size();
consumedMemoryStorage.put(logBlock.getBlockId(), logBlockSize); consumedMemoryStorage.put(logBlock.getBucketId(), logBlockSize);


if (currentBucketId == bucketId) { if (currentBucketId == bucketId) {
moveToNextBucket(); moveToNextBucket();
} }
Log.i(TAG, "Created log block: id [" + logBlock.getBlockId() + "], size: " + Log.i(TAG, "Created log block: id [" + logBlock.getBucketId() + "], size: " +
logBlockSize + ". Log block record count: " + logBlockSize + ". Log block record count: " +
logBlock.getRecords().size() + ", total record count: " + totalRecordCount + logBlock.getRecords().size() + ", total record count: " + totalRecordCount +
", unmarked record count: " + unmarkedRecordCount); ", unmarked record count: " + unmarkedRecordCount);
Expand Down Expand Up @@ -217,7 +217,7 @@ private void updateBucketState(int bucketId) {
} }


@Override @Override
public void removeRecordBlock(int recordBlockId) { public void removeBucket(int recordBlockId) {
synchronized (database) { synchronized (database) {
Log.d(TAG, "Removing record block with id [" + recordBlockId + "] from storage"); Log.d(TAG, "Removing record block with id [" + recordBlockId + "] from storage");
if (deleteByBucketIdStatement == null) { if (deleteByBucketIdStatement == null) {
Expand Down Expand Up @@ -248,7 +248,7 @@ public void removeRecordBlock(int recordBlockId) {
} }


@Override @Override
public void notifyUploadFailed(int bucketId) { public void rollbackBucket(int bucketId) {
synchronized (database) { synchronized (database) {
Log.d(TAG, "Notifying upload fail for bucket id: " + bucketId); Log.d(TAG, "Notifying upload fail for bucket id: " + bucketId);
if (resetBucketIdStatement == null) { if (resetBucketIdStatement == null) {
Expand Down
Expand Up @@ -17,10 +17,11 @@ package org.kaaproject.kaa.client;


import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.concurrent.Future;


import javax.annotation.Generated; import javax.annotation.Generated;


import org.kaaproject.kaa.client.logging.future.BucketFuture; import org.kaaproject.kaa.client.logging.BucketInfo;


import ${configuration_class_package}.${configuration_class}; import ${configuration_class_package}.${configuration_class};
import ${log_record_class_package}.${log_record_class}; import ${log_record_class_package}.${log_record_class};
Expand All @@ -44,7 +45,7 @@ public class BaseKaaClient extends AbstractKaaClient implements KaaClient {
} }


@Override @Override
public BucketFuture addLogRecord(${log_record_class} record) { public Future<BucketInfo> addLogRecord(${log_record_class} record) {
checkClientState(State.STARTED, "Kaa client is not started"); checkClientState(State.STARTED, "Kaa client is not started");
return logCollector.addLogRecord(record); return logCollector.addLogRecord(record);
} }
Expand Down
Expand Up @@ -18,6 +18,7 @@ package org.kaaproject.kaa.client;


import java.security.PrivateKey; import java.security.PrivateKey;
import java.security.PublicKey; import java.security.PublicKey;
import java.util.concurrent.Future;


import javax.annotation.Generated; import javax.annotation.Generated;


Expand All @@ -26,16 +27,13 @@ import org.kaaproject.kaa.client.channel.KaaDataChannel;
import org.kaaproject.kaa.client.event.EventFamilyFactory; import org.kaaproject.kaa.client.event.EventFamilyFactory;
import org.kaaproject.kaa.client.event.EventListenersResolver; import org.kaaproject.kaa.client.event.EventListenersResolver;
import org.kaaproject.kaa.client.event.registration.EndpointRegistrationManager; import org.kaaproject.kaa.client.event.registration.EndpointRegistrationManager;
import org.kaaproject.kaa.client.logging.future.BucketFuture; import org.kaaproject.kaa.client.logging.BucketInfo;


import ${configuration_class_package}.${configuration_class}; import ${configuration_class_package}.${configuration_class};
import ${log_record_class_package}.${log_record_class}; import ${log_record_class_package}.${log_record_class};


/** /**
* <p> * <p>Base interface to operate with {@link Kaa} library.</p>
* Base interface to operate with {@link Kaa} library.
*
* </p>
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
* @author Andrew Shvayka * @author Andrew Shvayka
Expand All @@ -54,9 +52,10 @@ public interface KaaClient extends GenericKaaClient {
/** /**
* Adds new log record to local storage. * Adds new log record to local storage.
* *
* @param record New log record object * @param record New log record object.
* @return The {@link Future} object which allows tracking a delivery status of a log record.
*/ */
BucketFuture addLogRecord(${log_record_class} record); Future<BucketInfo> addLogRecord(${log_record_class} record);


/** /**
* Returns latest configuration. * Returns latest configuration.
Expand Down
Expand Up @@ -16,22 +16,26 @@
package org.kaaproject.kaa.client.logging; package org.kaaproject.kaa.client.logging;


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future;

import javax.annotation.Generated;


import org.kaaproject.kaa.client.context.ExecutorContext;
import org.kaaproject.kaa.client.channel.FailoverManager; import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.KaaChannelManager; import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.LogTransport; import org.kaaproject.kaa.client.channel.LogTransport;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.kaaproject.kaa.client.logging.future.BucketFuture;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import org.kaaproject.kaa.client.logging.future.BucketFuture; import ${log_record_class_package}.${log_record_class};
import org.kaaproject.kaa.client.logging.BucketInfo;


/** /**
* Reference implementation of @see LogCollector * Reference implementation of @see LogCollector
* *
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Generated("DefaultLogCollector.java.template")
public class DefaultLogCollector extends AbstractLogCollector { public class DefaultLogCollector extends AbstractLogCollector {
private static final Logger LOG = LoggerFactory.getLogger(DefaultLogCollector.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultLogCollector.class);


Expand All @@ -41,22 +45,23 @@ public class DefaultLogCollector extends AbstractLogCollector {
} }


@Override @Override
public BucketFuture<BucketInfo> addLogRecord(final ${log_record_class_package}.${log_record_class} record) { public Future<BucketInfo> addLogRecord(final ${log_record_class} record) {
final BucketFuture<BucketInfo> future = new BucketFuture<>(); final BucketFuture<BucketInfo> future = new BucketFuture<>();
executorContext.getApiExecutor().execute(new Runnable() { executorContext.getApiExecutor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
BucketInfo bucketInfo = storage.addLogRecord(new LogRecord(record)); BucketInfo bucketInfo = storage.addLogRecord(new LogRecord(record));
bucketInfoMap.put(bucketInfo.getBucketId(), bucketInfo); bucketInfoMap.put(bucketInfo.getBucketId(), bucketInfo);
futureMap.put(bucketInfo.getBucketId(), future); addDeliveryFuture(bucketInfo, future);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Can't serialize log record {}", record); LOG.warn("Can't serialize log record {}", record);
} }


uploadIfNeeded(); uploadIfNeeded();
} }
}); });

return future; return future;
} }
} }
Expand Up @@ -16,8 +16,11 @@


package org.kaaproject.kaa.client.logging; package org.kaaproject.kaa.client.logging;


import org.kaaproject.kaa.client.logging.future.BucketFuture; import java.util.concurrent.Future;
import org.kaaproject.kaa.client.logging.BucketInfo;
import javax.annotation.Generated;

import ${log_record_class_package}.${log_record_class};


/** /**
* <p>Interface for a log collector.</p> * <p>Interface for a log collector.</p>
Expand All @@ -29,20 +32,21 @@ import org.kaaproject.kaa.client.logging.BucketInfo;
* Each of them may be set independently of others.</p> * Each of them may be set independently of others.</p>
* *
* <p>Reference implementation of each module used by default.</p> * <p>Reference implementation of each module used by default.</p>
* *
* <p>This interface is auto-generated.</p> * <p>This interface is auto-generated.</p>
* *
* @see LogStorage * @see GenericLogCollector
* @see LogStorageStatus * @see BucketInfo
* @see LogUploadStrategy
* @see LogUploadConfiguration
*/ */
public interface LogCollector extends GenericLogCollector{ @Generated("LogCollector.java.template")
public interface LogCollector extends GenericLogCollector {


/** /**
* Adds new log record to local storage. * Adds a log record to a log storage.
*
* @param record A log record object.
* *
* @param record New log record object * @return The {@link Future} object which allows tracking a delivery status of a log record.
*/ */
BucketFuture<BucketInfo> addLogRecord(${log_record_class_package}.${log_record_class} record); Future<BucketInfo> addLogRecord(${log_record_class} record);
} }
Expand Up @@ -17,10 +17,11 @@


import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.concurrent.Future;


import javax.annotation.Generated; import javax.annotation.Generated;


import org.kaaproject.kaa.client.logging.future.BucketFuture; import org.kaaproject.kaa.client.logging.BucketInfo;
import org.kaaproject.kaa.schema.base.Configuration; import org.kaaproject.kaa.schema.base.Configuration;
import org.kaaproject.kaa.schema.base.Log; import org.kaaproject.kaa.schema.base.Log;


Expand All @@ -43,7 +44,7 @@ public BaseKaaClient(KaaClientPlatformContext context, KaaClientStateListener li
} }


@Override @Override
public BucketFuture addLogRecord(Log record) { public Future<BucketInfo> addLogRecord(Log record) {
checkClientState(State.STARTED, "Kaa client is not started"); checkClientState(State.STARTED, "Kaa client is not started");
return logCollector.addLogRecord(record); return logCollector.addLogRecord(record);
} }
Expand Down
Expand Up @@ -591,7 +591,7 @@ public interface GenericKaaClient {




/** /**
* Sets callback for receiving log events * Set a listener which receives a delivery status of each log bucket.
* *
* @param listener the listener * @param listener the listener
* *
Expand Down
Expand Up @@ -18,6 +18,7 @@


import java.security.PrivateKey; import java.security.PrivateKey;
import java.security.PublicKey; import java.security.PublicKey;
import java.util.concurrent.Future;


import javax.annotation.Generated; import javax.annotation.Generated;


Expand All @@ -26,7 +27,7 @@
import org.kaaproject.kaa.client.event.EventFamilyFactory; import org.kaaproject.kaa.client.event.EventFamilyFactory;
import org.kaaproject.kaa.client.event.EventListenersResolver; import org.kaaproject.kaa.client.event.EventListenersResolver;
import org.kaaproject.kaa.client.event.registration.EndpointRegistrationManager; import org.kaaproject.kaa.client.event.registration.EndpointRegistrationManager;
import org.kaaproject.kaa.client.logging.future.BucketFuture; import org.kaaproject.kaa.client.logging.BucketInfo;
import org.kaaproject.kaa.schema.base.Configuration; import org.kaaproject.kaa.schema.base.Configuration;
import org.kaaproject.kaa.schema.base.Log; import org.kaaproject.kaa.schema.base.Log;


Expand All @@ -53,10 +54,10 @@ public interface KaaClient extends GenericKaaClient {
/** /**
* Adds new log record to local storage. * Adds new log record to local storage.
* *
* @param record * @param record A log record object.
* New log record object * @return The {@link Future} object which allows tracking a delivery status of a log record.
*/ */
BucketFuture addLogRecord(Log record); Future<BucketInfo> addLogRecord(Log record);


/** /**
* Returns latest configuration. * Returns latest configuration.
Expand Down

0 comments on commit 325f6c2

Please sign in to comment.