Navigation Menu

Skip to content

Commit

Permalink
Reaper #369 Pull-only backend for tracking streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Radovan Zvoncek authored and rzvoncek committed Aug 8, 2018
1 parent 196406d commit b11518a
Show file tree
Hide file tree
Showing 18 changed files with 2,409 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/server/src/main/java/io/cassandrareaper/AppContext.java
Expand Up @@ -19,6 +19,7 @@
import io.cassandrareaper.service.PurgeManager;
import io.cassandrareaper.service.RepairManager;
import io.cassandrareaper.service.SnapshotManager;
import io.cassandrareaper.service.StreamManager;
import io.cassandrareaper.storage.IStorage;

import java.net.InetAddress;
Expand Down Expand Up @@ -50,6 +51,7 @@ public final class AppContext {
public SnapshotManager snapshotManager;
public PurgeManager purgeManager;
public MetricsGrabber metricsGrabber;
public StreamManager streamManager;

private static String initialiseInstanceAddress() {
String reaperInstanceAddress;
Expand Down
Expand Up @@ -33,6 +33,7 @@
import io.cassandrareaper.service.RepairManager;
import io.cassandrareaper.service.SchedulingManager;
import io.cassandrareaper.service.SnapshotManager;
import io.cassandrareaper.service.StreamManager;
import io.cassandrareaper.storage.CassandraStorage;
import io.cassandrareaper.storage.IDistributedStorage;
import io.cassandrareaper.storage.IStorage;
Expand Down Expand Up @@ -161,6 +162,7 @@ public void run(ReaperApplicationConfiguration config, Environment environment)
environment.lifecycle().executorService("SnapshotManager").minThreads(5).maxThreads(5).build());

context.metricsGrabber = MetricsGrabber.create(context);
context.streamManager = StreamManager.create(context);

int repairThreads = config.getRepairRunThreadCount();
LOG.info("initializing runner thread pool with {} threads", repairThreads);
Expand Down
318 changes: 318 additions & 0 deletions src/server/src/main/java/io/cassandrareaper/core/Stream.java
@@ -0,0 +1,318 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.core;

import java.util.List;

import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.collect.ImmutableList;


/**
* This class describes the state of a file exchange between two Cassandra nodes.
*/
public final class Stream {

/**
* String identifier of this stream.
*
* <p>Needs to be unique within a StreamSession.
*/
private final String id;

/**
* Node on the _local_ side of the stream. The JMX notification came from this node.
*/
private final String host;

/**
* Node on the _remote_ side of the stream.
*/
private final String peer;

/**
* The direction of data transfer relative from host.
*/
private final Direction direction;

/**
* Total number of bytes to transfer from host to peer.
*/
private final long sizeToSend;

/**
* Total number of bytes to transfer from peer to host.
*/
private final long sizeToReceive;

/**
* Number of bytes transferred from host to peer. Needed only to make the JSON automagic see this field.
*/
private final long sizeSent;

/**
* Number of bytes transferred from peer to host. Needed only to make the JSON automagic see this field.
*/
private final long sizeReceived;

/**
* Per keyspace.table total number of bytes sent.
*/
private final List<TableProgress> progressSent ;

/**
* Per keyspace.table total number of bytes received.
*/
private final List<TableProgress> progressReceived;

/**
* Timestamp of the last update of this stream.
*/
private final long lastUpdated;

/**
* Indicates if the stream has completed.
*/
private final boolean completed;

/**
* Indicates if the stream has completed successfully.
*/
private final boolean success;

public enum Direction {
IN,
OUT,
BOTH
}

public static final class TableProgress {
private final String table;
private final Long current;
private final Long total;

public TableProgress(String table, Long current, Long total) {
this.table = table;
this.current = current;
this.total = total;
}

public String getTable() {
return table;
}

public Long getCurrent() {
return current;
}

public Long getTotal() {
return total;
}

@Override
public String toString() {
return String.format("TableProgress(table=%s,current=%d,total=%d", table, current, total);
}

}

private Stream(Builder builder) {
this.id = builder.id;
this.host = builder.host;
this.peer = builder.peer;
this.direction = builder.direction;
this.sizeToSend = builder.sizeToSend;
this.sizeToReceive = builder.sizeToReceive;
this.sizeSent = builder.sizeSent;
this.sizeReceived = builder.sizeReceived;
this.progressSent = ImmutableList.copyOf(builder.progressSent);
this.progressReceived = ImmutableList.copyOf(builder.progressReceived);
this.lastUpdated = builder.lastUpdated;
this.completed = builder.completed;
this.success = builder.success;
}

public String getId() {
return id;
}

public String getHost() {
return host;
}

public String getPeer() {
return peer;
}

public Direction getDirection() {
return direction;
}

public long getSizeToSend() {
return sizeToSend;
}

public long getSizeToReceive() {
return sizeToReceive;
}

public long getSizeSent() {
return sizeSent;
}

public long getSizeReceived() {
return sizeReceived;
}

public List<TableProgress> getProgressReceived() {
return progressReceived;
}

public List<TableProgress> getProgressSent() {
return progressSent;
}

public long getLastUpdated() {
return lastUpdated;
}

public boolean getCompleted() {
return completed;
}

public boolean getSuccess() {
return success;
}

public String toString() {
return String.format(
"Stream(host=%s, peer=%s, direction=%s, toSend=%d, sent=%d, toReceive=%d, received=%d, progressSent=%s, "
+ "progressReceived=%s, updated=%d, completed=%b",
host, peer, direction.toString(), sizeToSend, sizeSent, sizeToReceive, sizeReceived, progressSent,
progressReceived, lastUpdated, completed);
}

public static Stream.Builder builder() {
return new Stream.Builder();
}

public static Stream.Builder builder(Stream oldStream) {
return new Stream.Builder(oldStream);
}

@JsonPOJOBuilder(buildMethodName = "build", withPrefix = "with")
public static final class Builder {
private String id;
private String host;
private String peer;
private Direction direction;
private long sizeToSend;
private long sizeToReceive;
private long sizeSent;
private long sizeReceived;
private List<TableProgress> progressSent;
private List<TableProgress> progressReceived;
private long lastUpdated;
private boolean completed;
private boolean success;

private Builder() {}

public Builder(Stream oldStream) {
this.id = oldStream.getId();
this.host = oldStream.getHost();
this.peer = oldStream.getPeer();
this.direction = oldStream.getDirection();
this.sizeToSend = oldStream.getSizeToSend();
this.sizeToReceive = oldStream.getSizeToReceive();
this.progressSent = oldStream.getProgressSent();
this.progressReceived = oldStream.getProgressReceived();
this.lastUpdated = oldStream.getLastUpdated();
this.completed = oldStream.getCompleted();
this.success = oldStream.getSuccess();
}

public Builder withId(String id) {
this.id = id;
return this;
}

public Builder withHost(String host) {
this.host = host;
return this;
}

public Stream.Builder withPeer(String peer) {
this.peer = peer;
return this;
}

public Stream.Builder withDirection(Direction direction) {
this.direction = direction;
return this;
}

public Stream.Builder withProgressReceived(List<TableProgress> progressReceived) {
this.progressReceived = progressReceived;
return this;
}

public Stream.Builder withSizeToReceive(long sizeToReceive) {
this.sizeToReceive = sizeToReceive;
return this;
}

public Stream.Builder withProgressSent(List<TableProgress> progressSent) {
this.progressSent = progressSent;
return this;
}

public Stream.Builder withSizeToSend(long sizeToSend) {
this.sizeToSend = sizeToSend;
return this;
}

public Stream.Builder withSizeSent(long sizeSent) {
this.sizeSent = sizeSent;
return this;
}

public Stream.Builder withSizeReceived(long sizeReceived) {
this.sizeReceived = sizeReceived;
return this;
}

public Stream.Builder withLastUpdated(long lastUpdated) {
this.lastUpdated = lastUpdated;
return this;
}

public Stream.Builder withCompleted(boolean completed) {
this.completed = completed;
return this;
}

public Stream.Builder withSuccess(boolean success) {
this.success = success;
return this;
}

public Stream build() {
return new Stream(this);
}

}

}

0 comments on commit b11518a

Please sign in to comment.