Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Commit

Permalink
[all] introduce id-based operations in MetadataBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed Jun 7, 2016
1 parent dc5c152 commit 54f4443
Show file tree
Hide file tree
Showing 12 changed files with 532 additions and 73 deletions.
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2015 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.heroic.metadata;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.spotify.heroic.cluster.ClusterShard;
import com.spotify.heroic.common.DateRange;
import com.spotify.heroic.common.OptionalLimit;
import com.spotify.heroic.filter.Filter;
import com.spotify.heroic.metric.RequestError;
import com.spotify.heroic.metric.ShardError;
import eu.toolchain.async.Collector;
import eu.toolchain.async.Transform;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

@Data
public class FindSeriesIds {
private final List<RequestError> errors;
private final Set<String> ids;
private final boolean limited;

public static FindSeriesIds of() {
return new FindSeriesIds(ImmutableList.of(), ImmutableSet.of(), false);
}

public static FindSeriesIds of(final Set<String> ids, final boolean limited) {
return new FindSeriesIds(ImmutableList.of(), ids, limited);
}

public static Collector<FindSeriesIds, FindSeriesIds> reduce(final OptionalLimit limit) {
return results -> {
final List<RequestError> errors = new ArrayList<>();
final ImmutableSet.Builder<String> ids = ImmutableSet.builder();
boolean limited = false;

for (final FindSeriesIds result : results) {
errors.addAll(result.errors);
ids.addAll(result.ids);
limited |= result.limited;
}

final Set<String> s = ids.build();
return new FindSeriesIds(errors, limit.limitSet(s),
limited || limit.isGreater(s.size()));
};
}

public static Transform<Throwable, FindSeriesIds> shardError(final ClusterShard shard) {
return e -> new FindSeriesIds(ImmutableList.of(ShardError.fromThrowable(shard, e)),
ImmutableSet.of(), false);
}

@JsonIgnore
public boolean isEmpty() {
return ids.isEmpty();
}

@Data
public static class Request {
private final Filter filter;
private final DateRange range;
private final OptionalLimit limit;
}
}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2015 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.heroic.metadata;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableSet;
import com.spotify.heroic.common.DateRange;
import com.spotify.heroic.common.OptionalLimit;
import com.spotify.heroic.filter.Filter;
import lombok.Data;

import java.util.Set;

@Data
public class FindSeriesIdsStream {
private final Set<String> ids;

public static FindSeriesIdsStream of() {
return new FindSeriesIdsStream(ImmutableSet.of());
}

public static FindSeriesIdsStream of(final Set<String> ids) {
return new FindSeriesIdsStream(ids);
}

@JsonIgnore
public boolean isEmpty() {
return ids.isEmpty();
}

@Data
public static class Request {
private final Filter filter;
private final DateRange range;
private final OptionalLimit limit;
}
}
Expand Up @@ -53,6 +53,14 @@ default AsyncObservable<FindSeriesStream> findSeriesStream(FindSeries.Request re
return AsyncObservable.empty();
}

AsyncFuture<FindSeriesIds> findSeriesIds(FindSeriesIds.Request request);

default AsyncObservable<FindSeriesIdsStream> findSeriesIdsStream(
FindSeriesIds.Request request
) {
return AsyncObservable.empty();
}

AsyncFuture<CountSeries> countSeries(CountSeries.Request request);

AsyncFuture<DeleteSeries> deleteSeries(DeleteSeries.Request request);
Expand Down
Expand Up @@ -60,11 +60,24 @@ public AsyncFuture<FindSeries> findSeries(final FindSeries.Request request) {
FindSeries.reduce(request.getLimit()));
}

@Override
public AsyncFuture<FindSeriesIds> findSeriesIds(final FindSeriesIds.Request request) {
return async.collect(run(v -> v.findSeriesIds(request)),
FindSeriesIds.reduce(request.getLimit()));
}

@Override
public AsyncObservable<FindSeriesStream> findSeriesStream(final FindSeries.Request request) {
return AsyncObservable.chain(run(b -> b.findSeriesStream(request)));
}

@Override
public AsyncObservable<FindSeriesIdsStream> findSeriesIdsStream(
final FindSeriesIds.Request request
) {
return AsyncObservable.chain(run(b -> b.findSeriesIdsStream(request)));
}

