Using per-binary chunk size for retrieval
ajs6f committed Dec 12, 2018
1 parent 54b7841 commit 47cf68c
Showing 3 changed files with 32 additions and 26 deletions.
impl/src/main/java/edu/si/trellis/cassandra/CassandraBinary.java (11 additions, 7 deletions)
@@ -6,7 +6,7 @@
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.Statement;
 
-import edu.si.trellis.cassandra.CassandraBinaryService.BinaryContext;
+import edu.si.trellis.cassandra.CassandraBinaryService.BinaryQueryContext;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,17 +30,21 @@ public class CassandraBinary implements Binary {
 
     private final IRI id;
 
-    private final BinaryContext context;
+    private final BinaryQueryContext context;
+
+    private final int chunkLength;
 
     /**
      * @param id identifier for this {@link Binary}
      * @param size size in bytes of this {@code Binary}
      * @param c context for queries
+     * @param length the length of chunk to use in Cassandra
      */
-    public CassandraBinary(IRI id, Long size, BinaryContext c) {
+    public CassandraBinary(IRI id, Long size, BinaryQueryContext c, int length) {
         this.id = id;
         this.size = size;
         this.context = c;
+        this.chunkLength = length;
     }
 
     @Override
@@ -55,9 +59,9 @@ public InputStream getContent() {
 
     @Override
     public BoundedInputStream getContent(int from, int to) {
-        int firstChunk = from / context.maxChunkLength();
-        int lastChunk = to / context.maxChunkLength();
-        int chunkStreamStart = from % context.maxChunkLength();
+        int firstChunk = from / chunkLength;
+        int lastChunk = to / chunkLength;
+        int chunkStreamStart = from % chunkLength;
         int rangeSize = to - from + 1; // +1 because range is inclusive
         Statement boundStatement = context.readRangeStatement().bind(id.getIRIString(), firstChunk, lastChunk);
 
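The hunk above maps an inclusive byte range onto chunk coordinates with plain integer arithmetic. A quick worked example, assuming a hypothetical 1 MiB chunk length (the class and all values are illustrative, not from the codebase):

// Hypothetical walk-through of getContent(from, to)'s range arithmetic.
public class RangeArithmeticDemo {
    public static void main(String[] args) {
        int chunkLength = 1_048_576;                // assumed 1 MiB per-binary chunk size
        int from = 1_000_000, to = 3_000_000;       // inclusive byte range
        int firstChunk = from / chunkLength;        // 0: the range starts inside the first chunk
        int lastChunk = to / chunkLength;           // 2: and ends inside the third
        int chunkStreamStart = from % chunkLength;  // 1_000_000 bytes to skip within the first chunk
        int rangeSize = to - from + 1;              // 2_000_001 bytes, since the range is inclusive
        System.out.printf("chunks %d..%d, skip %d, read %d bytes%n",
                        firstChunk, lastChunk, chunkStreamStart, rangeSize);
    }
}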
@@ -76,7 +80,7 @@ private InputStream retrieve(Statement boundStatement) {
         return stream(context.session().execute(statementWithConsistency).spliterator(), false)
                         .map(r -> r.getInt("chunk_index"))
                         .peek(chunkNumber -> log.debug("Found pointer to chunk: {}", chunkNumber))
-                        .map(chunkNumber -> context.readSingleChunk().bind(id, chunkNumber).setConsistencyLevel(readConsistency))
+                        .map(chunkNumber -> context.readSingleChunk().bind(id, chunkNumber))
                        .<InputStream> map(statement -> new LazyChunkInputStream(context.session(), statement))
                        .reduce(SequenceInputStream::new) // chunks now in one large stream
                        .orElseThrow(() -> new RuntimeTrellisException("Binary not found under IRI: " + id.getIRIString()));
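The reduce(SequenceInputStream::new) step in retrieve() folds the per-chunk streams pairwise into one logical stream. A minimal, self-contained sketch of the same pattern over in-memory streams (the class name and string contents are invented for illustration):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.stream.Stream;

// Folding several streams into one, as retrieve() does with its lazy chunk streams.
public class ConcatDemo {
    public static void main(String[] args) throws Exception {
        InputStream joined = Stream.of("chunk0 ", "chunk1 ", "chunk2")
                        .<InputStream> map(s -> new ByteArrayInputStream(s.getBytes()))
                        .reduce(SequenceInputStream::new)
                        .orElseThrow(IllegalStateException::new);
        System.out.println(new String(joined.readAllBytes())); // chunk0 chunk1 chunk2
    }
}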
impl/src/main/java/edu/si/trellis/cassandra/CassandraBinaryService.java
@@ -55,9 +55,12 @@ public class CassandraBinaryService extends CassandraService implements BinarySe
 
     private PreparedStatement deleteStatement, insertStatement, retrieveStatement;
 
-    private final BinaryContext queries;
+    private final BinaryQueryContext queries;
 
     private final IdentifierService idService;
 
+    private final int maxChunkLength;
+
     /**
      * @param idService {@link IdentifierService} to use for binaries
      * @param session {@link Session} to use to connect to Cassandra
@@ -71,8 +74,9 @@ public CassandraBinaryService(IdentifierService idService, Session session, @Max
             @BinaryWriteConsistency ConsistencyLevel writeCons) {
         super(session, readCons, writeCons);
         this.idService = idService;
+        this.maxChunkLength = chunkLength;
         log.info("Using chunk length: {}", chunkLength);
-        this.queries = new BinaryContext(session(), chunkLength, readConsistency());
+        this.queries = new BinaryQueryContext(session(), readConsistency());
     }
 
     @PostConstruct
@@ -91,7 +95,7 @@ public CompletableFuture<Binary> get(IRI id) {
             log.debug("Binary {} was {}found", id, wasFound ? "" : "not ");
             if (!wasFound) throw new RuntimeTrellisException("Binary not found under IRI: " + id.getIRIString());
             return meta;
-        }).thenApply(r -> r.getLong("size")).thenApply(size -> new CassandraBinary(id, size, queries));
+        }).thenApply(r -> r.getLong("size")).thenApply(size -> new CassandraBinary(id, size, queries, maxChunkLength));
     }
 
     @Override
@@ -105,18 +109,21 @@ private CompletableFuture<Long> setChunk(BinaryMetadata meta, InputStream stream
         IRI id = meta.getIdentifier();
         Long size = meta.getSize().orElse(null);
         log.debug("Recording chunk {} of binary content under: {}", chunkIndex.get(), id);
-        try (CountingInputStream countingChunk = new CountingInputStream(
-                        new BoundedInputStream(stream, queries.maxChunkLength()))) {
-
+        try (BoundedInputStream bs = new BoundedInputStream(stream, maxChunkLength);
+                        CountingInputStream countingChunk = new CountingInputStream(bs)) {
             @SuppressWarnings("cast")
             // upcast to match this object with InputStreamCodec
             InputStream chunk = (InputStream) countingChunk;
             Statement boundStatement = insertStatement.bind(id, size, chunkIndex.getAndIncrement(), chunk)
                             .setConsistencyLevel(LOCAL_QUORUM);
             return translate(session().executeAsync(boundStatement.setConsistencyLevel(writeConsistency())))
                             .thenApply(r -> countingChunk.getByteCount())
-                            .thenComposeAsync(bytesStored -> bytesStored == queries.maxChunkLength()
+                            .thenComposeAsync(bytesStored -> bytesStored == maxChunkLength
                                             ? setChunk(meta, stream, chunkIndex)
                                             : completedFuture(DONE), mappingThread);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
     }
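setChunk terminates by recursion: a bounded read that delivers exactly maxChunkLength bytes may leave more input, so another chunk is written; a short read signals the end. A synchronous sketch of that termination logic (the class and values are illustrative, not the service's API):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.input.CountingInputStream;

// Read at most maxChunkLength bytes per pass; stop after the first short chunk.
public class ChunkingDemo {
    public static void main(String[] args) throws IOException {
        int maxChunkLength = 4;
        InputStream stream = new ByteArrayInputStream("0123456789".getBytes());
        int chunkIndex = 0;
        long bytesStored;
        do {
            CountingInputStream countingChunk = new CountingInputStream(
                            new BoundedInputStream(stream, maxChunkLength));
            countingChunk.readAllBytes(); // stand-in for binding the chunk into an INSERT
            bytesStored = countingChunk.getByteCount();
            System.out.printf("chunk %d: %d bytes%n", chunkIndex++, bytesStored);
        } while (bytesStored == maxChunkLength);
    }
}

With ten bytes of input and four-byte chunks this writes chunks of 4, 4, and 2 bytes, then stops on the short read.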

@@ -153,7 +160,7 @@ public String generateIdentifier() {
      */
     private Executor mappingThread = Runnable::run;
 
-    static class BinaryContext {
+    static class BinaryQueryContext {
 
         private final Session session;
@@ -165,23 +172,16 @@ static class BinaryContext {
 
         private PreparedStatement readRangeStatement, readStatement, readChunkStatement;
 
-        private final int maxChunkLength;
-
         private final ConsistencyLevel readConsistency;
 
-        public BinaryContext(Session session, int maxChunkLength, ConsistencyLevel consistency) {
+        public BinaryQueryContext(Session session, ConsistencyLevel consistency) {
             this.session = session;
             this.readStatement = session.prepare(READ_QUERY);
             this.readRangeStatement = session.prepare(READ_RANGE_QUERY);
             this.readChunkStatement = session.prepare(READ_CHUNK_QUERY);
-            this.maxChunkLength = maxChunkLength;
             this.readConsistency = consistency;
         }
 
-        int maxChunkLength() {
-            return maxChunkLength;
-        }
-
         Session session() {
             return session;
        }
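After this change the renamed BinaryQueryContext carries only query machinery: each statement is prepared once at construction and bound per request. A generic sketch of that prepare-once/bind-many lifecycle with the DataStax 3.x driver (the contact point, keyspace, table, and query below are invented for illustration):

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

// Prepare once, bind many times, mirroring BinaryQueryContext's constructor.
public class PreparedStatementDemo {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                        Session session = cluster.connect("demo_keyspace")) {
            PreparedStatement read = session.prepare(
                            "SELECT chunk_index FROM binary_data WHERE identifier = ?;");
            BoundStatement bound = read.bind("urn:test");
            ResultSet rows = session.execute(bound);
            rows.forEach(row -> System.out.println(row.getInt("chunk_index")));
        }
    }
}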
impl/src/test/java/edu/si/trellis/cassandra/CassandraBinaryTest.java
@@ -9,7 +9,7 @@
 
 import com.datastax.driver.core.*;
 
-import edu.si.trellis.cassandra.CassandraBinaryService.BinaryContext;
+import edu.si.trellis.cassandra.CassandraBinaryService.BinaryQueryContext;
 
 import java.io.InputStream;
 import java.util.Spliterator;
@@ -32,10 +32,12 @@ public class CassandraBinaryTest {
 
     private Long testSize;
 
+    private int testChunkSize;
+
     private final IRI testId = factory.createIRI("urn:test");
 
     @Mock
-    private BinaryContext mockContext;
+    private BinaryQueryContext mockContext;
 
     @Mock
     private PreparedStatement mockPreparedStatement1, mockPreparedStatement2;
@@ -59,7 +61,7 @@ public class CassandraBinaryTest {
 
     @Test
     public void correctSize() {
-        CassandraBinary testCassandraBinary = new CassandraBinary(testId, testSize, mockContext);
+        CassandraBinary testCassandraBinary = new CassandraBinary(testId, testSize, mockContext, testChunkSize);
         assertSame(testSize, testCassandraBinary.getSize());
     }

@@ -74,7 +76,7 @@ public void noContent() {
         testSpliterator = new TestRowSpliterator(0, mockRow);
         when(mockResultSet1.spliterator()).thenReturn(testSpliterator);
 
-        CassandraBinary testCassandraBinary = new CassandraBinary(testId, testSize, mockContext);
+        CassandraBinary testCassandraBinary = new CassandraBinary(testId, testSize, mockContext, testChunkSize);
 
         try {
             testCassandraBinary.getContent();
