Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove archiving #754

Merged
merged 2 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ public void handle(String target, Request baseRequest, HttpServletRequest req, H

if (Constants.API_ENDPOINT_FETCH.equals(target)) {
baseRequest.setHandled(true);
} else if (Constants.API_ENDPOINT_AFETCH.equals(target)) {
baseRequest.setHandled(true);
} else if (Constants.API_ENDPOINT_SFETCH.equals(target)) {
baseRequest.setHandled(true);
splitFetch = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,6 @@ public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler) {
public void store(GTSEncoder encoder) throws IOException {
throw new RuntimeException("Not Implemented.");
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new RuntimeException("Not Implemented.");
}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ public void run() {
backend.dispatch(clslbls, msg, tmsg, outSipHashKey, outAESKey);
break;
case DELETE:
case ARCHIVE:
break;
default:
throw new RuntimeException("Invalid message type.");
Expand Down
5 changes: 0 additions & 5 deletions warp10/src/main/java/io/warp10/continuum/store/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,6 @@ public class Constants {
* Split fetch endpoint
*/
public static final String API_ENDPOINT_SFETCH = "/api/v0/sfetch";

/**
* Archive Fetch endpoint for the API
*/
public static final String API_ENDPOINT_AFETCH = "/api/v0/afetch";

/**
* Delete endpoint for the API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public class SlicedRowFilterGTSDecoderIterator extends GTSDecoderIterator implem
private static final byte[] ZERO_BYTES = Longs.toByteArray(0L);
private static final byte[] ONES_BYTES = Longs.toByteArray(0xffffffffffffffffL);

// FIXME(hbs): use a different prefix for archived data
private static byte[] prefix = Constants.HBASE_RAW_DATA_KEY_PREFIX;

private final boolean writeTimestamp;
Expand All @@ -100,7 +99,6 @@ public SlicedRowFilterGTSDecoderIterator(long now, long timespan, List<Metadata>
this.keystore = keystore;
this.now = now;
this.timespan = timespan;
// FIXME(hbs): different key for archival
this.hbaseAESKey = keystore.getKey(KeyStore.AES_HBASE_DATA);
this.writeTimestamp = writeTimestamp;

Expand Down
18 changes: 0 additions & 18 deletions warp10/src/main/java/io/warp10/continuum/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ public class Store extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(Store.class);

/**
* Prefix for 'archived' data
*/
public static final byte[] HBASE_ARCHIVE_DATA_KEY_PREFIX = "A".getBytes(StandardCharsets.UTF_8);

/**
* Set of required parameters, those MUST be set
*/
Expand Down Expand Up @@ -1134,9 +1129,6 @@ public void run() {
case DELETE:
handleDelete(ht, tmsg);
break;
case ARCHIVE:
handleArchive(ht, tmsg);
break;
default:
throw new RuntimeException("Invalid message type.");
}
Expand Down Expand Up @@ -1545,16 +1537,6 @@ public Object call() throws Exception {
}
}
}

private void handleArchive(Table ht, KafkaDataMessage msg) {

if (KafkaDataMessageType.ARCHIVE != msg.getType()) {
return;
}


throw new RuntimeException("Archive not implemented yet.");
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

public interface StoreClient {
public void store(GTSEncoder encoder) throws IOException;
public void archive(int chunk, GTSEncoder encoder) throws IOException;
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException;
/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler) {}

@Override
public void store(GTSEncoder encoder) throws IOException {}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException { return 0L; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public void store(GTSEncoder encoder) throws java.io.IOException {
}
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new IOException("Archive not implemented.");
}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException { return 0L; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,6 @@ public void store(GTSEncoder encoder) throws IOException {
}
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new IOException("Not Implemented");
}

public static final void nocache() {
if (null != instance) {
nocache.set(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,6 @@ public void store(GTSEncoder encoder) throws IOException {
}
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new IOException("in-memory platform does not support archiving.");
}

@Override
public void run() {
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public void run() {
}

@Override
//public GTSDecoderIterator fetch(final ReadToken token, final List<Metadata> metadatas, final long now, final long timespan, boolean fromArchive, boolean writeTimestamp, final int preBoundary, final int postBoundary) {
public GTSDecoderIterator fetch(final ReadToken token, final List<Metadata> metadatas, final long now, final long then, final long count, final long skip, final double sample, boolean writeTimestamp, final long preBoundary, final long postBoundary) {

if (0 != preBoundary || 0 != postBoundary) {
Expand Down Expand Up @@ -409,11 +408,6 @@ public void store(GTSEncoder encoder) throws IOException {
}
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new IOException("in-memory platform does not support archiving.");
}

@Override
public void run() {
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler) {
parent.addPlasmaHandler(handler);
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
parent.archive(chunk, encoder);
}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException {
return parent.delete(token, metadata, start, end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ public void addPlasmaHandler(StandalonePlasmaHandlerInterface handler) {
this.client.addPlasmaHandler(handler);
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {
throw new IOException("Archive is not implemented.");
}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException {
return this.client.delete(token, metadata, start, end);
Expand Down
122 changes: 0 additions & 122 deletions warp10/src/main/java/io/warp10/standalone/StandaloneStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,128 +516,6 @@ public void store(GTSEncoder encoder) throws IOException {
}
}

@Override
public void archive(int chunk, GTSEncoder encoder) throws IOException {

if (null == encoder || chunk < 0) {
store((List<byte[][]>) null);
return;
}

//
// If the basetimestamp is not 0, throw an error
//

if (0 != encoder.getBaseTimestamp()) {
throw new IOException("Invalid base timestamp.");
}

//
// Add the wrapping key
//

encoder.setWrappingKey(this.keystore.getKey(KeyStore.AES_LEVELDB_DATA));

//
// If chunk is 0, remove the archived data first
//

long count = 0;

if (0 == chunk) {
DBIterator iterator = this.db.iterator();

byte[] seekto = new byte[Store.HBASE_ARCHIVE_DATA_KEY_PREFIX.length + 8 + 8];
ByteBuffer bb = ByteBuffer.wrap(seekto).order(ByteOrder.BIG_ENDIAN);
bb.put(Store.HBASE_ARCHIVE_DATA_KEY_PREFIX);
bb.putLong(encoder.getClassId());
bb.putLong(encoder.getLabelsId());

iterator.seek(seekto);

WriteBatch batch = this.db.createWriteBatch();
int batchsize = 0;

WriteOptions options = new WriteOptions().sync(1.0 == syncrate);

while (iterator.hasNext()) {
Entry<byte[],byte[]> entry = iterator.next();

if (0 == Bytes.compareTo(entry.getKey(), 0, seekto.length, seekto, 0, seekto.length)) {
batch.delete(entry.getKey());
batchsize++;

if (MAX_DELETE_BATCHSIZE <= batchsize) {
if (syncwrites) {
options = new WriteOptions().sync(Math.random() < syncrate);
}
this.db.write(batch, options);
batch.close();
batch = this.db.createWriteBatch();
batchsize = 0;
}
//this.db.delete(entry.getKey());
count++;
} else {
break;
}
}

if (batchsize > 0) {
if (syncwrites) {
options = new WriteOptions().sync(Math.random() < syncrate);
}
this.db.write(batch, options);
}
iterator.close();
batch.close();
}

int v = chunk;
int vbytes = 1;

//
// Compute the number of bytes needed to represent the chunk
//

while(0 != v) {
vbytes++;
v = v >>> 8;
}

byte[] key = new byte[Store.HBASE_ARCHIVE_DATA_KEY_PREFIX.length + 8 + 8 + vbytes + vbytes - 1];
ByteBuffer bb = ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
bb.put(Store.HBASE_ARCHIVE_DATA_KEY_PREFIX);
bb.putLong(encoder.getClassId());
bb.putLong(encoder.getLabelsId());

//
// Fill the key with vbytes - 1 0xff
//

for (int i = 0; i < vbytes - 1; i++) {
bb.put((byte) 0xff);
}

//
// Output chunk id
//

for (int i = vbytes - 1; i >= 0; i--) {
bb.put((byte) ((chunk >>> i) & 0xff));
}

List<byte[][]> kvs = new ArrayList<byte[][]>();

kvs.add(new byte[][] { key, encoder.getBytes() });

store(kvs);

//
// We don't propagate data to the plasma handler when archiving
//
}

@Override
public long delete(WriteToken token, Metadata metadata, long start, long end) throws IOException {

Expand Down