@Override
public AsyncFuture<DeleteSeries> deleteSeries(final DeleteSeries.Request request) {
return async.collect(run(b -> b.deleteSeries(request)), DeleteSeries.reduce());
Expand Down
2 changes: 2 additions & 0 deletions heroic-core/src/main/java/com/spotify/heroic/shell/Tasks.java
Expand Up @@ -50,6 +50,7 @@
import com.spotify.heroic.shell.task.MetadataEntries;
import com.spotify.heroic.shell.task.MetadataFetch;
import com.spotify.heroic.shell.task.MetadataFindSeries;
import com.spotify.heroic.shell.task.MetadataFindSeriesIds;
import com.spotify.heroic.shell.task.MetadataLoad;
import com.spotify.heroic.shell.task.MetadataMigrate;
import com.spotify.heroic.shell.task.MetadataTags;
Expand Down Expand Up @@ -119,6 +120,7 @@ public final class Tasks {
shellTask(MetadataCount::setup, MetadataCount.class);
shellTask(MetadataEntries::setup, MetadataEntries.class);
shellTask(MetadataFindSeries::setup, MetadataFindSeries.class);
shellTask(MetadataFindSeriesIds::setup, MetadataFindSeriesIds.class);
shellTask(MetadataMigrate::setup, MetadataMigrate.class);
shellTask(MetadataLoad::setup, MetadataLoad.class);
shellTask(SuggestTag::setup, SuggestTag.class);
Expand Down
@@ -0,0 +1,140 @@
/*
* Copyright (c) 2015 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.heroic.shell.task;

import com.spotify.heroic.async.AsyncObserver;
import com.spotify.heroic.common.OptionalLimit;
import com.spotify.heroic.dagger.CoreComponent;
import com.spotify.heroic.filter.Filter;
import com.spotify.heroic.grammar.QueryParser;
import com.spotify.heroic.metadata.FindSeriesIds;
import com.spotify.heroic.metadata.FindSeriesIdsStream;
import com.spotify.heroic.metadata.MetadataBackend;
import com.spotify.heroic.metadata.MetadataManager;
import com.spotify.heroic.shell.ShellIO;
import com.spotify.heroic.shell.ShellTask;
import com.spotify.heroic.shell.TaskName;
import com.spotify.heroic.shell.TaskParameters;
import com.spotify.heroic.shell.TaskUsage;
import com.spotify.heroic.shell.Tasks;
import dagger.Component;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
import eu.toolchain.async.ResolvableFuture;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

@TaskUsage("Find series using the given filters")
@TaskName("metadata-find-series-ids")
@Slf4j
public class MetadataFindSeriesIds implements ShellTask {
private final MetadataManager metadata;
private final QueryParser parser;
private final AsyncFramework async;

@Inject
public MetadataFindSeriesIds(
MetadataManager metadata, QueryParser parser, AsyncFramework async
) {
this.metadata = metadata;
this.parser = parser;
this.async = async;
}

@Override
public TaskParameters params() {
return new Parameters();
}

@Override
public AsyncFuture<Void> run(final ShellIO io, TaskParameters base) throws Exception {
final Parameters params = (Parameters) base;

final Filter filter = Tasks.setupFilter(parser, params);
final MetadataBackend group = metadata.useOptionalGroup(params.group);
final Consumer<String> printer = id -> io.out().println(id);
final ResolvableFuture<Void> future = async.future();

group
.findSeriesIdsStream(
new FindSeriesIds.Request(filter, params.getRange(), params.getLimit()))
.observe(new AsyncObserver<FindSeriesIdsStream>() {
@Override
public AsyncFuture<Void> observe(final FindSeriesIdsStream value) {
value.getIds().forEach(printer);
io.out().flush();
return async.resolved();
}

@Override
public void cancel() {
future.cancel();
}

@Override
public void fail(final Throwable cause) {
future.fail(cause);
}

@Override
public void end() {
future.resolve(null);
}
});

return future;
}

@ToString
private static class Parameters extends Tasks.QueryParamsBase {
@Option(name = "-g", aliases = {"--group"}, usage = "Backend group to use",
metaVar = "<group>")
private Optional<String> group = Optional.empty();

@Option(name = "--limit", aliases = {"--limit"},
usage = "Limit the number of printed entries")
@Getter
private OptionalLimit limit = OptionalLimit.empty();

@Argument
@Getter
private List<String> query = new ArrayList<>();
}

public static MetadataFindSeriesIds setup(final CoreComponent core) {
return DaggerMetadataFindSeriesIds_C.builder().coreComponent(core).build().task();
}

@Component(dependencies = CoreComponent.class)
interface C {
MetadataFindSeriesIds task();
}
}

0 comments on commit 54f4443

Please sign in to comment.