Skip to content

Commit

Permalink
Merge pull request apache#176 from seznam/add-interface-audience-anno…
Browse files Browse the repository at this point in the history
…tations

 [euphoria-core] add API audience annotations to core
  • Loading branch information
je-ik committed Nov 10, 2017
2 parents 93a76d5 + 30afe52 commit bcfad9f
Show file tree
Hide file tree
Showing 140 changed files with 440 additions and 171 deletions.
@@ -0,0 +1,45 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.annotation.audience;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Intended audience of API.
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface Audience {

enum Type {
/** The API is intended to be used by client code. */
CLIENT,
/** The API is intended for use by executor code. */
EXECUTOR,
/** The API is intended for internal use. */
INTERNAL,
/** The API is intended for use primarily for tests. */
TESTS
}

Type[] value();

}
Expand Up @@ -15,12 +15,15 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* Accumulators collect values from user functions.
* Accumulators allow user to calculate statistics during the flow execution.
* <p>
* Accumulators are inspired by the Hadoop/MapReduce counters.
*/
@Audience(Audience.Type.INTERNAL)
public interface Accumulator {

}
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.util.Settings;

import java.io.Serializable;
Expand All @@ -23,6 +24,7 @@
* Provides access to an accumulator backend service. It is intended to be
* implemented by third party to support different type of services.
*/
@Audience(Audience.Type.EXECUTOR)
public interface AccumulatorProvider {

/**
Expand Down
Expand Up @@ -15,9 +15,12 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* Counter is a type of accumulator making a sum from integral numbers.
*/
@Audience(Audience.Type.CLIENT)
public interface Counter extends Accumulator {

/**
Expand Down
Expand Up @@ -15,9 +15,12 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* Histogram is a type of accumulator recording a distribution of different values.
*/
@Audience(Audience.Type.CLIENT)
public interface Histogram extends Accumulator {

/**
Expand Down
Expand Up @@ -15,13 +15,15 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
* Timer provides convenience API very similar to {@link Histogram}
* but extended by time unit support.
*/
@Audience(Audience.Type.CLIENT)
public interface Timer extends Accumulator {

/**
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.util.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +27,7 @@
* Placeholder implementation of {@link AccumulatorProvider} that
* may be used in executors as a default.
*/
@Audience(Audience.Type.EXECUTOR)
public class VoidAccumulatorProvider implements AccumulatorProvider {

private static final Logger LOG = LoggerFactory.getLogger(VoidAccumulatorProvider.class);
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
Expand All @@ -29,6 +30,7 @@
*
* @param <T> type of elements of this data set
*/
@Audience(Audience.Type.CLIENT)
public interface Dataset<T> extends Serializable {

/**
Expand Down
Expand Up @@ -16,13 +16,15 @@

package cz.seznam.euphoria.core.client.dataset;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;

/**
* Various dataset related utils.
*/
@Audience(Audience.Type.EXECUTOR)
public class Datasets {

/**
Expand Down Expand Up @@ -56,7 +58,7 @@ public static <IN, OUT> Dataset<OUT> createOutputFor(
*/
public static <T> Dataset<T> createInputFromSource(
Flow flow, DataSource<T> source) {

return new InputDataset<>(flow, source, source.isBounded());
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
Expand All @@ -26,6 +27,7 @@
/**
* {@code PCollection} that is input of a {@code Flow}.
*/
@Audience(Audience.Type.EXECUTOR)
class InputDataset<T> implements Dataset<T> {

private final Flow flow;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
Expand All @@ -26,6 +27,7 @@
/**
* {@code PCollection} that is output of some operator.
*/
@Audience(Audience.Type.EXECUTOR)
class OutputDataset<T> implements Dataset<T> {

private final Flow flow;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.triggers.CountTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;

Expand All @@ -23,6 +24,7 @@
/**
* Count tumbling windowing.
*/
@Audience(Audience.Type.CLIENT)
public final class Count<T> implements Windowing<T, GlobalWindowing.Window> {

private final int maxCount;
Expand All @@ -44,5 +46,5 @@ public Trigger<GlobalWindowing.Window> getTrigger() {
public static <T> Count<T> of(int count) {
return new Count<>(count);
}

}
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.triggers.NoopTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;

Expand All @@ -25,6 +26,7 @@
* Windowing with single window across the whole dataset. Suitable for
* batch processing.
*/
@Audience(Audience.Type.CLIENT)
public final class GlobalWindowing<T>
implements Windowing<T, GlobalWindowing.Window> {

Expand Down
Expand Up @@ -15,10 +15,12 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.util.Pair;

import java.util.Collection;

@Audience(Audience.Type.CLIENT)
public interface MergingWindowing<T, W extends Window<W>>
extends Windowing<T, W>
{
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.annotation.stability.Experimental;
import cz.seznam.euphoria.core.client.triggers.AfterFirstCompositeTrigger;
import cz.seznam.euphoria.core.client.triggers.PeriodicTimeTrigger;
Expand All @@ -36,6 +37,7 @@
/**
* Session windowing.
*/
@Audience(Audience.Type.CLIENT)
public final class Session<T> implements MergingWindowing<T, TimeInterval> {

private final long gapDurationMillis;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.triggers.AfterFirstCompositeTrigger;
import cz.seznam.euphoria.core.client.triggers.PeriodicTimeTrigger;
import cz.seznam.euphoria.core.client.triggers.TimeTrigger;
Expand All @@ -31,6 +32,7 @@
/**
* Time based tumbling windowing. Windows can't overlap.
*/
@Audience(Audience.Type.CLIENT)
public class Time<T> implements Windowing<T, TimeInterval> {

private final long durationMillis;
Expand Down
Expand Up @@ -15,6 +15,9 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;

@Audience(Audience.Type.CLIENT)
public final class TimeInterval
extends Window<TimeInterval>
implements TimedWindow {
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.triggers.TimeTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;
Expand All @@ -27,6 +28,7 @@
/**
* Time sliding windowing.
*/
@Audience(Audience.Type.CLIENT)
public final class TimeSliding<T>
implements Windowing<T, TimeInterval> {

Expand Down
Expand Up @@ -15,10 +15,13 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* Extension to {@link cz.seznam.euphoria.core.client.dataset.windowing.Window}
* defining time based constraints on the implementor.
*/
@Audience(Audience.Type.INTERNAL)
public interface TimedWindow {

/**
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import java.io.Serializable;

/**
Expand All @@ -25,6 +26,7 @@
* Subclasses should implement {@code equals()}, {@code hashCode()} and {@code compareTo()} so that logically
* same windows are treated the same.
*/
@Audience(Audience.Type.CLIENT)
public abstract class Window<T extends Window<T>> implements Serializable, Comparable<T> {

@Override
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.client.functional.TernaryFunction;
import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* A single data element flowing in dataset. Every such element
Expand All @@ -24,11 +24,22 @@
* @param <W> type of the window
* @param <T> type of the data element
*/
@Audience(Audience.Type.CLIENT)
public interface WindowedElement<W extends Window, T> {

/**
* @return window of element.
*/
W getWindow();

/**
* @return associated timestamp
*/
long getTimestamp();

/**
* @return the data element itself
*/
T getElement();

}
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.triggers.Trigger;

import java.io.Serializable;
Expand All @@ -23,6 +24,7 @@
/**
* A windowing policy of a dataset.
*/
@Audience(Audience.Type.CLIENT)
public interface Windowing<T, W extends Window> extends Serializable {

/**
Expand Down

0 comments on commit bcfad9f

Please sign in to comment.