Skip to content

Commit

Permalink
KAA-537: introduce local fields to hold consumed volume and count of …
Browse files Browse the repository at this point in the history
…MemLogStorage and minor refactor tests
  • Loading branch information
batytskyy committed Jun 25, 2015
1 parent 08b7284 commit 2c55861
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class MemLogStorage implements LogStorage, LogStorageStatus {
private long maxBucketSize;
private int maxBucketRecordCount;

private volatile long consumedVolume;
private volatile long recordCount;

private final AtomicInteger bucketIdSeq = new AtomicInteger();
private MemBucket currentBucket;
private final Map<Integer, MemBucket> buckets;
Expand All @@ -47,32 +50,14 @@ public MemLogStorage(long maxStorageSize, long bucketSize, int bucketRecordCount

@Override
public long getConsumedVolume() {
long volume = 0;
LOG.trace("Calculating consumed volume");
synchronized (buckets) {
for (MemBucket bucket : buckets.values()) {
if (bucket.getState() != MemBucketState.PENDING) {
volume += bucket.getSize();
}
}
}
LOG.debug("Calculated consumed volume {}", volume);
return volume;
LOG.debug("Consumed volume: {}", consumedVolume);
return consumedVolume;
}

@Override
public long getRecordCount() {
long count = 0;
LOG.trace("Calculating record count");
synchronized (buckets) {
for (MemBucket bucket : buckets.values()) {
if (bucket.getState() != MemBucketState.PENDING) {
count += bucket.getCount();
}
}
}
LOG.debug("Calculated record count {}", count);
return count;
LOG.debug("Record count: {}", recordCount);
return recordCount;
}

@Override
Expand All @@ -96,8 +81,10 @@ public void addLogRecord(LogRecord record) {
buckets.put(currentBucket.getId(), currentBucket);
currentBucket.addRecord(record);
}
recordCount++;
consumedVolume += record.getSize();
}
LOG.trace("Added new log record to bucket [{}]", currentBucket.getId());
LOG.trace("Added a new log record to bucket [{}]", currentBucket.getId());
}

@Override
Expand All @@ -107,6 +94,8 @@ public LogBlock getRecordBlock(long blockSize, int batchCount) {
//TODO: add support of block resize
LOG.warn("Resize of record block is not supported yet");
}

LogBlock result = null;
MemBucket bucketCandidate = null;
synchronized (buckets) {
for (MemBucket bucket : buckets.values()) {
Expand All @@ -119,23 +108,24 @@ public LogBlock getRecordBlock(long blockSize, int batchCount) {
break;
}
}
}

LogBlock result = null;
if (bucketCandidate != null) {
if (bucketCandidate.getState() == MemBucketState.FREE) {
LOG.trace("Only a bucket with state FREE found");
bucketCandidate.setState(MemBucketState.PENDING);
}
if (bucketCandidate.getSize() <= blockSize && bucketCandidate.getCount() <= batchCount) {
result = new LogBlock(bucketCandidate.getId(), bucketCandidate.getRecords());
LOG.debug("Return record block with records count: [{}]", bucketCandidate.getCount());
} else {
LOG.debug("Shrinking bucket {} to new size: [{}] and count: [{}]", bucketCandidate, blockSize, batchCount);
List<LogRecord> overSized = bucketCandidate.shrinkToSize(blockSize, batchCount);
result = new LogBlock(bucketCandidate.getId(), bucketCandidate.getRecords());
for (LogRecord record : overSized) {
addLogRecord(record);
if (bucketCandidate != null) {
consumedVolume -= bucketCandidate.getSize();
recordCount -= bucketCandidate.getCount();
if (bucketCandidate.getState() == MemBucketState.FREE) {
LOG.trace("Only a bucket with state FREE found: [{}]. Changing its state to PENDING", bucketCandidate.getId());
bucketCandidate.setState(MemBucketState.PENDING);
}
if (bucketCandidate.getSize() <= blockSize && bucketCandidate.getCount() <= batchCount) {
result = new LogBlock(bucketCandidate.getId(), bucketCandidate.getRecords());
LOG.debug("Return record block with records count: [{}]", bucketCandidate.getCount());
} else {
LOG.debug("Shrinking bucket {} to new size: [{}] and count: [{}]", bucketCandidate, blockSize, batchCount);
List<LogRecord> overSized = bucketCandidate.shrinkToSize(blockSize, batchCount);
result = new LogBlock(bucketCandidate.getId(), bucketCandidate.getRecords());
for (LogRecord record : overSized) {
addLogRecord(record);
}
}
}
}
Expand All @@ -159,6 +149,8 @@ public void notifyUploadFailed(int id) {
LOG.trace("Upload of record block [{}] failed", id);
synchronized (buckets) {
buckets.get(id).setState(MemBucketState.FULL);
consumedVolume += buckets.get(id).getSize();
recordCount += buckets.get(id).getCount();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ public void testRecordCountAndConsumedBytes() {
storage.addLogRecord(record);
}

LogStorageStatus logStorageStatus = (LogStorageStatus) storage;

Assert.assertTrue(logStorageStatus.getRecordCount() == insertionCount);
Assert.assertTrue(logStorageStatus.getConsumedVolume() == (insertionCount * record.getSize()));
Assert.assertTrue(storage.getStatus().getRecordCount() == insertionCount);
Assert.assertTrue(storage.getStatus().getConsumedVolume() == (insertionCount * record.getSize()));
storage.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testRemovalWithBucketShrinking() {
LogBlock logBlock = storage.getRecordBlock(maxSize, maxCount);
Assert.assertTrue(logBlock.getRecords().size() <= maxCount);
Assert.assertTrue(getLogBlockSize(logBlock) <= maxSize);
Assert.assertEquals(insertionCount - logBlock.getRecords().size(), ((LogStorageStatus) storage).getRecordCount());
Assert.assertEquals(insertionCount - logBlock.getRecords().size(), storage.getStatus().getRecordCount());
}

private long getLogBlockSize(LogBlock logBlock) {
Expand Down

0 comments on commit 2c55861

Please sign in to comment.