Skip to content

Commit

Permalink
Adding basic GRPC code for writes with disruptor (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
ambud committed Aug 23, 2017
1 parent e98d764 commit 0e38746
Show file tree
Hide file tree
Showing 33 changed files with 1,050 additions and 628 deletions.
53 changes: 9 additions & 44 deletions cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,30 @@
<linea.version>0.0.15</linea.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.facebook.jcommon</groupId>
<artifactId>util</artifactId>
<version>0.1.29</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.srotya.sidewinder</groupId>
<artifactId>sidewinder-core</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.srotya.sidewinder</groupId>
<artifactId>sidewinder-rpc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-all</artifactId>
<version>1.0.5</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
Expand All @@ -80,17 +48,14 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import com.codahale.metrics.MetricRegistry;
import com.srotya.sidewinder.cluster.routing.Node;
import com.srotya.sidewinder.cluster.routing.RoutingEngine;
import com.srotya.sidewinder.core.ingress.http.HTTPDataPointDecoder;
import com.srotya.sidewinder.core.rpc.Point;
import com.srotya.sidewinder.core.storage.RejectException;
import com.srotya.sidewinder.core.utils.HTTPDataPointDecoder;
import com.srotya.sidewinder.core.utils.MiscUtils;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/**
* Copyright 2017 Ambud Sharma
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.srotya.sidewinder.cluster.rpc;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import com.srotya.sidewinder.cluster.rpc.ListTimeseriesOffsetResponse.OffsetEntry;
import com.srotya.sidewinder.cluster.rpc.ListTimeseriesOffsetResponse.OffsetEntry.Bucket;
import com.srotya.sidewinder.cluster.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase;
import com.srotya.sidewinder.core.storage.Measurement;
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TimeSeries;
import com.srotya.sidewinder.core.storage.TimeSeriesBucket;

import io.grpc.stub.StreamObserver;

/**
* @author ambud
*/
/**
 * gRPC replication service backed by a {@link StorageEngine}. Exposes three
 * endpoints: listing per-series bucket offsets for a measurement, and fetching
 * raw time-series bucket bytes at a given offset (single and batched).
 *
 * <p>All request/response handling follows the gRPC {@link StreamObserver}
 * contract: on failure {@code onError} is called exactly once and no further
 * calls are made on the observer.
 *
 * @author ambud
 */
public class ReplicationServiceImpl extends ReplicationServiceImplBase {

	private static final Logger logger = Logger.getLogger(ReplicationServiceImpl.class.getName());

	private final StorageEngine engine;

	public ReplicationServiceImpl(StorageEngine engine) {
		this.engine = engine;
	}

	/**
	 * Lists, for every time series in the requested measurement, the current
	 * write offset of each of its buckets. Used by replicas to determine how
	 * much data they still need to fetch.
	 */
	@Override
	public void listBatchTimeseriesOffsetList(ListTimeseriesOffsetRequest request,
			StreamObserver<ListTimeseriesOffsetResponse> responseObserver) {
		String dbName = request.getDbName();
		String measurementName = request.getMeasurementName();
		try {
			Measurement measurement = engine.getOrCreateMeasurement(dbName, measurementName);

			ListTimeseriesOffsetResponse.Builder builder = ListTimeseriesOffsetResponse.newBuilder();
			for (TimeSeries timeSeries : measurement.getTimeSeries()) {
				OffsetEntry.Builder offsetBuilder = OffsetEntry.newBuilder();

				// Series id encodes the value field name and the encoded tag set,
				// separated by FIELD_TAG_SEPARATOR.
				String seriesId = timeSeries.getSeriesId();
				String[] keys = seriesId.split(Measurement.FIELD_TAG_SEPARATOR);
				String valueFieldName = keys[0];

				List<String> tags = Measurement.decodeStringToTags(measurement.getTagIndex(), keys[1]);
				offsetBuilder.setValueFieldName(valueFieldName);
				offsetBuilder.addAllTags(tags);

				// One Bucket entry per (time bucket, list index), carrying the
				// writer's current offset so callers can request deltas.
				SortedMap<String, List<TimeSeriesBucket>> bucketRawMap = timeSeries.getBucketRawMap();
				for (Entry<String, List<TimeSeriesBucket>> entry : bucketRawMap.entrySet()) {
					for (int i = 0; i < entry.getValue().size(); i++) {
						TimeSeriesBucket timeSeriesBucket = entry.getValue().get(i);
						OffsetEntry.Bucket.Builder bucket = Bucket.newBuilder();
						bucket.setIndex(i);
						bucket.setOffset(timeSeriesBucket.getWriter().currentOffset());
						bucket.setBucketTs(entry.getKey());
						offsetBuilder.addBuckets(bucket.build());
					}
				}
				builder.addEntries(offsetBuilder.build());
			}
			responseObserver.onNext(builder.build());
			responseObserver.onCompleted();
		} catch (Exception e) {
			logger.log(Level.SEVERE,
					"Failed to list timeseries offsets for " + dbName + "/" + measurementName, e);
			responseObserver.onError(e);
		}
	}

	/**
	 * Batched variant of {@link #fetchTimeseriesDataAtOffset}. Resolves every
	 * sub-request and returns all raw bucket payloads in a single response.
	 *
	 * <p>Fails fast: the first sub-request that cannot be resolved terminates
	 * the call via {@code onError}; no partial response is emitted (previously
	 * this method kept looping and then illegally called {@code onNext} after
	 * {@code onError}).
	 */
	@Override
	public void batchFetchTimeseriesDataAtOffset(BatchRawTimeSeriesOffsetRequest request,
			StreamObserver<BatchRawTimeSeriesOffsetResponse> responseObserver) {
		BatchRawTimeSeriesOffsetResponse.Builder rawBuilder = BatchRawTimeSeriesOffsetResponse.newBuilder();
		for (RawTimeSeriesOffsetRequest rawRequest : request.getRequestsList()) {
			try {
				RawTimeSeriesOffsetResponse.Builder builder = buildRawOffsetResponse(rawRequest);
				// Message id comes from the enclosing batch request, not the
				// individual sub-request.
				builder.setMessageId(request.getMessageId());
				rawBuilder.addResponses(builder.build());
			} catch (Exception e) {
				logger.log(Level.SEVERE, "Failed to fetch timeseries data for batch sub-request: " + rawRequest, e);
				responseObserver.onError(e);
				// The stream is terminated after onError; no further observer
				// calls are permitted.
				return;
			}
		}
		responseObserver.onNext(rawBuilder.build());
		responseObserver.onCompleted();
	}

	/**
	 * Fetches the raw bytes of a single time-series bucket starting at the
	 * requested offset.
	 */
	@Override
	public void fetchTimeseriesDataAtOffset(RawTimeSeriesOffsetRequest request,
			StreamObserver<RawTimeSeriesOffsetResponse> responseObserver) {
		try {
			RawTimeSeriesOffsetResponse.Builder builder = buildRawOffsetResponse(request);
			builder.setMessageId(request.getMessageId());
			responseObserver.onNext(builder.build());
			responseObserver.onCompleted();
		} catch (Exception e) {
			logger.log(Level.SEVERE, "Failed to fetch timeseries data for request: " + request, e);
			responseObserver.onError(e);
		}
	}

	/**
	 * Resolves the time-series bucket addressed by {@code request} and returns
	 * a response builder populated with the series metadata and the bucket's
	 * raw bytes from the requested offset onwards. The caller is responsible
	 * for setting the message id before building.
	 *
	 * @throws IllegalArgumentException
	 *             if no bucket list exists for the derived time bucket
	 * @throws Exception
	 *             if the storage engine cannot resolve the series
	 */
	private RawTimeSeriesOffsetResponse.Builder buildRawOffsetResponse(RawTimeSeriesOffsetRequest request)
			throws Exception {
		String dbName = request.getDbName();
		String measurementName = request.getMeasurementName();
		String valueFieldName = request.getValueFieldName();
		ProtocolStringList tags = request.getTagsList();
		long blockTimestamp = request.getBlockTimestamp();
		int index = request.getIndex();
		int offset = request.getOffset();
		int bucketSize = request.getBucketSize();

		TimeSeries timeSeries = engine.getTimeSeries(dbName, measurementName, valueFieldName, new ArrayList<>(tags));
		String timeBucket = TimeSeries.getTimeBucket(TimeUnit.MILLISECONDS, blockTimestamp, bucketSize);
		SortedMap<String, List<TimeSeriesBucket>> bucketRawMap = timeSeries.getBucketRawMap();
		List<TimeSeriesBucket> list = bucketRawMap.get(timeBucket);
		if (list == null) {
			// Previously this surfaced as a bare NPE; fail with context instead.
			throw new IllegalArgumentException("No buckets for time bucket " + timeBucket + " in " + dbName + "/"
					+ measurementName + "/" + valueFieldName);
		}
		TimeSeriesBucket timeSeriesBucket = list.get(index);

		RawTimeSeriesOffsetResponse.Builder builder = RawTimeSeriesOffsetResponse.newBuilder();
		builder.setBlockTimestamp(blockTimestamp);
		builder.setDbName(dbName);
		builder.setMeasurementName(measurementName);
		builder.setBucketSize(bucketSize);
		builder.setFp(timeSeries.isFp());
		builder.setValueFieldName(valueFieldName);
		builder.addAllTags(tags);
		builder.setIndex(index);
		builder.setCount(timeSeriesBucket.getCount());

		// Ship the compressed bucket bytes from the requested offset onwards.
		ByteBuffer buf = timeSeriesBucket.getWriter().getRawBytes();
		buf.position(offset);
		builder.setData(ByteString.copyFrom(buf));
		return builder;
	}

}
Loading

0 comments on commit 0e38746

Please sign in to comment.