diff --git a/ambari/pom.xml b/ambari/pom.xml
index cd7a853..b089545 100644
--- a/ambari/pom.xml
+++ b/ambari/pom.xml
@@ -4,7 +4,7 @@
com.srotya.sidewinder
sidewinder-parent
- 0.0.27-SNAPSHOT
+ 0.0.28-SNAPSHOT
../pom.xml
ambari-stack
diff --git a/archiver/hdfs-archiver/pom.xml b/archiver/hdfs-archiver/pom.xml
index 3b6ddd5..b75bdd5 100644
--- a/archiver/hdfs-archiver/pom.xml
+++ b/archiver/hdfs-archiver/pom.xml
@@ -4,7 +4,7 @@
com.srotya.sidewinder
sidewinder-archiver
- 0.0.27-SNAPSHOT
+ 0.0.28-SNAPSHOT
../pom.xml
hdfs-archiver
diff --git a/archiver/hdfs-archiver/src/test/java/com/srotya/sidewinder/archiver/hdfs/TestHDFSArchiver.java b/archiver/hdfs-archiver/src/test/java/com/srotya/sidewinder/archiver/hdfs/TestHDFSArchiver.java
index b1789f7..e1d9f42 100644
--- a/archiver/hdfs-archiver/src/test/java/com/srotya/sidewinder/archiver/hdfs/TestHDFSArchiver.java
+++ b/archiver/hdfs-archiver/src/test/java/com/srotya/sidewinder/archiver/hdfs/TestHDFSArchiver.java
@@ -30,9 +30,8 @@
import com.srotya.sidewinder.core.storage.ArchiveException;
import com.srotya.sidewinder.core.storage.Archiver;
import com.srotya.sidewinder.core.storage.DataPoint;
-import com.srotya.sidewinder.core.storage.TimeSeriesBucket;
import com.srotya.sidewinder.core.storage.compression.Reader;
-import com.srotya.sidewinder.core.storage.compression.byzantine.ByzantineWriter;
+import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.storage.mem.archival.TimeSeriesArchivalObject;
/**
@@ -61,11 +60,11 @@ public void testHDFSArchive() throws IOException, ArchiveException {
archiver.init(conf);
long ts = System.currentTimeMillis();
//TODO fix unit tests for HDFS Archiver
- TimeSeriesBucket bucket = null;
+ Writer bucket = null;
for (int i = 0; i < 1000; i++) {
- bucket.addDataPoint(ts + i * 1000, i);
+ bucket.addValue(ts + i * 1000, i);
}
- Reader reader = bucket.getReader(null, null);
+ Reader reader = bucket.getReader();
for (int i = 0; i < 1000; i++) {
DataPoint pair = reader.readPair();
assertEquals(ts + i * 1000, pair.getTimestamp());
diff --git a/archiver/pom.xml b/archiver/pom.xml
index ac45f6c..e547ccd 100644
--- a/archiver/pom.xml
+++ b/archiver/pom.xml
@@ -4,7 +4,7 @@
com.srotya.sidewinder
sidewinder-parent
- 0.0.27-SNAPSHOT
+ 0.0.28-SNAPSHOT
../pom.xml
sidewinder-archiver
diff --git a/archiver/s3-archiver/pom.xml b/archiver/s3-archiver/pom.xml
index fe913c1..dc0b9b0 100644
--- a/archiver/s3-archiver/pom.xml
+++ b/archiver/s3-archiver/pom.xml
@@ -4,7 +4,7 @@
com.srotya.sidewinder
sidewinder-archiver
- 0.0.27-SNAPSHOT
+ 0.0.28-SNAPSHOT
../pom.xml
s3-archiver
diff --git a/build/build-info.txt b/build/build-info.txt
index 4e9eec2..60f9697 100644
--- a/build/build-info.txt
+++ b/build/build-info.txt
@@ -1,2 +1,2 @@
-Release Build number:309
-Release Commit number:e72d92eeab84952bfaf1d5c55a4c26de7e69add8
+Release Build number:326
+Release Commit number:61bf02699b9f328ce1d83ada7074ee54fa7089b0
diff --git a/cluster/pom.xml b/cluster/pom.xml
index 418c298..e726e45 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -4,7 +4,7 @@
com.srotya.sidewinder
sidewinder-parent
- 0.0.27-SNAPSHOT
+ 0.0.28-SNAPSHOT
../pom.xml
sidewinder-cluster
@@ -19,6 +19,12 @@
com.facebook.jcommon
util
0.1.29
+
+
+ org.slf4j
+ slf4j-api
+
+
com.google.protobuf
@@ -38,6 +44,12 @@
io.atomix
atomix-all
1.0.5
+
+
+ org.slf4j
+ slf4j-api
+
+
io.dropwizard
@@ -66,6 +78,12 @@
org.apache.curator
curator-framework
3.2.1
+
+
+ org.slf4j
+ slf4j-api
+
+
org.apache.curator
diff --git a/cluster/src/main/java/com/srotya/sidewinder/cluster/SidewinderClusteredServer.java b/cluster/src/main/java/com/srotya/sidewinder/cluster/SidewinderClusteredServer.java
index 06777a9..9ebc057 100644
--- a/cluster/src/main/java/com/srotya/sidewinder/cluster/SidewinderClusteredServer.java
+++ b/cluster/src/main/java/com/srotya/sidewinder/cluster/SidewinderClusteredServer.java
@@ -74,19 +74,36 @@ public void run(ClusterConfiguration configuration, Environment env) throws Exce
ScheduledExecutorService bgTasks = Executors.newScheduledThreadPool(2, new BackgrounThreadFactory("bgtasks"));
+ StorageEngine storageEngine;
+
String storageEngineClass = conf.getOrDefault("storage.engine",
"com.srotya.sidewinder.core.storage.mem.MemStorageEngine");
- StorageEngine storageEngine = (StorageEngine) Class.forName(storageEngineClass).newInstance();
- storageEngine.configure(conf, bgTasks);
- storageEngine.connect();
+ try {
+ storageEngine = (StorageEngine) Class.forName(storageEngineClass).newInstance();
+ storageEngine.configure(conf, bgTasks);
+ storageEngine.connect();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
- ClusterConnector connector = (ClusterConnector) Class.forName(conf.getOrDefault("cluster.connector",
- "com.srotya.sidewinder.cluster.connectors.ConfigConnector")).newInstance();
- connector.init(conf);
+ ClusterConnector connector;
+ try {
+ connector = (ClusterConnector) Class.forName(
+ conf.getOrDefault("cluster.connector", "com.srotya.sidewinder.cluster.connectors.ConfigConnector"))
+ .newInstance();
+ connector.init(conf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
- RoutingEngine router = (RoutingEngine) Class.forName(conf.getOrDefault("cluster.routing.engine",
- "com.srotya.sidewinder.cluster.routing.impl.MasterSlaveRoutingEngine")).newInstance();
- router.init(conf, storageEngine, connector);
+ RoutingEngine router;
+ try {
+ router = (RoutingEngine) Class.forName(conf.getOrDefault("cluster.routing.engine",
+ "com.srotya.sidewinder.cluster.routing.impl.MasterSlaveRoutingEngine")).newInstance();
+ router.init(conf, storageEngine, connector);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
final Server server = ServerBuilder.forPort(port)
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
@@ -103,7 +120,7 @@ public void run() {
});
ResourceMonitor.getInstance().init(storageEngine, bgTasks);
- ClusterResourceMonitor.getInstance().init(storageEngine, connector, bgTasks);
+ ClusterResourceMonitor.getInstance().init(storageEngine, connector, bgTasks);
env.jersey().register(new GrafanaQueryApi(storageEngine, registry));
env.jersey().register(new MeasurementOpsApi(storageEngine, registry));
env.jersey().register(new DatabaseOpsApi(storageEngine, registry));
@@ -111,14 +128,14 @@ public void run() {
env.jersey().register(new InfluxApi(router, registry, conf));
}
env.healthChecks().register("restapi", new RestAPIHealthCheck());
-
+
if (Boolean.parseBoolean(conf.getOrDefault(ConfigConstants.AUTH_BASIC_ENABLED, ConfigConstants.FALSE))) {
AuthFilter basicCredentialAuthFilter = new BasicCredentialAuthFilter.Builder<>()
.setAuthenticator(new BasicAuthenticator(conf.get(ConfigConstants.AUTH_BASIC_USERS)))
.setAuthorizer(new AllowAllAuthorizer()).setPrefix("Basic").buildAuthFilter();
env.jersey().register(basicCredentialAuthFilter);
}
-
+
}
private void loadConfiguration(ClusterConfiguration configuration, HashMap conf)
diff --git a/cluster/src/main/java/com/srotya/sidewinder/cluster/api/DatabaseOpsApi.java b/cluster/src/main/java/com/srotya/sidewinder/cluster/api/DatabaseOpsApi.java
new file mode 100644
index 0000000..0a11224
--- /dev/null
+++ b/cluster/src/main/java/com/srotya/sidewinder/cluster/api/DatabaseOpsApi.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.cluster.api;
+
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+
+import com.codahale.metrics.MetricRegistry;
+import com.srotya.sidewinder.cluster.routing.RoutingEngine;
+import com.srotya.sidewinder.core.rpc.Point;
+
+@Path("/cluster/databases")
+public class DatabaseOpsApi {
+
+ public static final String DB_NAME = "dbName";
+ private RoutingEngine engine;
+
+ public DatabaseOpsApi(RoutingEngine engine, MetricRegistry registry) {
+ this.engine = engine;
+ if (registry != null) {
+ // register things
+ }
+ }
+
+ @Path("/{DB_NAME}/measurements/{MEASUREMENT_NAME}")
+ @POST
+ public void createMeasurement(@PathParam("DB_NAME") String dbName,
+ @PathParam("MEASUREMENT_NAME") String measurementName) {
+ Point point = Point.newBuilder().setDbName(dbName).setMeasurementName(measurementName).build();
+ try {
+ engine.addRoutableKey(point, 3);
+ } catch (UnsupportedOperationException e) {
+ throw new BadRequestException(e);
+ }
+ }
+
+}
diff --git a/cluster/src/main/java/com/srotya/sidewinder/cluster/api/InfluxApi.java b/cluster/src/main/java/com/srotya/sidewinder/cluster/api/InfluxApi.java
index 94115f7..3113fcb 100644
--- a/cluster/src/main/java/com/srotya/sidewinder/cluster/api/InfluxApi.java
+++ b/cluster/src/main/java/com/srotya/sidewinder/cluster/api/InfluxApi.java
@@ -44,12 +44,10 @@ public class InfluxApi {
private Meter meter;
private RoutingEngine router;
- private int replicationFactor;
public InfluxApi(RoutingEngine router, MetricRegistry registry, Map conf) {
this.router = router;
meter = registry.meter("writes");
- this.replicationFactor = Integer.parseInt(conf.getOrDefault("cluster.replication.factor", "3"));
}
@POST
@@ -64,7 +62,7 @@ public void insertData(@QueryParam("db") String dbName, String payload) {
}
meter.mark(dps.size());
for (Point dp : dps) {
- List nodes = router.routeData(dp, replicationFactor);
+ List nodes = router.routeData(dp);
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(i);
System.err.println(MiscUtils.pointToDataPoint(dp)+"\t"+node);
diff --git a/cluster/src/main/java/com/srotya/sidewinder/cluster/connectors/AtomixConnector.java b/cluster/src/main/java/com/srotya/sidewinder/cluster/connectors/AtomixConnector.java
index 5bd7c6c..4e5b6fe 100644
--- a/cluster/src/main/java/com/srotya/sidewinder/cluster/connectors/AtomixConnector.java
+++ b/cluster/src/main/java/com/srotya/sidewinder/cluster/connectors/AtomixConnector.java
@@ -17,7 +17,10 @@
import java.time.Duration;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.logging.Level;
import java.util.logging.Logger;
import com.srotya.sidewinder.cluster.routing.GRPCWriter;
@@ -26,12 +29,18 @@
import com.srotya.sidewinder.cluster.routing.RoutingEngine;
import io.atomix.AtomixReplica;
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.TypeSerializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.Storage.Builder;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.group.DistributedGroup;
import io.atomix.group.GroupMember;
+import io.atomix.group.election.Term;
+import io.atomix.variables.DistributedValue;
import io.grpc.CompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -41,12 +50,15 @@
*/
public class AtomixConnector extends ClusterConnector {
+ private static final String TABLE = "table";
private static final String BROADCAST_GROUP = "controller";
private static final Logger logger = Logger.getLogger(AtomixConnector.class.getName());
private AtomixReplica atomix;
private boolean isBootstrap;
private String address;
private int port;
+ private volatile boolean leader;
+ private Node localNode;
@Override
public void init(Map conf) throws Exception {
@@ -66,18 +78,20 @@ public void init(Map conf) throws Exception {
.ofSeconds(Integer.parseInt(conf.getOrDefault("cluster.atomix.heartbeat.interval", "5"))))
.build();
- logger.info("Constructed Atomix Replica");
+ atomix.serializer().register(Node.class, NodeSerializer.class);
+
this.isBootstrap = Boolean.parseBoolean(conf.getOrDefault("cluster.atomix.bootstrap", "true"));
if (isBootstrap) {
logger.info("Joining cluster as bootstrap node");
- atomix.bootstrap().join();
+ atomix.bootstrap(new Address(conf.getOrDefault("cluster.atomix.bootstrap.host", "localhost"),
+ Integer.parseInt(conf.getOrDefault("cluster.atomix.bootstrap.port", "8901")))).join();
+ atomix.getValue(TABLE);
} else {
logger.info("Joining cluster as a member node");
atomix.join(new Address(conf.getOrDefault("cluster.atomix.bootstrap.host", "localhost"),
Integer.parseInt(conf.getOrDefault("cluster.atomix.bootstrap.port", "8901")))).get();
}
logger.info("Atomix clustering initialized");
-
}
@Override
@@ -85,41 +99,78 @@ public void initializeRouterHooks(final RoutingEngine engine) {
port = engine.getPort();
address = engine.getAddress();
final DistributedGroup group = getAtomix().getGroup(BROADCAST_GROUP).join();
- Node localNode = new Node(address, port, address + ":" + port);
+ localNode = new Node(address, port, address + ":" + port);
localNode.setWriter(new LocalWriter(engine.getEngine()));
+ // add local node to the node list so that requests can be routed to the local
+ // writer instead of GRPC writer
engine.nodeAdded(localNode);
- if (isBootstrap()) {
- // track nodes only if this node is the master
- group.onJoin(new Consumer() {
-
- @Override
- public void accept(GroupMember t) {
- if (!isLocal(t.id())) {
- logger.info("Non-local node found:" + t.id());
- String[] split = t.id().split(":");
- ManagedChannel channel = ManagedChannelBuilder.forAddress(split[0], Integer.parseInt(split[1]))
- .compressorRegistry(CompressorRegistry.getDefaultInstance()).usePlaintext(true).build();
- Node node = new Node(split[0], Integer.parseInt(split[1]), t.id());
- node.setWriter(new GRPCWriter(channel));
- engine.nodeAdded(node);
+ group.election().onElection(new Consumer() {
+
+ @Override
+ public void accept(Term t) {
+ if (isLocal(t.leader().id())) {
+ logger.info("Completed leader election:" + t.leader().id());
+ leader = true;
+ try {
+ engine.makeCoordinator();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.severe("Error making coordinator");
}
+ } else {
+ logger.info("Leader election completed, " + t.leader().id() + " is the leader");
+ leader = false;
+ String[] id = t.leader().id().split(":");
+ engine.setLeader(new Node(id[0], Integer.parseInt(id[1]), t.leader().id()));
}
- });
+ }
+ });
+
+ group.onJoin(new Consumer() {
+
+ @Override
+ public void accept(GroupMember t) {
+ if (!isLocal(t.id())) {
+ logger.info("Node found:" + t.id());
+ String[] split = t.id().split(":");
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(split[0], Integer.parseInt(split[1]))
+ .compressorRegistry(CompressorRegistry.getDefaultInstance()).usePlaintext(true).build();
+ Node node = new Node(split[0], Integer.parseInt(split[1]), t.id());
+ node.setWriter(new GRPCWriter(channel));
+ engine.nodeAdded(node);
+ }
+ }
+ });
+
+ group.onLeave(new Consumer() {
+
+ @Override
+ public void accept(GroupMember t) {
+ if (!isLocal(t.id())) {
+ logger.info("Node left:" + t.id());
+ String[] split = t.id().split(":");
+ Node node = new Node(split[0], Integer.parseInt(split[1]), t.id());
+ engine.nodeAdded(node);
+ }
+ }
+ });
- group.onLeave(new Consumer() {
+ group.join(address + ":" + port).join();
+ logger.info("Created cluster using Atomix connector");
- @Override
- public void accept(GroupMember t) {
- if (!isLocal(t.id())) {
- logger.info("Non-local node found:" + t.id());
- String[] split = t.id().split(":");
- Node node = new Node(split[0], Integer.parseInt(split[1]), t.id());
- engine.nodeAdded(node);
- }
- }
+ try {
+ getAtomix().getValue(TABLE).get().onChange(event -> {
+ logger.info("Route table updated by leader");
+ engine.updateLocalRouteTable(event.newValue());
});
+ } catch (InterruptedException | ExecutionException e) {
+ logger.log(Level.SEVERE, "Error updating route table on node " + address + ":" + port, e);
}
- group.join(address + ":" + port).join();
+ }
+
+ @Override
+ public boolean isLeader() {
+ return leader;
}
public AtomixReplica getAtomix() {
@@ -140,4 +191,44 @@ public int getClusterSize() throws Exception {
return getAtomix().getGroup(BROADCAST_GROUP).join().members().size();
}
+ @Override
+ public Object fetchRoutingTable() {
+ try {
+ logger.info("Fetching route table info from metastore");
+ DistributedValue
+
+ org.apache.lucene
+ lucene-core
+ 6.6.0
+
org.lz4
lz4-java
1.4.0
+
+ org.mapdb
+ mapdb
+ 3.0.5
+
org.mockito
mockito-all
diff --git a/core/src/main/java/com/srotya/sidewinder/core/ConfigConstants.java b/core/src/main/java/com/srotya/sidewinder/core/ConfigConstants.java
index b237a4a..937682f 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/ConfigConstants.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/ConfigConstants.java
@@ -32,5 +32,6 @@ public interface ConfigConstants {
public static final String GRPC_EXECUTOR_COUNT = "grpc.executor.count";
public static final String ENABLE_GRPC = "grpc.enabled";
public static final String BG_THREAD_COUNT = "bgthread.count";
+ public static final String GRAPHITE_ENABLED = "graphite.enabled";
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java b/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java
index 5aee2b3..81d874a 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.security.Principal;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -39,6 +41,7 @@
import com.srotya.sidewinder.core.api.MeasurementOpsApi;
import com.srotya.sidewinder.core.api.SqlApi;
import com.srotya.sidewinder.core.api.grafana.GrafanaQueryApi;
+import com.srotya.sidewinder.core.graphite.GraphiteServer;
import com.srotya.sidewinder.core.health.RestAPIHealthCheck;
import com.srotya.sidewinder.core.rpc.WriterServiceImpl;
import com.srotya.sidewinder.core.security.AllowAllAuthorizer;
@@ -68,9 +71,11 @@ public class SidewinderServer extends Application {
private static final Logger logger = Logger.getLogger(SidewinderServer.class.getName());
private StorageEngine storageEngine;
private static SidewinderServer sidewinderServer;
+ private List shutdownTasks;
@Override
public void run(SidewinderConfig config, Environment env) throws Exception {
+ shutdownTasks = new ArrayList<>();
final MetricRegistry registry = new MetricRegistry();
Map conf = new HashMap<>();
@@ -83,6 +88,37 @@ public void run(SidewinderConfig config, Environment env) throws Exception {
enableMonitoring(registry, bgTasks);
registerWebAPIs(env, conf, registry, bgTasks);
checkAndEnableGRPC(conf);
+ checkAndEnableGraphite(conf);
+ registerShutdownHook(conf);
+ }
+
+ private void registerShutdownHook(Map conf) {
+ Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
+ @Override
+ public void run() {
+ for (Runnable task : shutdownTasks) {
+ task.run();
+ }
+ }
+ });
+ }
+
+ private void checkAndEnableGraphite(Map conf) throws Exception {
+ if (Boolean.parseBoolean(conf.getOrDefault(ConfigConstants.GRAPHITE_ENABLED, ConfigConstants.FALSE))) {
+ final GraphiteServer server = new GraphiteServer(conf, storageEngine);
+ server.start();
+ shutdownTasks.add(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Failed to shutdown graphite server", e);
+ }
+ }
+ });
+ }
}
private void enableMonitoring(final MetricRegistry registry, ScheduledExecutorService bgTasks) {
@@ -151,8 +187,8 @@ private void checkAndEnableGRPC(Map conf) throws IOException {
final Server server = ServerBuilder
.forPort(Integer.parseInt(conf.getOrDefault(ConfigConstants.GRPC_PORT, "9928"))).executor(es)
.decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addService(writer).build().start();
+ shutdownTasks.add(new Runnable() {
- Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
@Override
public void run() {
server.shutdown();
@@ -170,6 +206,7 @@ public void run() {
writer.getEs().shutdownNow();
}
});
+
}
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/aggregators/WindowedFunction.java b/core/src/main/java/com/srotya/sidewinder/core/aggregators/WindowedFunction.java
index 20e5da8..069c34a 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/aggregators/WindowedFunction.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/aggregators/WindowedFunction.java
@@ -34,7 +34,7 @@ public void init(Object[] args) throws Exception {
}
/**
- * @return
+ * @return time window for this aggregation
*/
public int getTimeWindow() {
return timeWindow;
diff --git a/core/src/main/java/com/srotya/sidewinder/core/analytics/MathUtils.java b/core/src/main/java/com/srotya/sidewinder/core/analytics/MathUtils.java
index 7152c22..6485dd0 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/analytics/MathUtils.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/analytics/MathUtils.java
@@ -25,8 +25,8 @@ public class MathUtils {
/**
* Compute PPMCC between the supplied arrays
*
- * @param a
- * @param b
+ * @param a first array to compute ppmcc
+ * @param b second array to compute ppmcc
* @return ppmcc
*/
public static double ppmcc(double[] a, double[] b) {
@@ -43,7 +43,7 @@ public static double ppmcc(double[] a, double[] b) {
/**
* Compute mean or average of the supplied array
*
- * @param a
+ * @param a array to compute mean for
* @return average / mean
*/
public static double mean(double[] a) {
@@ -58,7 +58,7 @@ public static double mean(double[] a) {
/**
* Compute mean or average of the supplied array
*
- * @param a
+ * @param a array to compute mean for
* @return average / mean
*/
public static long mean(long[] a) {
@@ -72,10 +72,10 @@ public static long mean(long[] a) {
/**
* Compute co-variance of the supplied array pair
*
- * @param a
- * @param amean
- * @param b
- * @param bmean
+ * @param a array to compute covariance for
+ * @param amean mean of array a
+ * @param b array to compute covariance for
+ * @param bmean mean of array b
* @return co-variance
*/
public static double covariance(double[] a, double amean, double[] b, double bmean) {
@@ -89,7 +89,7 @@ public static double covariance(double[] a, double amean, double[] b, double bme
/**
* Compute standard deviation for the supplied array
*
- * @param a
+ * @param a array to compute stddev for
* @param avg
* @return standard deviation
*/
diff --git a/core/src/main/java/com/srotya/sidewinder/core/api/grafana/GrafanaQueryApi.java b/core/src/main/java/com/srotya/sidewinder/core/api/grafana/GrafanaQueryApi.java
index d549b1a..c374b68 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/api/grafana/GrafanaQueryApi.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/api/grafana/GrafanaQueryApi.java
@@ -118,16 +118,18 @@ public String query(@PathParam(DatabaseOpsApi.DB_NAME) String dbName, String que
@Produces({ MediaType.APPLICATION_JSON })
@Consumes({ MediaType.APPLICATION_JSON })
public Set queryMeasurementNames(@PathParam(DatabaseOpsApi.DB_NAME) String dbName, String queryString) {
- logger.log(Level.INFO, "Query measurements for db:" + dbName + "\t" + queryString);
+ logger.log(Level.FINE, "Query measurements for db:" + dbName + "\t" + queryString);
try {
if (queryString != null && !queryString.isEmpty()) {
JsonObject query = new Gson().fromJson(queryString, JsonObject.class);
if (query.has("target")) {
String target = query.get("target").getAsString();
- if (target.contains("*")) {
- return engine.getTagsForMeasurement(dbName, target.replace(".*", ""));
- }else {
- return engine.getFieldsForMeasurement(dbName, target);
+ if (target.startsWith("measurement:")) {
+ return engine.getTagsForMeasurement(dbName, target.replace("measurement:", ""));
+ } else if (target.contains("field:")) {
+ return engine.getFieldsForMeasurement(dbName, target.replace("field:", ""));
+ } else {
+ return engine.getMeasurementsLike(dbName, "");
}
}
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteDecoder.java b/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteDecoder.java
new file mode 100644
index 0000000..c5604fc
--- /dev/null
+++ b/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteDecoder.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.graphite;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.srotya.sidewinder.core.storage.StorageEngine;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+/**
+ * http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
+ * metricpath metricvalue metrictimestamp
+ *
+ * @author ambud
+ */
+public class GraphiteDecoder extends SimpleChannelInboundHandler {
+
+ private static final Logger logger = Logger.getLogger(GraphiteDecoder.class.getName());
+ private StorageEngine storageEngine;
+ private String dbName;
+
+ public GraphiteDecoder(String dbName, StorageEngine storageEngine) {
+ this.dbName = dbName;
+ this.storageEngine = storageEngine;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
+ logger.fine("Graphite input:" + msg);
+ parseAndInsertDataPoints(dbName, msg, storageEngine);
+ }
+
+ public static void parseAndInsertDataPoints(String dbName, String line, StorageEngine storageEngine)
+ throws IOException {
+ String[] parts = line.split("\\s+");
+ if (parts.length != 3) {
+ // invalid data point
+ logger.fine("Ignoring bad metric:" + line);
+ return;
+ }
+ String[] key = parts[0].split("\\.");
+ String measurementName, valueFieldName;
+ List tags = new ArrayList<>();
+ switch (key.length) {
+ case 0:// invalid metric
+ case 1:// invalid metric
+ case 2:
+ logger.fine("Ignoring bad metric:" + line);
+ return;
+ default:
+ measurementName = key[1];
+ valueFieldName = key[key.length - 1];
+ tags.add(key[0]);
+ for (int i = 2; i < key.length - 1; i++) {
+ tags.add(key[i]);
+ }
+ break;
+ }
+ long timestamp = Long.parseLong(parts[2]) * 1000;
+ if (parts[1].contains(".")) {
+ double value = Double.parseDouble(parts[1]);
+ logger.fine("Writing graphite metric (fp)" + dbName + "," + measurementName + "," + valueFieldName + ","
+ + tags + "," + timestamp + "," + value);
+ storageEngine.writeDataPoint(dbName, measurementName, valueFieldName, tags, timestamp, value);
+ } else {
+ long value = Long.parseLong(parts[1]);
+ logger.fine("Writing graphite metric (long)" + dbName + "," + measurementName + "," + valueFieldName + ","
+ + tags + "," + timestamp + "," + value);
+ storageEngine.writeDataPoint(dbName, measurementName, valueFieldName, tags, timestamp, value);
+ }
+ }
+
+}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteServer.java b/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteServer.java
new file mode 100644
index 0000000..e222ba9
--- /dev/null
+++ b/core/src/main/java/com/srotya/sidewinder/core/graphite/GraphiteServer.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.graphite;
+
+import java.util.Map;
+
+import com.srotya.sidewinder.core.storage.StorageEngine;
+
+import io.dropwizard.lifecycle.Managed;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+/**
+ * @author ambud
+ */
+public class GraphiteServer implements Managed {
+
+ private StorageEngine storageEngine;
+ private int serverPort;
+ private Channel channel;
+ private String dbName;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ public GraphiteServer(Map conf, StorageEngine storageEngine) {
+ this.storageEngine = storageEngine;
+ this.serverPort = Integer.parseInt(conf.getOrDefault("server.graphite.port", "8772"));
+ this.dbName = conf.getOrDefault("server.graphite.dbname", "graphite");
+ }
+
+ @Override
+ public void start() throws Exception {
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup(2);
+
+ ServerBootstrap bs = new ServerBootstrap();
+ channel = bs.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_RCVBUF, 10485760).handler(new LoggingHandler(LogLevel.DEBUG))
+ .childHandler(new ChannelInitializer() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(workerGroup, new LineBasedFrameDecoder(1024, true, true));
+ p.addLast(workerGroup, new StringDecoder());
+ p.addLast(workerGroup, new GraphiteDecoder(dbName, storageEngine));
+ }
+
+ }).bind("localhost", serverPort).sync().channel();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ workerGroup.shutdownGracefully().sync();
+ bossGroup.shutdownGracefully().sync();
+ channel.closeFuture().sync();
+ }
+
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/rpc/WriterServiceImpl.java b/core/src/main/java/com/srotya/sidewinder/core/rpc/WriterServiceImpl.java
index 9eecce5..75d7c8b 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/rpc/WriterServiceImpl.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/rpc/WriterServiceImpl.java
@@ -31,7 +31,6 @@
import com.srotya.sidewinder.core.storage.DataPoint;
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TimeSeries;
-import com.srotya.sidewinder.core.storage.TimeSeriesBucket;
import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.utils.BackgrounThreadFactory;
import com.srotya.sidewinder.core.utils.MiscUtils;
@@ -85,40 +84,40 @@ public void writeSingleDataPoint(SingleData request, StreamObserver respons
@Override
public void writeBatchDataPoint(BatchData request, StreamObserver responseObserver) {
+ Ack ack = null;
try {
for (Point point : request.getPointsList()) {
DataPoint dp = MiscUtils.pointToDataPoint(point);
engine.writeDataPoint(dp);
}
- Ack ack = Ack.newBuilder().setMessageId(request.getMessageId()).build();
- responseObserver.onNext(ack);
- responseObserver.onCompleted();
+ ack = Ack.newBuilder().setMessageId(request.getMessageId()).setResponseCode(200).build();
} catch (Exception e) {
e.printStackTrace();
- responseObserver.onError(e);
+ ack = Ack.newBuilder().setMessageId(request.getMessageId()).setResponseCode(500).build();
}
+ responseObserver.onNext(ack);
+ responseObserver.onCompleted();
}
@Override
public void writeSeriesPoint(RawTimeSeriesBucket request, StreamObserver responseObserver) {
+ Ack ack;
try {
TimeSeries series = engine.getOrCreateTimeSeries(request.getDbName(), request.getMeasurementName(),
request.getValueFieldName(), new ArrayList<>(request.getTagsList()), request.getBucketSize(),
request.getFp());
for (Bucket bucket : request.getBucketsList()) {
- TimeSeriesBucket tsb = series.getOrCreateSeriesBucket(TimeUnit.MILLISECONDS,
- bucket.getHeaderTimestamp());
- Writer writer = tsb.getWriter();
+ Writer writer = series.getOrCreateSeriesBucket(TimeUnit.MILLISECONDS, bucket.getHeaderTimestamp());
writer.configure(conf, null, false);
writer.setCounter(bucket.getCount());
writer.bootstrap(bucket.getData().asReadOnlyByteBuffer());
}
- Ack ack = Ack.newBuilder().setMessageId(request.getMessageId()).build();
- responseObserver.onNext(ack);
- responseObserver.onCompleted();
+ ack = Ack.newBuilder().setMessageId(request.getMessageId()).setResponseCode(200).build();
} catch (Exception e) {
- responseObserver.onError(e);
+ ack = Ack.newBuilder().setMessageId(request.getMessageId()).setResponseCode(500).build();
}
+ responseObserver.onNext(ack);
+ responseObserver.onCompleted();
}
@Override
@@ -241,11 +240,16 @@ public void onEvent(DPWrapper event, long sequence, boolean endOfBatch) throws E
if (event.getHashValue() % handlerCount == handlerIndex) {
try {
engine.writeDataPoint(event.getDp());
- event.getResponseObserver().onNext(Ack.newBuilder().setMessageId(event.getMessageId()).build());
- event.getResponseObserver().onCompleted();
+ event.getResponseObserver()
+ .onNext(Ack.newBuilder().setMessageId(event.getMessageId()).setResponseCode(200).build());
+ } catch (IOException e) {
+ event.getResponseObserver()
+ .onNext(Ack.newBuilder().setMessageId(event.getMessageId()).setResponseCode(400).build());
} catch (Exception e) {
- event.getResponseObserver().onError(e);
+ event.getResponseObserver()
+ .onNext(Ack.newBuilder().setMessageId(event.getMessageId()).setResponseCode(500).build());
}
+ event.getResponseObserver().onCompleted();
}
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/BufferWrapper.java b/core/src/main/java/com/srotya/sidewinder/core/storage/BufferWrapper.java
new file mode 100644
index 0000000..49d0c46
--- /dev/null
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/BufferWrapper.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author ambud
+ */
+public class BufferWrapper {
+
+ private int startOffset;
+ private int limit;
+ private ByteBuffer buf;
+
+ public BufferWrapper() {
+ }
+
+ public BufferWrapper(int startOffset, int limit, ByteBuffer buf) {
+ this.startOffset = startOffset;
+ this.limit = limit;
+ this.buf = buf;
+ }
+ /**
+ * @return the limit
+ */
+ public int getLimit() {
+ return limit;
+ }
+ /**
+ * @param limit the limit to set
+ */
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+ /**
+ * @return the startOffset
+ */
+ public int getStartOffset() {
+ return startOffset;
+ }
+ /**
+ * @param startOffset the startOffset to set
+ */
+ public void setStartOffset(int startOffset) {
+ this.startOffset = startOffset;
+ }
+ /**
+ * @return the buf
+ */
+ public ByteBuffer getBuf() {
+ return buf;
+ }
+ /**
+ * @param buf the buf to set
+ */
+ public void setBuf(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/Measurement.java b/core/src/main/java/com/srotya/sidewinder/core/storage/Measurement.java
index 2a77a83..82f54a4 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/Measurement.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/Measurement.java
@@ -25,8 +25,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -35,6 +36,7 @@
import com.srotya.sidewinder.core.filters.Filter;
import com.srotya.sidewinder.core.predicates.Predicate;
import com.srotya.sidewinder.core.storage.compression.Reader;
+import com.srotya.sidewinder.core.storage.compression.Writer;
/**
* @author ambud
@@ -57,7 +59,7 @@ public void configure(Map conf, String measurementName, String b
public void close() throws IOException;
- public ByteBuffer createNewBuffer(String seriesId) throws IOException;
+ public ByteBuffer createNewBuffer(String seriesId, String tsBucket) throws IOException;
public TimeSeries getOrCreateTimeSeries(String valueFieldName, List tags, int timeBucketSize, boolean fp,
Map conf) throws IOException;
@@ -89,7 +91,7 @@ public default String constructSeriesId(String valueFieldName, List tags
return rowKeyBuilder.toString();
}
- public static List decodeStringToTags(TagIndex tagLookupTable, String tagString) {
+ public static List decodeStringToTags(TagIndex tagLookupTable, String tagString) throws IOException {
List tagList = new ArrayList<>();
if (tagString == null || tagString.isEmpty()) {
return tagList;
@@ -156,10 +158,12 @@ public default Set getTagFilteredRowKeys(String valueFieldName, Filter entry : getTimeSeriesMap().entrySet()) {
try {
- List garbage = entry.getValue().collectGarbage();
- for (TimeSeriesBucket timeSeriesBucket : garbage) {
- timeSeriesBucket.delete();
- getLogger().info("Collecting garbage for bucket:" + entry.getKey());
+ List garbage = entry.getValue().collectGarbage();
+ for (Writer timeSeriesBucket : garbage) {
+ // TODO delete
+ // timeSeriesBucket.delete();
+ getLogger().info("Collecting garbage for bucket:" + entry.getKey() + "\tOffset:"
+ + timeSeriesBucket.currentOffset());
}
getLogger().fine("Collecting garbage for time series:" + entry.getKey());
} catch (IOException e) {
@@ -222,9 +226,8 @@ public default void queryDataPoints(String valueFieldName, long startTime, long
}
TimeSeries value = getTimeSeriesMap().get(entry);
if (value == null) {
- getLogger().severe(
- "Invalid time series value " + entry + "\t" + rowKeys + "\t" + "\n\n");
-// continue;
+ getLogger().severe("Invalid time series value " + entry + "\t" + rowKeys + "\t" + "\n\n");
+ continue;
}
points = value.queryDataPoints(keys[0], seriesTags, startTime, endTime, valuePredicate);
if (aggregationFunction != null) {
@@ -266,10 +269,12 @@ public default void queryReaders(String valueFieldName, long startTime, long end
}
}
- public default Set getTags() {
+ public default Set getTags() throws IOException {
return getTagIndex().getTags();
}
public Logger getLogger();
+ public SortedMap> createNewBucketMap(String seriesId);
+
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/StorageEngine.java b/core/src/main/java/com/srotya/sidewinder/core/storage/StorageEngine.java
index d1e598e..2233813 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/StorageEngine.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/StorageEngine.java
@@ -16,10 +16,14 @@
package com.srotya.sidewinder.core.storage;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.srotya.sidewinder.core.aggregators.AggregationFunction;
import com.srotya.sidewinder.core.filters.AnyFilter;
@@ -40,7 +44,7 @@ public interface StorageEngine {
"Datapoint is missing required values");
public static final String DEFAULT_COMPRESSION_CLASS = "com.srotya.sidewinder.core.storage.compression.byzantine.ByzantineWriter";
public static final String COMPRESSION_CLASS = "compression.class";
- public static final int DEFAULT_TIME_BUCKET_CONSTANT = 4096;
+ public static final int DEFAULT_TIME_BUCKET_CONSTANT = 32768;
public static final String DEFAULT_BUCKET_SIZE = "default.bucket.size";
public static final String RETENTION_HOURS = "default.series.retention.hours";
public static final int DEFAULT_RETENTION_HOURS = (int) Math
@@ -54,6 +58,7 @@ public interface StorageEngine {
/**
* @param conf
+ * @param bgTaskPool
* @throws IOException
*/
public void configure(Map conf, ScheduledExecutorService bgTaskPool) throws IOException;
@@ -78,11 +83,52 @@ public interface StorageEngine {
* @param dp
* @throws IOException
*/
- public void writeDataPoint(DataPoint dp) throws IOException;
+ public default void writeDataPoint(DataPoint dp) throws IOException {
+ StorageEngine.validateDataPoint(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(), dp.getTags(),
+ TimeUnit.MILLISECONDS);
+ TimeSeries timeSeries = getOrCreateTimeSeries(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(),
+ dp.getTags(), getDefaultTimebucketSize(), dp.isFp());
+ if (dp.isFp() != timeSeries.isFp()) {
+ // reject this datapoint: mixed floating-point/integer series are not allowed
+ throw FP_MISMATCH_EXCEPTION;
+ }
+ if (dp.isFp()) {
+ timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getValue());
+ } else {
+ timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getLongValue());
+ }
+ getCounter().incrementAndGet();
+ }
+
+ public default void writeDataPoint(String dbName, String measurementName, String valueFieldName, List tags,
+ long timestamp, long value) throws IOException {
+ StorageEngine.validateDataPoint(dbName, measurementName, valueFieldName, tags, TimeUnit.MILLISECONDS);
+ TimeSeries timeSeries = getOrCreateTimeSeries(dbName, measurementName, valueFieldName, tags,
+ getDefaultTimebucketSize(), false);
+ if (timeSeries.isFp()) {
+ // reject this datapoint: mixed floating-point/integer series are not allowed
+ throw FP_MISMATCH_EXCEPTION;
+ }
+ timeSeries.addDataPoint(TimeUnit.MILLISECONDS, timestamp, value);
+ getCounter().incrementAndGet();
+ }
+
+ public default void writeDataPoint(String dbName, String measurementName, String valueFieldName, List tags,
+ long timestamp, double value) throws IOException {
+ StorageEngine.validateDataPoint(dbName, measurementName, valueFieldName, tags, TimeUnit.MILLISECONDS);
+ TimeSeries timeSeries = getOrCreateTimeSeries(dbName, measurementName, valueFieldName, tags,
+ getDefaultTimebucketSize(), true);
+ if (!timeSeries.isFp()) {
+ // reject this datapoint: mixed floating-point/integer series are not allowed
+ throw FP_MISMATCH_EXCEPTION;
+ }
+ timeSeries.addDataPoint(TimeUnit.MILLISECONDS, timestamp, value);
+ getCounter().incrementAndGet();
+ }
/**
- * Query timeseries from the storage engine given the supplied attributes.
- * This function doesn't allow use of {@link AggregationFunction}.
+ * Query timeseries from the storage engine given the supplied attributes. This
+ * function doesn't allow use of {@link AggregationFunction}.
*
* @param dbName
* @param measurementName
@@ -91,7 +137,7 @@ public interface StorageEngine {
* @param endTime
* @param tags
* @param valuePredicate
- * @return timeSeriesResultMap
+ * @return set of series matching the query
* @throws ItemNotFoundException
* @throws IOException
*/
@@ -103,8 +149,8 @@ public default Set queryDataPoints(String dbName, String meas
}
/**
- * Query timeseries from the storage engine given the supplied attributes.
- * This function does allow use of {@link AggregationFunction}.
+ * Query timeseries from the storage engine given the supplied attributes. This
+ * function does allow use of {@link AggregationFunction}.
*
* @param dbName
* @param measurementName
@@ -115,14 +161,20 @@ public default Set queryDataPoints(String dbName, String meas
* @param tagFilter
* @param valuePredicate
* @param aggregationFunction
- * @return timeSeriesResultMap
- * @throws ItemNotFoundException
+ * @return set of series matching the query, aggregated if a function is supplied
* @throws IOException
*/
- public Set queryDataPoints(String dbName, String measurementName, String valueFieldName,
+ public default Set queryDataPoints(String dbName, String measurementName, String valueFieldName,
long startTime, long endTime, List tagList, Filter> tagFilter,
- Predicate valuePredicate, AggregationFunction aggregationFunction)
- throws ItemNotFoundException, IOException;
+ Predicate valuePredicate, AggregationFunction aggregationFunction) throws IOException {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ Set resultMap = new HashSet<>();
+ getDatabaseMap().get(dbName).get(measurementName).queryDataPoints(valueFieldName, startTime, endTime, tagList,
+ tagFilter, valuePredicate, aggregationFunction, resultMap);
+ return resultMap;
+ }
/**
* List measurements containing the supplied keyword
@@ -132,7 +184,24 @@ public Set queryDataPoints(String dbName, String measurementN
* @return measurements
* @throws Exception
*/
- public Set getMeasurementsLike(String dbName, String partialMeasurementName) throws Exception;
+ public default Set getMeasurementsLike(String dbName, String partialMeasurementName) throws Exception {
+ if (!checkIfExists(dbName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ Map measurementMap = getDatabaseMap().get(dbName);
+ partialMeasurementName = partialMeasurementName.trim();
+ if (partialMeasurementName.isEmpty()) {
+ return measurementMap.keySet();
+ } else {
+ Set filteredSeries = new HashSet<>();
+ for (String measurementName : measurementMap.keySet()) {
+ if (measurementName.contains(partialMeasurementName)) {
+ filteredSeries.add(measurementName);
+ }
+ }
+ return filteredSeries;
+ }
+ }
/**
* List databases
@@ -149,27 +218,43 @@ public Set queryDataPoints(String dbName, String measurementN
* @return measurements
* @throws Exception
*/
- public Set getAllMeasurementsForDb(String dbName) throws Exception;
+ public default Set getAllMeasurementsForDb(String dbName) throws Exception {
+ if (checkIfExists(dbName)) {
+ return getDatabaseMap().get(dbName).keySet();
+ } else {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ }
/**
* List all tags for the supplied measurement
*
- * @param dbname
+ * @param dbName
* @param measurementName
* @return tags
* @throws Exception
*/
- public Set getTagsForMeasurement(String dbname, String measurementName) throws Exception;
+ public default Set getTagsForMeasurement(String dbName, String measurementName) throws Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ return getDatabaseMap().get(dbName).get(measurementName).getTags();
+ }
/**
- * @param dbname
+ * @param dbName
* @param measurementName
* @param valueFieldName
- * @return
+ * @return tags for the supplied parameters
* @throws Exception
*/
- public List> getTagsForMeasurement(String dbname, String measurementName, String valueFieldName)
- throws Exception;
+ public default List> getTagsForMeasurement(String dbName, String measurementName,
+ String valueFieldName) throws Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ return getDatabaseMap().get(dbName).get(measurementName).getTagsForMeasurement(valueFieldName);
+ }
/**
* Delete all data in this instance
@@ -183,9 +268,9 @@ public List> getTagsForMeasurement(String dbname, String measuremen
*
* @param dbName
* @return true if db exists
- * @throws Exception
+ * @throws IOException
*/
- public boolean checkIfExists(String dbName) throws Exception;
+ public boolean checkIfExists(String dbName) throws IOException;
/**
* Check if measurement exists
@@ -193,9 +278,15 @@ public List> getTagsForMeasurement(String dbname, String measuremen
* @param dbName
* @param measurement
* @return true if measurement and db exists
- * @throws Exception
+ * @throws IOException
*/
- public boolean checkIfExists(String dbName, String measurement) throws Exception;
+ public default boolean checkIfExists(String dbName, String measurement) throws IOException {
+ if (checkIfExists(dbName)) {
+ return getDatabaseMap().get(dbName).containsKey(measurement);
+ } else {
+ return false;
+ }
+ }
/**
* Drop database, all data for this database will be deleted
@@ -222,7 +313,12 @@ public List> getTagsForMeasurement(String dbname, String measuremen
* @return
* @throws Exception
*/
- public Set getFieldsForMeasurement(String dbName, String measurementName) throws Exception;
+ public default Set getFieldsForMeasurement(String dbName, String measurementName) throws Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ return getDatabaseMap().get(dbName).get(measurementName).getFieldsForMeasurement();
+ }
// retention policy update methods
/**
@@ -245,8 +341,10 @@ public void updateTimeSeriesRetentionPolicy(String dbName, String measurementNam
* @param measurementName
* @param retentionHours
* @throws ItemNotFoundException
+ * @throws IOException
*/
- public void updateTimeSeriesRetentionPolicy(String dbName, String measurementName, int retentionHours) throws ItemNotFoundException;
+ public void updateTimeSeriesRetentionPolicy(String dbName, String measurementName, int retentionHours)
+ throws ItemNotFoundException, IOException;
/**
* Update default retention policy for a database
@@ -293,8 +391,7 @@ public default void updateDefaultTimeSeriesRetentionPolicy(String dbName, int re
* @return measurementMap
* @throws IOException
*/
- public Map getOrCreateDatabase(String dbName, int retentionPolicy)
- throws IOException;
+ public Map getOrCreateDatabase(String dbName, int retentionPolicy) throws IOException;
/**
* Gets the measurement, creates it if it doesn't already exist
@@ -339,14 +436,21 @@ public boolean isMeasurementFieldFP(String dbName, String measurementName, Strin
*
* @param dbName
* @param measurementName
- * @param fieldName
- * @param key
- * @param value
- * @return readers
+ * @param valueFieldName
+ * @param startTime
+ * @param endTime
+ * @return map of readers for the matched series buckets
* @throws Exception
*/
- public Map queryReaders(String dbName, String measurementName, String fieldName, long key,
- long value) throws Exception;
+ public default LinkedHashMap queryReaders(String dbName, String measurementName,
+ String valueFieldName, long startTime, long endTime) throws Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ LinkedHashMap readers = new LinkedHashMap<>();
+ getDatabaseMap().get(dbName).get(measurementName).queryReaders(valueFieldName, startTime, endTime, readers);
+ return readers;
+ }
/**
* Check if timeseries exists
@@ -358,8 +462,15 @@ public Map queryReaders(String dbName, String measurementName,
* @return
* @throws Exception
*/
- public boolean checkTimeSeriesExists(String dbName, String measurementName, String valueFieldName,
- List tags) throws Exception;
+ public default boolean checkTimeSeriesExists(String dbName, String measurementName, String valueFieldName,
+ List tags) throws Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ return false;
+ }
+ // look up the timeseries without creating it; null means it does not exist
+ TimeSeries timeSeries = getDatabaseMap().get(dbName).get(measurementName).getTimeSeries(valueFieldName, tags);
+ return timeSeries != null;
+ }
/**
* Get timeseries object
@@ -369,10 +480,18 @@ public boolean checkTimeSeriesExists(String dbName, String measurementName, Stri
* @param valueFieldName
* @param tags
* @return
- * @throws Exception
+ * @throws IOException
*/
- public TimeSeries getTimeSeries(String dbName, String measurementName, String valueFieldName, List tags)
- throws Exception;
+ public default TimeSeries getTimeSeries(String dbName, String measurementName, String valueFieldName,
+ List tags) throws IOException {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ // get timeseries
+ Measurement measurement = getDatabaseMap().get(dbName).get(measurementName);
+ TimeSeries timeSeries = measurement.getTimeSeries(valueFieldName, tags);
+ return timeSeries;
+ }
/**
* Get metadata map
@@ -380,12 +499,37 @@ public TimeSeries getTimeSeries(String dbName, String measurementName, String va
* @return metadata map
*/
public Map getDbMetadataMap();
-
+
public Map> getMeasurementMap();
- Set getSeriesIdsWhereTags(String dbName, String measurementName, List rawTags) throws ItemNotFoundException, Exception;
+ public default Set getSeriesIdsWhereTags(String dbName, String measurementName, List tags)
+ throws ItemNotFoundException, Exception {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ return getDatabaseMap().get(dbName).get(measurementName).getSeriesIdsWhereTags(tags);
+ }
+
+ public default Set getTagFilteredRowKeys(String dbName, String measurementName, String valueFieldName,
+ Filter> tagFilterTree, List rawTags) throws IOException {
+ if (!checkIfExists(dbName, measurementName)) {
+ throw NOT_FOUND_EXCEPTION;
+ }
+ return getDatabaseMap().get(dbName).get(measurementName).getTagFilteredRowKeys(valueFieldName, tagFilterTree,
+ rawTags);
+ }
+
+ public Map> getDatabaseMap();
+
+ public static void validateDataPoint(String dbName, String measurementName, String valueFieldName,
+ List tags, TimeUnit unit) throws RejectException {
+ if (dbName == null || measurementName == null || valueFieldName == null || tags == null || unit == null) {
+ throw INVALID_DATAPOINT_EXCEPTION;
+ }
+ }
+
+ public int getDefaultTimebucketSize();
- Set getTagFilteredRowKeys(String dbName, String measurementName, String valueFieldName,
- Filter> tagFilterTree, List tags) throws ItemNotFoundException, Exception;
+ public AtomicInteger getCounter();
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/TagIndex.java b/core/src/main/java/com/srotya/sidewinder/core/storage/TagIndex.java
index 36f77db..a416b90 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/TagIndex.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/TagIndex.java
@@ -24,17 +24,15 @@
public interface TagIndex {
/**
- * Hashes the tag to UI
- *
* @param tag
* @return uid
* @throws IOException
*/
public String createEntry(String tag) throws IOException;
- public String getEntry(String hexString);
+ public String getEntry(String hexString) throws IOException;
- public Set getTags();
+ public Set getTags() throws IOException;
/**
* Indexes tag in the row key, creating an adjacency list
@@ -48,7 +46,10 @@ public interface TagIndex {
/**
* @param tag
* @return
+ * @throws IOException
*/
- public Set searchRowKeysForTag(String tag);
+ public Set searchRowKeysForTag(String tag) throws IOException;
+
+ public void close() throws IOException;
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeries.java b/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeries.java
index a939350..484a1cb 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeries.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeries.java
@@ -24,7 +24,6 @@
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -34,6 +33,7 @@
import com.srotya.sidewinder.core.predicates.Predicate;
import com.srotya.sidewinder.core.storage.compression.Reader;
import com.srotya.sidewinder.core.storage.compression.RollOverException;
+import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.utils.TimeUtils;
/**
@@ -50,7 +50,7 @@
public class TimeSeries {
private static final Logger logger = Logger.getLogger(TimeSeries.class.getName());
- private SortedMap> bucketMap;
+ private SortedMap> bucketMap;
private boolean fp;
private AtomicInteger retentionBuckets;
private String seriesId;
@@ -80,12 +80,12 @@ public TimeSeries(Measurement measurement, String compressionFQCN, String series
retentionBuckets = new AtomicInteger(0);
setRetentionHours(metadata.getRetentionHours());
this.fp = fp;
- bucketMap = new ConcurrentSkipListMap<>();
+ bucketMap = measurement.createNewBucketMap(seriesId);
}
- public TimeSeriesBucket getOrCreateSeriesBucket(TimeUnit unit, long timestamp) throws IOException {
+ public Writer getOrCreateSeriesBucket(TimeUnit unit, long timestamp) throws IOException {
String tsBucket = getTimeBucket(unit, timestamp, timeBucketSize);
- List list = bucketMap.get(tsBucket);
+ List list = bucketMap.get(tsBucket);
if (list == null) {
synchronized (bucketMap) {
if ((list = bucketMap.get(tsBucket)) == null) {
@@ -97,7 +97,7 @@ public TimeSeriesBucket getOrCreateSeriesBucket(TimeUnit unit, long timestamp) t
}
synchronized (list) {
- TimeSeriesBucket ans = list.get(list.size() - 1);
+ Writer ans = list.get(list.size() - 1);
if (ans.isFull()) {
if ((ans = list.get(list.size() - 1)).isFull()) {
logger.fine("Requesting new time series:" + seriesId + ",measurement:"
@@ -126,43 +126,54 @@ public static String getTimeBucket(TimeUnit unit, long timestamp, int timeBucket
return tsBucket;
}
- private TimeSeriesBucket createNewTimeSeriesBucket(long timestamp, String tsBucket, List list)
- throws IOException {
- ByteBuffer buf = measurement.createNewBuffer(seriesId);
+ private Writer createNewTimeSeriesBucket(long timestamp, String tsBucket, List list) throws IOException {
+ ByteBuffer buf = measurement.createNewBuffer(seriesId, tsBucket);
// writeStringToBuffer(seriesId, buf);
- writeStringToBuffer(tsBucket, buf);
- buf.putLong(timestamp);
- buf = buf.slice();
- TimeSeriesBucket bucketEntry = new TimeSeriesBucket(compressionFQCN, timestamp, conf, buf, true);
- list.add(bucketEntry);
+ Writer writer;
+ writer = getWriterInstance();
+ writer.configure(conf, buf, true);
+ writer.setHeaderTimestamp(timestamp);
+ list.add(writer);
bucketCount++;
- return bucketEntry;
+ return writer;
+ }
+
+ private Writer getWriterInstance() {
+ try {
+ Writer writer = (Writer) Class.forName(compressionFQCN).newInstance();
+ return writer;
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* Function to check and recover existing bucket map, if one exists.
- * @param bufList
+ *
+ * @param bufferEntries
* @throws IOException
*/
- public void loadBucketMap(List bufList) throws IOException {
+ public void loadBucketMap(List> bufferEntries) throws IOException {
Map cacheConf = new HashMap<>(conf);
logger.fine("Scanning buffer for:" + seriesId);
- for (ByteBuffer entry : bufList) {
- ByteBuffer duplicate = entry.duplicate();
+ for (Entry entry : bufferEntries) {
+ ByteBuffer duplicate = entry.getValue();
duplicate.rewind();
// String series = getStringFromBuffer(duplicate);
// if (!series.equalsIgnoreCase(seriesId)) {
// continue;
// }
- String tsBucket = getStringFromBuffer(duplicate);
- List list = bucketMap.get(tsBucket);
+ String tsBucket = entry.getKey();
+ List list = bucketMap.get(tsBucket);
if (list == null) {
list = new ArrayList<>();
bucketMap.put(tsBucket, list);
}
- long bucketTimestamp = duplicate.getLong();
+ // long bucketTimestamp = duplicate.getLong();
ByteBuffer slice = duplicate.slice();
- list.add(new TimeSeriesBucket(compressionFQCN, bucketTimestamp, cacheConf, slice, false));
+ Writer writer = getWriterInstance();
+ writer.configure(cacheConf, slice, false);
+ list.add(writer);
logger.fine("Loading bucketmap:" + seriesId + "\t" + tsBucket);
}
}
@@ -217,12 +228,11 @@ public List queryDataPoints(String appendFieldValueName, List
if (endTime == Long.MAX_VALUE) {
endTsBucket = bucketMap.lastKey();
}
- SortedMap> series = bucketMap.subMap(startTsBucket,
- endTsBucket + Character.MAX_VALUE);
- for (List timeSeries : series.values()) {
- for (TimeSeriesBucket bucketEntry : timeSeries) {
- readers.add(bucketEntry.getReader(timeRangePredicate, valuePredicate, fp, appendFieldValueName,
- appendTags));
+ SortedMap> series = bucketMap.subMap(startTsBucket, endTsBucket + Character.MAX_VALUE);
+ for (List writers : series.values()) {
+ for (Writer writer : writers) {
+ readers.add(
+ getReader(writer, timeRangePredicate, valuePredicate, fp, appendFieldValueName, appendTags));
}
}
List points = new ArrayList<>();
@@ -232,6 +242,45 @@ public List queryDataPoints(String appendFieldValueName, List
return points;
}
+ /**
+ * Get {@link Reader} with time and value filter predicates pushed-down to it.
+ * Along with {@link DataPoint} enrichments pushed to it.
+ *
+ * @param timePredicate
+ * @param valuePredicate
+ * @param isFp
+ * @param appendFieldValueName
+ * @param appendTags
+ * @return point in time instance of reader
+ * @throws IOException
+ */
+ public static Reader getReader(Writer writer, Predicate timePredicate, Predicate valuePredicate, boolean isFp,
+ String appendFieldValueName, List appendTags) throws IOException {
+ Reader reader = writer.getReader();
+ reader.setTimePredicate(timePredicate);
+ reader.setValuePredicate(valuePredicate);
+ reader.setFieldName(appendFieldValueName);
+ reader.setIsFP(isFp);
+ reader.setTags(appendTags);
+ return reader;
+ }
+
+ /**
+ * Get {@link Reader} with time and value filter predicates pushed-down to it.
+ *
+ * @param timePredicate
+ * @param valuePredicate
+ * @return point in time instance of reader
+ * @throws IOException
+ */
+ public static Reader getReader(Writer writer, Predicate timePredicate, Predicate valuePredicate)
+ throws IOException {
+ Reader reader = writer.getReader();
+ reader.setTimePredicate(timePredicate);
+ reader.setValuePredicate(valuePredicate);
+ return reader;
+ }
+
/**
* Extract list of readers for the supplied time range and value predicate.
*
@@ -264,12 +313,11 @@ public List queryReader(String appendFieldValueName, List append
String startTsBucket = Integer.toHexString(tsStartBucket);
int tsEndBucket = TimeUtils.getTimeBucket(TimeUnit.MILLISECONDS, endTime, timeBucketSize);
String endTsBucket = Integer.toHexString(tsEndBucket);
- SortedMap> series = bucketMap.subMap(startTsBucket,
- endTsBucket + Character.MAX_VALUE);
- for (List timeSeries : series.values()) {
- for (TimeSeriesBucket bucketEntry : timeSeries) {
- readers.add(bucketEntry.getReader(timeRangePredicate, valuePredicate, fp, appendFieldValueName,
- appendTags));
+ SortedMap> series = bucketMap.subMap(startTsBucket, endTsBucket + Character.MAX_VALUE);
+ for (List writers : series.values()) {
+ for (Writer writer : writers) {
+ readers.add(
+ getReader(writer, timeRangePredicate, valuePredicate, fp, appendFieldValueName, appendTags));
}
}
return readers;
@@ -287,11 +335,10 @@ public List queryReader(String appendFieldValueName, List append
* @throws IOException
*/
public void addDataPoint(TimeUnit unit, long timestamp, double value) throws IOException {
- TimeSeriesBucket timeseriesBucket = getOrCreateSeriesBucket(unit, timestamp);
+ Writer writer = getOrCreateSeriesBucket(unit, timestamp);
try {
- timeseriesBucket.addDataPoint(timestamp, value);
+ writer.addValue(timestamp, value);
} catch (RollOverException e) {
- timeseriesBucket.setFull(true);
addDataPoint(unit, timestamp, value);
} catch (NullPointerException e) {
logger.log(Level.SEVERE, "\n\nNPE occurred for add datapoint operation\n\n", e);
@@ -310,11 +357,10 @@ public void addDataPoint(TimeUnit unit, long timestamp, double value) throws IOE
* @throws IOException
*/
public void addDataPoint(TimeUnit unit, long timestamp, long value) throws IOException {
- TimeSeriesBucket timeseriesBucket = getOrCreateSeriesBucket(unit, timestamp);
+ Writer timeseriesBucket = getOrCreateSeriesBucket(unit, timestamp);
try {
- timeseriesBucket.addDataPoint(timestamp, value);
+ timeseriesBucket.addValue(timestamp, value);
} catch (RollOverException e) {
- timeseriesBucket.setFull(true);
addDataPoint(unit, timestamp, value);
} catch (NullPointerException e) {
logger.log(Level.SEVERE, "\n\nNPE occurred for add datapoint operation\n\n", e);
@@ -332,7 +378,7 @@ public void addDataPoint(TimeUnit unit, long timestamp, long value) throws IOExc
*
* @param points
* list data points are appended to
- * @param timeSeries
+ * @param writer
* to extract the data points from
* @param timePredicate
* time range filter
@@ -342,9 +388,9 @@ public void addDataPoint(TimeUnit unit, long timestamp, long value) throws IOExc
* @throws IOException
*/
public static List seriesToDataPoints(String appendFieldValueName, List appendTags,
- List points, TimeSeriesBucket timeSeries, Predicate timePredicate, Predicate valuePredicate,
- boolean isFp) throws IOException {
- Reader reader = timeSeries.getReader(timePredicate, valuePredicate, isFp, appendFieldValueName, appendTags);
+ List points, Writer writer, Predicate timePredicate, Predicate valuePredicate, boolean isFp)
+ throws IOException {
+ Reader reader = getReader(writer, timePredicate, valuePredicate, isFp, appendFieldValueName, appendTags);
DataPoint point = null;
while (true) {
try {
@@ -395,14 +441,15 @@ public static List readerToDataPoints(List points, Reader
*
* @throws IOException
*/
- public List collectGarbage() throws IOException {
- List gcedBuckets = new ArrayList<>();
+ public List collectGarbage() throws IOException {
+ List gcedBuckets = new ArrayList<>();
while (bucketMap.size() > retentionBuckets.get()) {
int oldSize = bucketMap.size();
String key = bucketMap.firstKey();
- List buckets = bucketMap.remove(key);
- for (TimeSeriesBucket bucket : buckets) {
- bucket.close();
+ List buckets = bucketMap.remove(key);
+ for (Writer bucket : buckets) {
+ // TODO close
+ // bucket.close();
gcedBuckets.add(bucket);
logger.log(Level.INFO,
"GC," + measurement.getMeasurementName() + ":" + seriesId + " removing bucket:" + key
@@ -426,7 +473,7 @@ public void setRetentionHours(int retentionHours) {
}
/**
- * @return number of {@link TimeSeriesBucket}s to retain for this
+ * @return number of buckets to retain for this
* {@link TimeSeries}
*/
public int getRetentionBuckets() {
@@ -436,19 +483,19 @@ public int getRetentionBuckets() {
/**
* @return the bucketMap
*/
- public SortedMap getBucketMap() {
- SortedMap map = new TreeMap<>();
- for (Entry> entry : bucketMap.entrySet()) {
- List value = entry.getValue();
+ public SortedMap getBucketMap() {
+ SortedMap map = new TreeMap<>();
+ for (Entry> entry : bucketMap.entrySet()) {
+ List value = entry.getValue();
for (int i = 0; i < value.size(); i++) {
- TimeSeriesBucket bucketEntry = value.get(i);
+ Writer bucketEntry = value.get(i);
map.put(entry.getKey() + i, bucketEntry);
}
}
return map;
}
-
- public SortedMap> getBucketRawMap() {
+
+ public SortedMap> getBucketRawMap() {
return bucketMap;
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeriesBucket.java b/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeriesBucket.java
deleted file mode 100644
index 967b353..0000000
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/TimeSeriesBucket.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Copyright 2017 Ambud Sharma
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.srotya.sidewinder.core.storage;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import com.srotya.sidewinder.core.predicates.Predicate;
-import com.srotya.sidewinder.core.storage.compression.Reader;
-import com.srotya.sidewinder.core.storage.compression.RollOverException;
-import com.srotya.sidewinder.core.storage.compression.Writer;
-
-/**
- * In-memory representation of a time series based on Facebook's Gorilla
- * compression. This class wraps the compressed time series byte representation
- * of Gorilla and adds read-write concurrency and thread-safety using re-entrant
- * locks.
- *
- * @author ambud
- */
-public class TimeSeriesBucket implements Serializable {
-
- private static final long serialVersionUID = 1L;
- // private static final RejectException OLD_DATA_POINT = new
- // RejectException("Rejected older datapoint");
- private Writer writer;
- private TimeSeriesBucket prev, next;
- private long headerTimestamp;
- private volatile boolean full;
-
- public TimeSeriesBucket(String compressionFQCN, long headerTimestamp, Map conf, ByteBuffer buf,
- boolean isNew) {
- this.headerTimestamp = headerTimestamp;
- try {
- writer = (Writer) Class.forName(compressionFQCN).newInstance();
- writer.configure(conf, buf, isNew);
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- if (isNew) {
- writer.setHeaderTimestamp(headerTimestamp);
- }
- }
-
- /**
- * Add data point with a double value and timestamp.
- *
- * Throws {@link RejectException} if the caller tries to add data older than the
- * current timestamp.
- *
- * @param timestamp
- * @param value
- * @throws RollOverException
- * @throws IOException
- */
- public void addDataPoint(long timestamp, double value) throws IOException, RollOverException {
- writer.addValue(timestamp, value);
- }
-
- /**
- * Add data point with a long value and timestamp.
- *
- * Throws {@link RejectException} if the caller tries to add data older than the
- * current timestamp.
- *
- * @param timestamp
- * @param value
- * @throws RollOverException
- * @throws IOException
- */
- public void addDataPoint(long timestamp, long value) throws IOException, RollOverException {
- writer.addValue(timestamp, value);
- }
-
- /**
- * Get {@link Reader} with time and value filter predicates pushed-down to it.
- * Along with {@link DataPoint} enrichments pushed to it.
- *
- * @param timePredicate
- * @param valuePredicate
- * @param isFp
- * @param appendFieldValueName
- * @param appendTags
- * @return point in time instance of reader
- * @throws IOException
- */
- public Reader getReader(Predicate timePredicate, Predicate valuePredicate, boolean isFp,
- String appendFieldValueName, List appendTags) throws IOException {
- Reader reader = writer.getReader();
- reader.setTimePredicate(timePredicate);
- reader.setValuePredicate(valuePredicate);
- reader.setFieldName(appendFieldValueName);
- reader.setIsFP(isFp);
- reader.setTags(appendTags);
- return reader;
- }
-
- /**
- * Get {@link Reader} with time and value filter predicates pushed-down to it.
- *
- * @param timePredicate
- * @param valuePredicate
- * @return point in time instance of reader
- * @throws IOException
- */
- public Reader getReader(Predicate timePredicate, Predicate valuePredicate) throws IOException {
- Reader reader = writer.getReader();
- reader.setTimePredicate(timePredicate);
- reader.setValuePredicate(valuePredicate);
- return reader;
- }
-
- /**
- * Get count of data points currently hosted in this bucket
- *
- * @return
- * @throws IOException
- */
- public int getCount() throws IOException {
- return writer.getCount();
- }
-
- /**
- * Analytical method used for monitoring compression ratios for a given
- * timeseries. Ratio = expected number of bytes / actual number of bytes.
- *
- * @return compression ratio
- */
- public double getCompressionRatio() {
- return writer.getCompressionRatio();
- }
-
- public Writer getWriter() {
- return writer;
- }
-
- public void close() throws IOException {
- }
-
- public void delete() throws IOException {
- }
-
- /**
- * @return the prev
- */
- public TimeSeriesBucket getPrev() {
- return prev;
- }
-
- /**
- * @return the next
- */
- public TimeSeriesBucket getNext() {
- return next;
- }
-
- /**
- * @return the headerTimestamp
- */
- public long getHeaderTimestamp() {
- return headerTimestamp;
- }
-
- /**
- * @return the full
- */
- public boolean isFull() {
- return full;
- }
-
- /**
- * @param full
- * the full to set
- */
- public void setFull(boolean full) {
- this.full = full;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- return "TimeSeriesBucket [headerTimestamp=" + headerTimestamp + "]";
- }
-}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/CompressionCaller.java b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/CompressionCaller.java
deleted file mode 100644
index 805650e..0000000
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/CompressionCaller.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Copyright 2017 Ambud Sharma
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.srotya.sidewinder.core.storage.compression;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * @author ambud
- */
-public interface CompressionCaller {
-
- public ByteBuffer expandBuffer() throws IOException;
-
-}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/Writer.java b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/Writer.java
index 6c9db89..efd3933 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/Writer.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/Writer.java
@@ -59,4 +59,8 @@ public interface Writer {
public int getCount();
+ public boolean isFull();
+
+ public long getHeaderTimestamp();
+
}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java
index 4ed55cc..3c71dc1 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java
@@ -43,6 +43,7 @@ public class ByzantineWriter implements Writer {
private ByteBuffer buf;
private long prevValue;
private boolean readOnly;
+ private volatile boolean full;
public ByzantineWriter() {
}
@@ -154,6 +155,7 @@ private void compressAndWriteValue(ByteBuffer tBuf, long value) {
private void checkAndExpandBuffer() throws IOException {
if (buf.remaining() < 20 || buf.isReadOnly()) {
+ full = true;
throw BUF_ROLLOVER_EXCEPTION;
}
}
@@ -223,6 +225,11 @@ public void setHeaderTimestamp(long timestamp) {
buf.putLong(timestamp);
}
}
+
+ @Override
+ public long getHeaderTimestamp() {
+ return buf.getLong(4);
+ }
/**
* @return the read
@@ -323,5 +330,10 @@ public int currentOffset() {
read.unlock();
return offset;
}
-
+
+ @Override
+ public boolean isFull() {
+ return full;
+ }
+
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/dod/DodWriter.java b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/dod/DodWriter.java
index 2078b2e..d78f420 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/dod/DodWriter.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/dod/DodWriter.java
@@ -44,6 +44,7 @@ public class DodWriter implements Writer {
private int count;
private long lastTs;
private boolean readOnly;
+ private boolean full;
public DodWriter() {
}
@@ -179,4 +180,14 @@ public int currentOffset() {
public int getCount() {
return count;
}
+
+ @Override
+ public boolean isFull() {
+ return full;
+ }
+
+ @Override
+ public long getHeaderTimestamp() {
+ return 0;
+ }
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java
index fc06618..f55a182 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java
@@ -20,8 +20,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -36,19 +34,11 @@
import java.util.logging.Logger;
import com.google.gson.Gson;
-import com.srotya.sidewinder.core.aggregators.AggregationFunction;
-import com.srotya.sidewinder.core.filters.Filter;
-import com.srotya.sidewinder.core.predicates.Predicate;
import com.srotya.sidewinder.core.storage.Archiver;
import com.srotya.sidewinder.core.storage.DBMetadata;
-import com.srotya.sidewinder.core.storage.DataPoint;
-import com.srotya.sidewinder.core.storage.ItemNotFoundException;
import com.srotya.sidewinder.core.storage.Measurement;
-import com.srotya.sidewinder.core.storage.RejectException;
-import com.srotya.sidewinder.core.storage.SeriesQueryOutput;
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TimeSeries;
-import com.srotya.sidewinder.core.storage.compression.Reader;
import com.srotya.sidewinder.core.storage.mem.archival.NoneArchiver;
import com.srotya.sidewinder.core.utils.MiscUtils;
@@ -101,6 +91,7 @@ public void configure(Map conf, ScheduledExecutorService bgTaskP
}
this.defaultTimebucketSize = Integer
.parseInt(conf.getOrDefault(DEFAULT_BUCKET_SIZE, String.valueOf(DEFAULT_TIME_BUCKET_CONSTANT)));
+ logger.info("Configuring default time bucket:" + getDefaultTimebucketSize());
if (Boolean.parseBoolean(conf.getOrDefault("gc.enabled", "true"))) {
bgTaskPool.scheduleAtFixedRate(() -> {
for (Entry> measurementMap : databaseMap.entrySet()) {
@@ -162,84 +153,6 @@ public void updateTimeSeriesRetentionPolicy(String dbName, int retentionHours) t
}
}
- /**
- * @param dbName
- * @param measurementName
- * @param valueFieldName
- * @param startTime
- * @param endTime
- * @return
- * @throws Exception
- */
- @Override
- public LinkedHashMap queryReaders(String dbName, String measurementName, String valueFieldName,
- long startTime, long endTime) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- LinkedHashMap readers = new LinkedHashMap<>();
- databaseMap.get(dbName).get(measurementName).queryReaders(valueFieldName, startTime, endTime, readers);
- return readers;
- }
-
- @Override
- public Set queryDataPoints(String dbName, String measurementName, String valueFieldName,
- long startTime, long endTime, List tagList, Filter> tagFilter,
- Predicate valuePredicate, AggregationFunction aggregationFunction) throws IOException {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- Set resultMap = new HashSet<>();
- databaseMap.get(dbName).get(measurementName).queryDataPoints(valueFieldName, startTime, endTime, tagList,
- tagFilter, valuePredicate, aggregationFunction, resultMap);
- return resultMap;
- }
-
- @Override
- public Set getMeasurementsLike(String dbName, String partialMeasurementName) throws Exception {
- if (!checkIfExists(dbName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- Map measurementMap = databaseMap.get(dbName);
- partialMeasurementName = partialMeasurementName.trim();
- if (partialMeasurementName.isEmpty()) {
- return measurementMap.keySet();
- } else {
- Set filteredSeries = new HashSet<>();
- for (String measurementName : measurementMap.keySet()) {
- if (measurementName.contains(partialMeasurementName)) {
- filteredSeries.add(measurementName);
- }
- }
- return filteredSeries;
- }
- }
-
- public static void validateDataPoint(String dbName, String measurementName, String valueFieldName,
- List tags, TimeUnit unit) throws RejectException {
- if (dbName == null || measurementName == null || valueFieldName == null || tags == null || unit == null) {
- throw INVALID_DATAPOINT_EXCEPTION;
- }
- }
-
- @Override
- public void writeDataPoint(DataPoint dp) throws IOException {
- validateDataPoint(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(), dp.getTags(),
- TimeUnit.MILLISECONDS);
- TimeSeries timeSeries = getOrCreateTimeSeries(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(),
- dp.getTags(), defaultTimebucketSize, dp.isFp());
- if (dp.isFp() != timeSeries.isFp()) {
- // drop this datapoint, mixed series are not allowed
- throw FP_MISMATCH_EXCEPTION;
- }
- if (dp.isFp()) {
- timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getValue());
- } else {
- timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getLongValue());
- }
- counter.incrementAndGet();
- }
-
@Override
public Map getOrCreateDatabase(String dbName) throws IOException {
Map measurementMap = databaseMap.get(dbName);
@@ -394,18 +307,6 @@ protected void loadMeasurements(String dbName, Map measurem
}
}
- @Override
- public TimeSeries getTimeSeries(String dbName, String measurementName, String valueFieldName, List tags)
- throws IOException {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- // get timeseries
- Measurement measurement = databaseMap.get(dbName).get(measurementName);
- TimeSeries timeSeries = measurement.getTimeSeries(valueFieldName, tags);
- return timeSeries;
- }
-
@Override
public TimeSeries getOrCreateTimeSeries(String dbName, String measurementName, String valueFieldName,
List tags, int timeBucketSize, boolean fp) throws IOException {
@@ -448,33 +349,11 @@ public boolean isMeasurementFieldFP(String dbName, String measurementName, Strin
throw NOT_FOUND_EXCEPTION;
}
- @Override
- public boolean checkTimeSeriesExists(String dbName, String measurementName, String valueFieldName,
- List tags) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- return false;
- }
-
- Measurement measurement = databaseMap.get(dbName).get(measurementName);
- // check timeseries
- TimeSeries timeSeries = measurement.getTimeSeries(valueFieldName, tags);
- return timeSeries == null;
- }
-
@Override
public Set getDatabases() throws Exception {
return databaseMap.keySet();
}
- @Override
- public Set getAllMeasurementsForDb(String dbName) throws Exception {
- if (checkIfExists(dbName)) {
- return databaseMap.get(dbName).keySet();
- } else {
- throw NOT_FOUND_EXCEPTION;
- }
- }
-
@Override
public void deleteAllData() throws Exception {
// Extremely dangerous operation
@@ -506,16 +385,6 @@ public void dropMeasurement(String dbName, String measurementName) throws Except
}
}
- @Override
- public List> getTagsForMeasurement(String dbName, String measurementName, String valueFieldName)
- throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw new ItemNotFoundException("Database " + dbName + " not found");
- }
- Measurement measurement = databaseMap.get(dbName).get(measurementName);
- return measurement.getTagsForMeasurement(valueFieldName);
- }
-
/**
* Function for unit testing
*
@@ -543,34 +412,6 @@ public void disconnect() throws IOException {
System.gc();
}
- @Override
- public boolean checkIfExists(String dbName, String measurement) throws IOException {
- if (checkIfExists(dbName)) {
- return databaseMap.get(dbName).containsKey(measurement);
- } else {
- logger.info("Database not found");
- return false;
- }
- }
-
- @Override
- public Set getTagsForMeasurement(String dbName, String measurementName) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw new ItemNotFoundException("Database " + dbName + " & " + measurementName + " not found");
- }
- return databaseMap.get(dbName).get(measurementName).getTags();
- }
-
- @Override
- public Set getFieldsForMeasurement(String dbName, String measurementName) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- Map measurementMap = databaseMap.get(dbName);
- Measurement measurement = measurementMap.get(measurementName);
- return measurement.getFieldsForMeasurement();
- }
-
@Override
public Map getDbMetadataMap() {
return dbMetadataMap;
@@ -582,22 +423,18 @@ public Map> getMeasurementMap() {
}
@Override
- public Set getTagFilteredRowKeys(String dbName, String measurementName, String valueFieldName,
- Filter> tagFilterTree, List rawTags) throws ItemNotFoundException, Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getTagFilteredRowKeys(valueFieldName, tagFilterTree,
- rawTags);
+ public Map> getDatabaseMap() {
+ return databaseMap;
}
@Override
- public Set getSeriesIdsWhereTags(String dbName, String measurementName, List tags)
- throws ItemNotFoundException, Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getSeriesIdsWhereTags(tags);
+ public int getDefaultTimebucketSize() {
+ return defaultTimebucketSize;
+ }
+
+ @Override
+ public AtomicInteger getCounter() {
+ return counter;
}
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskTagIndex.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskTagIndex.java
index e7fe979..da3c5ad 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskTagIndex.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskTagIndex.java
@@ -51,8 +51,8 @@ public class DiskTagIndex implements TagIndex {
public DiskTagIndex(String indexDir, String measurementName) throws IOException {
this.indexPath = indexDir + "/" + measurementName;
- tagMap = new ConcurrentHashMap<>(1000);
- rowKeyIndex = new ConcurrentHashMap<>(1000);
+ tagMap = new ConcurrentHashMap<>(10000);
+ rowKeyIndex = new ConcurrentHashMap<>(10000);
fwdIndex = indexPath + ".fwd";
revIndex = indexPath + ".rev";
prFwd = new PrintWriter( new FileOutputStream(new File(fwdIndex), true));
@@ -85,7 +85,7 @@ protected void loadTagIndex() throws IOException {
String tag = split[0];
Set set = rowKeyIndex.get(tag);
if (set == null) {
- set = Collections.newSetFromMap(new ConcurrentHashMap());
+ set = Collections.newSetFromMap(new ConcurrentHashMap(100));
rowKeyIndex.put(split[0], set);
}
String rowKey = split[1];
@@ -144,9 +144,10 @@ public Set searchRowKeysForTag(String tag) {
return rowKeyIndex.get(tag);
}
+ @Override
public void close() throws IOException {
prFwd.close();
prRv.close();
}
-
+
}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/LuceneTagIndex.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/LuceneTagIndex.java
new file mode 100644
index 0000000..60aea79
--- /dev/null
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/LuceneTagIndex.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.storage.disk;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.util.QueryBuilder;
+
+import com.srotya.sidewinder.core.storage.TagIndex;
+
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * Lucene based tag index
+ *
+ * @author ambud
+ */
+public class LuceneTagIndex implements TagIndex {
+
+ private static final String ID = "id";
+ private static final String TAG = "tag";
+ private static final String ROW_KEY = "rowKey";
+ private static final String TAG_2 = "tag2";
+ private XXHashFactory factory = XXHashFactory.fastestInstance();
+ private IndexWriter indexWriter;
+ private XXHash32 hash;
+ private StandardAnalyzer analyzer;
+ private Directory index;
+
+ public LuceneTagIndex(String indexDirectory, String measurementName) throws IOException {
+ analyzer = new StandardAnalyzer();
+// index = new RAMDirectory();
+ index = new MMapDirectory(new File(indexDirectory + "/" + measurementName).toPath());
+ IndexWriterConfig config = new IndexWriterConfig(analyzer);
+ config.setOpenMode(OpenMode.CREATE_OR_APPEND);
+ config.setCommitOnClose(true);
+ config.setMaxBufferedDocs(10000);
+ config.setRAMBufferSizeMB(512);
+ config.setRAMPerThreadHardLimitMB(256);
+ config.setReaderPooling(true);
+ indexWriter = new IndexWriter(index, config);
+ hash = factory.hash32();
+ }
+
+ @Override
+ public String createEntry(String tag) throws IOException {
+ String hash32 = Integer.toHexString(hash.hash(tag.getBytes(), 0, tag.length(), 57));
+ addDoc(indexWriter, tag, hash32);
+ return hash32;
+ }
+
+ private static void addDoc(IndexWriter w, String tag, String tagHash) throws IOException {
+ Document doc = new Document();
+ doc.add(new TextField(TAG, tag, Field.Store.YES));
+ doc.add(new TextField(ID, tagHash, Field.Store.NO));
+ w.addDocument(doc);
+ }
+
+ @Override
+ public String getEntry(String hexString) throws IOException {
+ DirectoryReader reader = DirectoryReader.open(indexWriter);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Query query = new QueryBuilder(analyzer).createPhraseQuery(ID, hexString);
+ TopDocs search = searcher.search(query, 1);
+ return searcher.doc(search.scoreDocs[0].doc).get(TAG);
+ }
+
+ @Override
+ public Set getTags() throws IOException {
+ DirectoryReader reader = DirectoryReader.open(indexWriter);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Query query = new QueryBuilder(analyzer).createPhraseQuery(TAG, "*");
+ TopDocs search = searcher.search(query, Integer.MAX_VALUE);
+ Set set = new HashSet<>();
+ Set field = new HashSet<>(Arrays.asList(TAG));
+ for (ScoreDoc doc : search.scoreDocs) {
+ set.add(searcher.doc(doc.doc, field).get(TAG));
+ }
+ return set;
+ }
+
+ @Override
+ public void index(String tag, String rowKey) throws IOException {
+ Document doc = new Document();
+ doc.add(new TextField(TAG_2, tag, Field.Store.NO));
+ doc.add(new TextField(ROW_KEY, rowKey, Field.Store.YES));
+ indexWriter.addDocument(doc);
+ }
+
+ @Override
+ public Set searchRowKeysForTag(String tag) throws IOException {
+ DirectoryReader reader = DirectoryReader.open(indexWriter);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Query query = new QueryBuilder(analyzer).createPhraseQuery(TAG_2, tag);
+ TopDocs search = searcher.search(query, Integer.MAX_VALUE);
+ Set set = new HashSet<>();
+ Set field = new HashSet<>(Arrays.asList(ROW_KEY));
+ for (ScoreDoc doc : search.scoreDocs) {
+ set.add(searcher.doc(doc.doc, field).get(ROW_KEY));
+ }
+ return set;
+ }
+
+ @Override
+ public void close() throws IOException {
+ indexWriter.commit();
+ indexWriter.close();
+ index.close();
+ }
+
+}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/MapDBTagIndex.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/MapDBTagIndex.java
new file mode 100644
index 0000000..9184876
--- /dev/null
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/MapDBTagIndex.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.storage.disk;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.DataInput2;
+import org.mapdb.DataOutput2;
+import org.mapdb.Serializer;
+
+import com.srotya.sidewinder.core.storage.TagIndex;
+
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * Tag hash lookup table + Tag inverted index
+ *
+ * @author ambud
+ */
+public class MapDBTagIndex implements TagIndex {
+
+ private Map<Integer, String> tagMap;
+ private Map<String, Set<String>> rowKeyIndex;
+ private XXHashFactory factory = XXHashFactory.fastestInstance();
+ private XXHash32 hash;
+ private DB db;
+
+ @SuppressWarnings("unchecked")
+ public MapDBTagIndex(String indexDir, String measurementName) throws IOException {
+ String indexPath = indexDir + "/" + measurementName + "/idx";
+ db = DBMaker.fileDB(indexPath).fileMmapEnableIfSupported().concurrencyScale(4)
+ .allocateStartSize(1024 * 1024 * 10).allocateIncrement(1024 * 1024 * 10).make();
+ tagMap = (Map<Integer, String>) db.hashMap("fwd").createOrOpen();
+ rowKeyIndex = (Map<String, Set<String>>) db.hashMap("rev").valueSerializer(new ValueSerializer()).createOrOpen();
+ hash = factory.hash32();
+ }
+
+ @Override
+ public String createEntry(String tag) throws IOException {
+ int hash32 = hash.hash(tag.getBytes(), 0, tag.getBytes().length, 57);
+ String val = tagMap.get(hash32);
+ if (val == null) {
+ synchronized (tagMap) {
+ tagMap.put(hash32, tag);
+ }
+ }
+ return Integer.toHexString(hash32);
+ }
+
+ @Override
+ public String getEntry(String hexString) {
+ return tagMap.get(Integer.parseUnsignedInt(hexString, 16));
+ }
+
+ @Override
+ public Set<String> getTags() {
+ return new HashSet<>(tagMap.values());
+ }
+
+ @Override
+ public void index(String tag, String rowKey) throws IOException {
+ Set<String> rowKeySet = rowKeyIndex.get(tag);
+ if (rowKeySet == null) {
+ synchronized (rowKeyIndex) {
+ if ((rowKeySet = rowKeyIndex.get(tag)) == null) {
+ rowKeySet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ rowKeyIndex.put(tag, rowKeySet);
+ }
+ }
+ }
+ if (rowKeySet.add(rowKey)) {
+ rowKeyIndex.put(tag, rowKeySet);
+ }
+ }
+
+ @Override
+ public Set<String> searchRowKeysForTag(String tag) {
+ return rowKeyIndex.get(tag);
+ }
+
+ @Override
+ public void close() throws IOException {
+ db.close();
+ }
+
+ public static final class ValueSerializer implements Serializer<Set<String>> {
+ @Override
+ public void serialize(DataOutput2 out, Set<String> value) throws IOException {
+ out.writeInt(value.size());
+ for (String val : value) {
+ out.writeUTF(val);
+ }
+ }
+
+ @Override
+ public Set<String> deserialize(DataInput2 input, int available) throws IOException {
+ int count = input.readInt();
+ Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ for (int i = 0; i < count; i++) {
+ set.add(input.readUTF());
+ }
+ return set;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/PersistentMeasurement.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/PersistentMeasurement.java
index f92b969..b4bfc97 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/PersistentMeasurement.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/PersistentMeasurement.java
@@ -25,15 +25,20 @@
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,6 +47,7 @@
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TagIndex;
import com.srotya.sidewinder.core.storage.TimeSeries;
+import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.utils.MiscUtils;
/**
@@ -49,13 +55,16 @@
*/
public class PersistentMeasurement implements Measurement {
+ private static final AtomicLong BUF_COUNTER = new AtomicLong();
private static final int DEFAULT_BUF_INCREMENT = 1048576;
private static final int DEFAULT_MAX_FILE_SIZE = Integer.MAX_VALUE;
+ private static final int DEFAULT_INCREMENT_SIZE = 32768;
public static final String CONF_MEASUREMENT_FILE_MAX = "measurement.file.max";
+ public static final String CONF_MEASUREMENT_INCREMENT_SIZE = "measurement.buf.increment";
public static final String CONF_MEASUREMENT_FILE_INCREMENT = "measurement.file.increment";
private static final Logger logger = Logger.getLogger(PersistentMeasurement.class.getName());
private Map seriesMap;
- private DiskTagIndex tagIndex;
+ private TagIndex tagIndex;
private String compressionClass;
private String dataDirectory;
private DBMetadata metadata;
@@ -64,7 +73,7 @@ public class PersistentMeasurement implements Measurement {
private List bufTracker;
private int itr;
private int fileMapIncrement;
- private int increment = 4096;
+ private int increment;
private int curr;
private int base;
private int fcnt;
@@ -87,25 +96,29 @@ public void configure(Map conf, String measurementName, String i
if (metadata == null) {
throw new IOException("Metadata can't be null");
}
- this.metadata = metadata;
- this.seriesMap = new ConcurrentHashMap<>(10000);
- this.tagIndex = new DiskTagIndex(this.indexDirectory, measurementName);
- this.compressionClass = conf.getOrDefault(StorageEngine.COMPRESSION_CLASS,
- StorageEngine.DEFAULT_COMPRESSION_CLASS);
- this.measurementName = measurementName;
- this.bufTracker = new ArrayList<>();
- this.prBufPointers = new PrintWriter(new FileOutputStream(new File(getPtrPath()), true));
- this.prMetadata = new PrintWriter(new FileOutputStream(new File(getMetadataPath()), true));
this.fileMapIncrement = Integer
.parseInt(conf.getOrDefault(CONF_MEASUREMENT_FILE_INCREMENT, String.valueOf(DEFAULT_BUF_INCREMENT)));
this.maxFileSize = Integer
.parseInt(conf.getOrDefault(CONF_MEASUREMENT_FILE_MAX, String.valueOf(DEFAULT_MAX_FILE_SIZE)));
+ this.increment = Integer
+ .parseInt(conf.getOrDefault(CONF_MEASUREMENT_INCREMENT_SIZE, String.valueOf(DEFAULT_INCREMENT_SIZE)));
if (maxFileSize < 0) {
throw new IllegalArgumentException("File size can't be negative or greater than:" + Integer.MAX_VALUE);
}
if (fileMapIncrement >= maxFileSize) {
throw new IllegalArgumentException("File increment can't be greater than or equal to file size");
}
+ this.metadata = metadata;
+ this.seriesMap = new ConcurrentHashMap<>(10000);
+ this.compressionClass = conf.getOrDefault(StorageEngine.COMPRESSION_CLASS,
+ StorageEngine.DEFAULT_COMPRESSION_CLASS);
+ this.measurementName = measurementName;
+ this.bufTracker = new ArrayList<>();
+ this.prBufPointers = new PrintWriter(new FileOutputStream(new File(getPtrPath()), true));
+ this.prMetadata = new PrintWriter(new FileOutputStream(new File(getMetadataPath()), true));
+ this.tagIndex = new DiskTagIndex(this.indexDirectory, measurementName);
+ // bgTaskPool.scheduleAtFixedRate(()->System.out.println("Buffers:"+BUF_COUNTER.get()),
+ // 2, 2, TimeUnit.SECONDS);
}
private String getPtrPath() {
@@ -166,7 +179,7 @@ protected void appendTimeseriesToMeasurementMetadata(String seriesId, boolean fp
DiskStorageEngine.appendLineToFile(seriesId + "\t" + fp + "\t" + timeBucketSize, prMetadata);
}
- private Map> seriesBufferMap() throws FileNotFoundException, IOException {
+ private Map>> seriesBufferMap() throws FileNotFoundException, IOException {
Map bufferMap = new ConcurrentHashMap<>();
File[] listFiles = new File(dataDirectory).listFiles(new FilenameFilter() {
@@ -188,13 +201,13 @@ public boolean accept(File dir, String name) {
}
}
fcnt = listFiles.length;
- Map> seriesBuffers = new HashMap<>();
+ Map>> seriesBuffers = new HashMap<>();
sliceMappedBuffersForBuckets(bufferMap, seriesBuffers);
return seriesBuffers;
}
private void sliceMappedBuffersForBuckets(Map bufferMap,
- Map> seriesBuffers) throws IOException {
+ Map>> seriesBuffers) throws IOException {
List lines = MiscUtils.readAllLines(new File(getPtrPath()));
for (String line : lines) {
String[] splits = line.split("\\s+");
@@ -205,14 +218,15 @@ private void sliceMappedBuffersForBuckets(Map bufferMa
MappedByteBuffer buf = bufferMap.get(fileName);
int position = positionOffset + pointer;
buf.position(position);
+ String tsBucket = TimeSeries.getStringFromBuffer(buf);
ByteBuffer slice = buf.slice();
slice.limit(increment);
- List list = seriesBuffers.get(seriesId);
+ List> list = seriesBuffers.get(seriesId);
if (list == null) {
list = new ArrayList<>();
seriesBuffers.put(seriesId, list);
}
- list.add(slice);
+ list.add(new AbstractMap.SimpleEntry<>(tsBucket, slice));
}
}
@@ -246,10 +260,10 @@ public void loadTimeseriesFromMeasurements() throws IOException {
List seriesEntries = MiscUtils.readAllLines(file);
loadSeriesEntries(seriesEntries);
- Map> seriesBuffers = seriesBufferMap();
+ Map>> seriesBuffers = seriesBufferMap();
for (String series : seriesMap.keySet()) {
TimeSeries ts = seriesMap.get(series);
- List list = seriesBuffers.get(series);
+ List> list = seriesBuffers.get(series);
if (list != null) {
try {
ts.loadBucketMap(list);
@@ -261,7 +275,7 @@ public void loadTimeseriesFromMeasurements() throws IOException {
}
@Override
- public ByteBuffer createNewBuffer(String seriesId) throws IOException {
+ public ByteBuffer createNewBuffer(String seriesId, String tsBucket) throws IOException {
if (activeFile == null) {
synchronized (seriesMap) {
if (activeFile == null) {
@@ -290,7 +304,7 @@ public ByteBuffer createNewBuffer(String seriesId) throws IOException {
+ filename);
activeFile.close();
activeFile = null;
- return createNewBuffer(seriesId);
+ return createNewBuffer(seriesId, tsBucket);
}
memoryMappedBuffer = activeFile.getChannel().map(MapMode.READ_WRITE, offset, fileMapIncrement);
logger.fine("Buffer expansion:" + offset + "\t\t" + curr);
@@ -300,10 +314,12 @@ public ByteBuffer createNewBuffer(String seriesId) throws IOException {
memoryMappedBuffer.position(curr);
appendBufferPointersToDisk(seriesId, filename, curr, offset);
base++;
+ TimeSeries.writeStringToBuffer(tsBucket, memoryMappedBuffer);
ByteBuffer buf = memoryMappedBuffer.slice();
buf.limit(increment);
// System.out.println("Position:" + buf.position() + "\t" + buf.limit() + "\t" +
// buf.capacity());
+ BUF_COUNTER.incrementAndGet();
return buf;
}
}
@@ -319,7 +335,7 @@ public String getMeasurementName() {
}
@Override
- public Set getTags() {
+ public Set getTags() throws IOException {
return tagIndex.getTags();
}
@@ -352,4 +368,9 @@ public String toString() {
return "PersistentMeasurement [seriesMap=" + seriesMap + ", measurementName=" + measurementName + "]";
}
+ @Override
+ public SortedMap> createNewBucketMap(String seriesId) {
+ return new ConcurrentSkipListMap<>();
+ }
+
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java
index ca62a65..33d5fe6 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java
@@ -16,8 +16,6 @@
package com.srotya.sidewinder.core.storage.mem;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -29,20 +27,12 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.srotya.sidewinder.core.aggregators.AggregationFunction;
-import com.srotya.sidewinder.core.filters.Filter;
-import com.srotya.sidewinder.core.predicates.Predicate;
import com.srotya.sidewinder.core.storage.Archiver;
import com.srotya.sidewinder.core.storage.DBMetadata;
-import com.srotya.sidewinder.core.storage.DataPoint;
-import com.srotya.sidewinder.core.storage.ItemNotFoundException;
import com.srotya.sidewinder.core.storage.Measurement;
-import com.srotya.sidewinder.core.storage.RejectException;
-import com.srotya.sidewinder.core.storage.SeriesQueryOutput;
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TimeSeries;
-import com.srotya.sidewinder.core.storage.TimeSeriesBucket;
-import com.srotya.sidewinder.core.storage.compression.Reader;
+import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.storage.mem.archival.NoneArchiver;
/**
@@ -60,7 +50,7 @@
*
*
*
- * {@link TimeSeriesBucket} is uses compressed in-memory representation of the
+ * {@link Writer} uses a compressed in-memory representation of the
* actual data. Periodic checks against size ensure that Sidewinder server
* doesn't run out of memory.
*
@@ -70,10 +60,6 @@
public class MemStorageEngine implements StorageEngine {
private static final Logger logger = Logger.getLogger(MemStorageEngine.class.getName());
- private static ItemNotFoundException NOT_FOUND_EXCEPTION = new ItemNotFoundException("Item not found");
- private static RejectException FP_MISMATCH_EXCEPTION = new RejectException("Floating point mismatch");
- private static RejectException INVALID_DATAPOINT_EXCEPTION = new RejectException(
- "Datapoint is missing required values");
private Map> databaseMap;
private AtomicInteger counter = new AtomicInteger(0);
private Map dbMetadataMap;
@@ -130,7 +116,7 @@ public void updateTimeSeriesRetentionPolicy(String dbName, String measurementNam
@Override
public void updateTimeSeriesRetentionPolicy(String dbName, String measurementName, int retentionHours)
- throws ItemNotFoundException {
+ throws IOException {
if (!checkIfExists(dbName, measurementName)) {
throw NOT_FOUND_EXCEPTION;
}
@@ -156,95 +142,6 @@ public void updateTimeSeriesRetentionPolicy(String dbName, int retentionHours) {
}
}
- /**
- * @param dbName
- * @param measurementName
- * @param valueFieldName
- * @param startTime
- * @param endTime
- * @return
- * @throws Exception
- */
- @Override
- public LinkedHashMap queryReaders(String dbName, String measurementName, String valueFieldName,
- long startTime, long endTime) throws Exception {
- LinkedHashMap readers = new LinkedHashMap<>();
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- databaseMap.get(dbName).get(measurementName).queryReaders(valueFieldName, startTime, endTime, readers);
- return readers;
- }
-
- @Override
- public Set queryDataPoints(String dbName, String measurementName, String valueFieldName,
- long startTime, long endTime, List tagList, Filter> tagFilter,
- Predicate valuePredicate, AggregationFunction aggregationFunction) throws ItemNotFoundException {
- Set resultMap = new HashSet<>();
- Map measurementMap = databaseMap.get(dbName);
- if (measurementMap != null) {
- Measurement measurement = measurementMap.get(measurementName);
- if (measurement != null) {
- try {
- measurement.queryDataPoints(valueFieldName, startTime, endTime, tagList, tagFilter, valuePredicate,
- aggregationFunction, resultMap);
- } catch (IOException e) {
- logger.log(Level.SEVERE, "Error running query on measurement", e);
- }
- } else {
- throw new ItemNotFoundException("Measurement " + measurementName + " not found");
- }
- } else {
- throw new ItemNotFoundException("Database " + dbName + " not found");
- }
- return resultMap;
- }
-
- @Override
- public Set getMeasurementsLike(String dbName, String partialMeasurementName) throws Exception {
- if (!checkIfExists(dbName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- Map measurementMap = databaseMap.get(dbName);
- partialMeasurementName = partialMeasurementName.trim();
- if (partialMeasurementName.isEmpty()) {
- return measurementMap.keySet();
- } else {
- Set filteredSeries = new HashSet<>();
- for (String measurementName : measurementMap.keySet()) {
- if (measurementName.contains(partialMeasurementName)) {
- filteredSeries.add(measurementName);
- }
- }
- return filteredSeries;
- }
- }
-
- public static void validateDataPoint(String dbName, String measurementName, String valueFieldName,
- List tags, TimeUnit unit) throws RejectException {
- if (dbName == null || measurementName == null || valueFieldName == null || tags == null || unit == null) {
- throw INVALID_DATAPOINT_EXCEPTION;
- }
- }
-
- @Override
- public void writeDataPoint(DataPoint dp) throws IOException {
- validateDataPoint(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(), dp.getTags(),
- TimeUnit.MILLISECONDS);
- TimeSeries timeSeries = getOrCreateTimeSeries(dp.getDbName(), dp.getMeasurementName(), dp.getValueFieldName(),
- dp.getTags(), defaultTimebucketSize, dp.isFp());
- if (dp.isFp() != timeSeries.isFp()) {
- // drop this datapoint, mixed series are not allowed
- throw FP_MISMATCH_EXCEPTION;
- }
- if (dp.isFp()) {
- timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getValue());
- } else {
- timeSeries.addDataPoint(TimeUnit.MILLISECONDS, dp.getTimestamp(), dp.getLongValue());
- }
- counter.incrementAndGet();
- }
-
@Override
public Map getOrCreateDatabase(String dbName) {
Map measurementMap = databaseMap.get(dbName);
@@ -294,17 +191,6 @@ protected Measurement getOrCreateMeasurement(Map measuremen
return measurement;
}
- @Override
- public boolean checkTimeSeriesExists(String dbName, String measurementName, String valueFieldName,
- List tags) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- return false;
- }
- // check and create timeseries
- TimeSeries timeSeries = databaseMap.get(dbName).get(measurementName).getTimeSeries(valueFieldName, tags);
- return timeSeries != null;
- }
-
@Override
public TimeSeries getOrCreateTimeSeries(String dbName, String measurementName, String valueFieldName,
List tags, int timeBucketSize, boolean fp) throws IOException {
@@ -318,19 +204,6 @@ public TimeSeries getOrCreateTimeSeries(String dbName, String measurementName, S
return measurement.getOrCreateTimeSeries(valueFieldName, tags, timeBucketSize, fp, conf);
}
- @Override
- public TimeSeries getTimeSeries(String dbName, String measurementName, String valueFieldName, List tags)
- throws IOException {
- // check and create database map
- Map dbMap = getOrCreateDatabase(dbName);
-
- // check and create measurement map
- Measurement measurementMap = getOrCreateMeasurement(dbMap, dbName, measurementName);
-
- // check and create timeseries
- return measurementMap.getTimeSeries(valueFieldName, tags);
- }
-
@Override
public boolean isMeasurementFieldFP(String dbName, String measurementName, String valueFieldName)
throws IOException {
@@ -346,39 +219,11 @@ public boolean isMeasurementFieldFP(String dbName, String measurementName, Strin
throw NOT_FOUND_EXCEPTION;
}
- @Override
- public Set getSeriesIdsWhereTags(String dbName, String measurementName, List tags)
- throws IOException {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getSeriesIdsWhereTags(tags);
- }
-
- @Override
- public Set getTagFilteredRowKeys(String dbName, String measurementName, String valueFieldName,
- Filter> tagFilterTree, List rawTags) throws IOException {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getTagFilteredRowKeys(valueFieldName, tagFilterTree,
- rawTags);
- }
-
@Override
public Set getDatabases() throws Exception {
return databaseMap.keySet();
}
- @Override
- public Set getAllMeasurementsForDb(String dbName) throws Exception {
- if (checkIfExists(dbName)) {
- return databaseMap.get(dbName).keySet();
- } else {
- throw NOT_FOUND_EXCEPTION;
- }
- }
-
@Override
public void deleteAllData() throws Exception {
// Extremely dangerous operation
@@ -419,46 +264,27 @@ public void connect() throws IOException {
public void disconnect() throws IOException {
}
- @Override
- public boolean checkIfExists(String dbName, String measurement) {
- if (checkIfExists(dbName)) {
- return databaseMap.get(dbName).containsKey(measurement);
- } else {
- return false;
- }
+ public Map getDbMetadataMap() {
+ return dbMetadataMap;
}
@Override
- public Set getTagsForMeasurement(String dbName, String measurementName) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getTags();
+ public Map> getMeasurementMap() {
+ return databaseMap;
}
@Override
- public List> getTagsForMeasurement(String dbName, String measurementName, String valueFieldName)
- throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getTagsForMeasurement(valueFieldName);
+ public Map> getDatabaseMap() {
+ return databaseMap;
}
@Override
- public Set getFieldsForMeasurement(String dbName, String measurementName) throws Exception {
- if (!checkIfExists(dbName, measurementName)) {
- throw NOT_FOUND_EXCEPTION;
- }
- return databaseMap.get(dbName).get(measurementName).getFieldsForMeasurement();
- }
-
- public Map getDbMetadataMap() {
- return dbMetadataMap;
+ public int getDefaultTimebucketSize() {
+ return defaultTimebucketSize;
}
@Override
- public Map> getMeasurementMap() {
- return databaseMap;
+ public AtomicInteger getCounter() {
+ return counter;
}
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemTagIndex.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemTagIndex.java
index 1317976..87e4885 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemTagIndex.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemTagIndex.java
@@ -15,6 +15,7 @@
*/
package com.srotya.sidewinder.core.storage.mem;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -92,4 +93,8 @@ public Set searchRowKeysForTag(String tag) {
return rowKeyIndex.get(tag);
}
+ @Override
+ public void close() throws IOException {
+ }
+
}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemoryMeasurement.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemoryMeasurement.java
index 2a2fe94..7162680 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemoryMeasurement.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemoryMeasurement.java
@@ -22,7 +22,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
@@ -31,6 +33,7 @@
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TagIndex;
import com.srotya.sidewinder.core.storage.TimeSeries;
+import com.srotya.sidewinder.core.storage.compression.Writer;
/**
* @author ambud
@@ -81,7 +84,7 @@ public void close() throws IOException {
}
@Override
- public ByteBuffer createNewBuffer(String seriesId) throws IOException {
+ public ByteBuffer createNewBuffer(String seriesId, String tsBucket) throws IOException {
ByteBuffer allocateDirect = ByteBuffer.allocateDirect(1024);
synchronized (bufTracker) {
bufTracker.add(allocateDirect);
@@ -127,4 +130,19 @@ public Logger getLogger() {
return logger;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "MemoryMeasurement [measurementName=" + measurementName + ", metadata=" + metadata + ", seriesMap="
+ + seriesMap + ", tagIndex=" + tagIndex + ", bufTracker=" + bufTracker + ", compressionClass="
+ + compressionClass + "]";
+ }
+
+ @Override
+ public SortedMap> createNewBucketMap(String seriesId) {
+ return new ConcurrentSkipListMap<>();
+ }
+
}
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/DiskArchiver.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/DiskArchiver.java
index ac544b1..9fecc99 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/DiskArchiver.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/DiskArchiver.java
@@ -91,7 +91,7 @@ public static void serializeToStream(DataOutputStream bos, TimeSeriesArchivalObj
bos.writeUTF(blob.getDb());
bos.writeUTF(blob.getMeasurement());
bos.writeUTF(blob.getKey());
- bos.writeLong(blob.getBucket().getHeaderTimestamp());
+// bos.writeLong(blob.getBucket().getHeaderTimestamp());
// bos.writeInt(blob.getBucket().getCount());
// Reader reader = blob.getBucket().getReader(null, null);
// byte[] buf = reader.toByteArray();
diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/TimeSeriesArchivalObject.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/TimeSeriesArchivalObject.java
index b9d5794..9979eb2 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/TimeSeriesArchivalObject.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/archival/TimeSeriesArchivalObject.java
@@ -15,7 +15,7 @@
*/
package com.srotya.sidewinder.core.storage.mem.archival;
-import com.srotya.sidewinder.core.storage.TimeSeriesBucket;
+import com.srotya.sidewinder.core.storage.compression.Writer;
/**
* @author ambud
@@ -25,12 +25,12 @@ public class TimeSeriesArchivalObject {
private String db;
private String measurement;
private String key;
- private TimeSeriesBucket bucket;
+ private Writer bucket;
public TimeSeriesArchivalObject() {
}
- public TimeSeriesArchivalObject(String db, String measurement, String key, TimeSeriesBucket bucket) {
+ public TimeSeriesArchivalObject(String db, String measurement, String key, Writer bucket) {
this.db = db;
this.measurement = measurement;
this.key = key;
@@ -85,16 +85,16 @@ public void setKey(String key) {
/**
* @return the bucket
*/
- public TimeSeriesBucket getBucket() {
+ public Writer getBucket() {
return bucket;
}
/**
- * @param bucket
- * the bucket to set
+ * @param bucket the bucket to set
*/
- public void setBucket(TimeSeriesBucket bucket) {
+ public void setBucket(Writer bucket) {
this.bucket = bucket;
}
+
}
\ No newline at end of file
diff --git a/core/src/main/java/com/srotya/sidewinder/core/utils/ByteUtils.java b/core/src/main/java/com/srotya/sidewinder/core/utils/ByteUtils.java
index 68823a8..418b9eb 100644
--- a/core/src/main/java/com/srotya/sidewinder/core/utils/ByteUtils.java
+++ b/core/src/main/java/com/srotya/sidewinder/core/utils/ByteUtils.java
@@ -15,7 +15,6 @@
*/
package com.srotya.sidewinder.core.utils;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@@ -53,40 +52,6 @@ public static int bytesToIntMSB(byte[] bytes) {
return val;
}
- /**
- * @param x
- * @return
- */
- public static byte[] longToBytes(long x) {
- ByteBuffer buffer = ByteBuffer.allocate(8);
- buffer.putLong(x);
- return buffer.array();
- }
-
- public static long byteToLong(byte[] bytes) {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- return buffer.getLong();
- }
-
- /**
- * @param x
- * @return
- */
- public static byte[] doubleToBytes(double x){
- ByteBuffer buffer = ByteBuffer.allocate(8);
- buffer.putDouble(x);
- return buffer.array();
- }
-
- /**
- * @param bytes
- * @return
- */
- public static double bytesToDouble(byte[] bytes) {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- return buffer.getDouble();
- }
-
/**
* @param in
* @return
diff --git a/core/src/main/resources/configs/sidewinder-env.sh b/core/src/main/resources/configs/sidewinder-env.sh
index eae33ff..67df142 100644
--- a/core/src/main/resources/configs/sidewinder-env.sh
+++ b/core/src/main/resources/configs/sidewinder-env.sh
@@ -9,7 +9,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
source /etc/environment
-export CHECK_UPGRADE="true"
+export CHECK_UPGRADE="false"
function getCurrentVersion() {
export VERSION=`rpm -qa | grep sidewinder | awk -F'-' '{ print $3 }'`
diff --git a/core/src/test/java/com/srotya/sidewinder/core/graphite/TestGraphiteDecoder.java b/core/src/test/java/com/srotya/sidewinder/core/graphite/TestGraphiteDecoder.java
new file mode 100644
index 0000000..0df3cae
--- /dev/null
+++ b/core/src/test/java/com/srotya/sidewinder/core/graphite/TestGraphiteDecoder.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2017 Ambud Sharma
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.srotya.sidewinder.core.graphite;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.srotya.sidewinder.core.storage.StorageEngine;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.string.StringDecoder;
+
+/**
+ * @author ambud
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestGraphiteDecoder {
+
+ @Mock
+ private StorageEngine engine;
+
+ @Test
+ public void testHandler() throws IOException {
+ EmbeddedChannel ch = new EmbeddedChannel(new StringDecoder(), new GraphiteDecoder("test", engine));
+ ch.writeInbound(Unpooled.copiedBuffer("app1.server1.jvm.heap.max 233123 1497720452", Charset.defaultCharset()));
+ ch.readInbound();
+ verify(engine, times(1)).writeDataPoint("test", "server1", "max", Arrays.asList("app1", "jvm", "heap"),
+ ((long) 1497720452) * 1000, 233123);
+ ch.close();
+ }
+
+ @Test
+ public void testParseAndInsert() throws IOException {
+ GraphiteDecoder.parseAndInsertDataPoints("test", "app1.server1.jvm.heap.max 233123 1497720452", engine);
+ verify(engine, times(1)).writeDataPoint("test", "server1", "max", Arrays.asList("app1", "jvm", "heap"),
+ ((long) 1497720452) * 1000, 233123);
+ }
+
+ @Test
+ public void testIncorrectParseSkip() throws IOException {
+ GraphiteDecoder.parseAndInsertDataPoints("test", "app1 1497720452", engine);
+ GraphiteDecoder.parseAndInsertDataPoints("test", "app1.app2 1497720452", engine);
+
+ GraphiteDecoder.parseAndInsertDataPoints("test",
+ "app1.server1.jvm.heap.max233123 1497720452\n" + "app1.server2.jvm.heap.max 2331231497720452", engine);
+ verify(engine, times(0)).writeDataPoint("test", "server2", "max", Arrays.asList("app1", "jvm", "heap"),
+ ((long) 1497720452) * 1000, 233123);
+
+ GraphiteDecoder.parseAndInsertDataPoints("test", "app1.server1.heap 233123 1497720452", engine);
+ verify(engine, times(0)).writeDataPoint("test", "server1", "max", Arrays.asList("app1"),
+ ((long) 1497720452) * 1000, 233123);
+ }
+}
diff --git a/core/src/test/java/com/srotya/sidewinder/core/rpc/TestGRPWriterServiceImpl.java b/core/src/test/java/com/srotya/sidewinder/core/rpc/TestGRPWriterServiceImpl.java
index 2b4442e..5cda2ee 100644
--- a/core/src/test/java/com/srotya/sidewinder/core/rpc/TestGRPWriterServiceImpl.java
+++ b/core/src/test/java/com/srotya/sidewinder/core/rpc/TestGRPWriterServiceImpl.java
@@ -124,8 +124,11 @@ public void testPointWritesRejects() throws Exception {
Point.newBuilder().setDbName(dbName).setFp(true).setMeasurementName(measurementName).addTags("host1")
.setTimestamp(sts + 1).setValue(2L).setValueFieldName("usage").build());
try {
- client.writeBatchDataPoint(BatchData.newBuilder().setMessageId(sts).addAllPoints(points).build());
- fail("Exception must be thrown");
+ Ack response = client
+ .writeBatchDataPoint(BatchData.newBuilder().setMessageId(sts).addAllPoints(points).build());
+ if (response.getResponseCode() == 200) {
+ fail("Exception must be thrown");
+ }
} catch (Exception e) {
}
// second data point should have been rejected
diff --git a/core/src/test/java/com/srotya/sidewinder/core/storage/MockMeasurement.java b/core/src/test/java/com/srotya/sidewinder/core/storage/MockMeasurement.java
index e791d42..107b48d 100644
--- a/core/src/test/java/com/srotya/sidewinder/core/storage/MockMeasurement.java
+++ b/core/src/test/java/com/srotya/sidewinder/core/storage/MockMeasurement.java
@@ -17,20 +17,26 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
+import com.srotya.sidewinder.core.storage.compression.Writer;
+
/**
* @author ambud
*/
public class MockMeasurement implements Measurement {
private int bufferRenewCounter = 0;
- private List list;
+ private List