由于 flink 窗口操作涉及的代码较多,因此我决定用几篇文章分别针对不同的点进行讲解,这篇文章我们来讲解一下窗口的一些组件类
Window 是一个抽象类,代表窗口,一个窗口是将元素分组为有限的桶。窗口有一个最大时间戳,意味着在这个时间点,所有属于该窗口的元素都将到达
public abstract class Window {
/**
* 获取仍属于此窗口的最大时间戳
*/
public abstract long maxTimestamp();
}
TimeWindow 是 Window 的实现类,TimeWindow 是一个时间窗口,代表一段时间,从 start 到 end,[start, end),左闭右开,start 端点属于窗口,end 端点不属于窗口,下面我们来看看 TimeWindow 的几个重要的方法
-
构造函数
private final long start; private final long end; /** * 构造函数,start 代表左端点,end 代表右端点 */ public TimeWindow(long start, long end) { this.start = start; this.end = end; }
-
maxTimestamp
maxTimestamp 实现 Window 中定义的抽象方法
/** * 获取属于该窗口的最大时间戳,因为区间是左闭右开,所以是 end - 1 */ public long maxTimestamp() { return end - 1; }
-
getWindowStartWithOffset
getWindowStartWithOffset 用于获取元素所属窗口的开端,timestamp 指代元素的时间戳,windowSize 指代窗口的大小,offset 指代偏移量
假设 timestamp 为 90,windowSize 为 60,如果 offset 为 0,窗口则为 0 - 60,60 - 120 ... 如果 offset 为 20,窗口贼为 20 - 80,80 - 140 ...
/** * 获取窗口的开端 */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
-
窗口合并相关
相交的窗口可以合并为一个窗口,intersects 方法判断本窗口和 other 窗口是否相交,cover 返回能够覆盖本窗口和给定的窗口的最小窗口
/** * 返回窗口是否与 other 窗口相交 */ public boolean intersects(TimeWindow other) { return this.start <= other.end && this.end >= other.start; } /** * 返回最小的窗口能够覆盖本窗口和给定的窗口 */ public TimeWindow cover(TimeWindow other) { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); }
mergeWindows 方法接受一个时间窗口的集合,合并集合中所有相交的时间窗口,得到一个元素为
Tuple2<TimeWindow, Set<TimeWindow>>
的 List,Tuple2.f0 指代合并得到的时间窗口,Tuple2.f1 指代合并成 Tuple2.f0 的所有时间窗口组成的 set/** * 合并重叠的时间窗口 * 会在合并窗口分配器的时候被用到 */ public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) { // 将窗口按照开始时间排序,然后合并有重合的窗口 List<TimeWindow> sortedWindows = new ArrayList<>(windows); Collections.sort(sortedWindows, new Comparator<TimeWindow>() { @Override public int compare(TimeWindow o1, TimeWindow o2) { return Long.compare(o1.getStart(), o2.getStart()); } }); List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>(); Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null; for (TimeWindow candidate: sortedWindows) { // currentMerge 为空的时候,初始化,直接赋值 if (currentMerge == null) { currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } else if (currentMerge.f0.intersects(candidate)) { // 当前窗口和 currentMerge.f0 的窗口相交 currentMerge.f0 = currentMerge.f0.cover(candidate); currentMerge.f1.add(candidate); } else { // 当不相交的时候,需要新建立 currentMerge merged.add(currentMerge); currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } } if (currentMerge != null) { merged.add(currentMerge); } for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) { // set 的 size() 大于 1,表明这个 Tuple2 是多个原始窗口合并的 // 调用 merge 的回调函数 if (m.f1.size() > 1) { c.merge(m.f1, m.f0); } } }
Time 是窗口中使用的时间单位,由 TimeUnit 和 size 组成,TimeUnit 指代 Time 使用的时间单位,size 指代窗口由多少 TimeUnit 组成
public final class Time {
// 时间单位
private final TimeUnit unit;
// 时间量级
private final long size;
private Time(long size, TimeUnit unit) {
this.unit = checkNotNull(unit, "time unit may not be null");
this.size = size;
}
/**
* 将窗口长度转为毫秒表示
*/
public long toMilliseconds() {
return unit.toMillis(size);
}
}
WindowAssigner 是窗口分配器,用于给一个元素分配一个或多个窗口
-
WindowAssigner 抽象类
public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; /** * 返回应该分配给元素的窗口集合 * * @param element 被分配窗口的元素 * @param timestamp 元素的 ts * @param context 分配器在其中操作的 WindowAssignerContext */ public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context); /** * 返回与该 WindowAssigner 有关的默认触发器 */ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); /** * 返回一个类型序列器,用来序列化窗口 */ public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig); /** * 返回元素是否是按照 event time 来分配窗口的 */ public abstract boolean isEventTime(); /** * WindowAssigner 的 context,可以用于请求当前的进程时间 */ public abstract static class WindowAssignerContext { /** * Returns the current processing time. */ /** * 返回当前的进程时间 */ public abstract long getCurrentProcessingTime(); } }
-
TumblingProcessingTimeWindows
TumblingProcessingTimeWindows 是翻转进程时间窗口分配器,根据机器的当前系统时间,将元素放入窗口中,窗口不能重叠,例如窗口 size 为 60,offset 为 0,0 - 60 为第一个窗口,60 - 120 为第二个,依此类推。 可以看到 assignWindows 方法调用了前文讲过的
TimeWindow.getWindowStartWithOffset(now, offset, size)
来获取窗口的开端public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private final long size; // 窗口大小 private final long offset; // 窗口偏移 private TumblingProcessingTimeWindows(long size, long offset) { // 当 offset 大于 size 的时候,抛出异常 if (Math.abs(offset) >= size) { throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size"); } this.size = size; this.offset = offset; } @Override // 根据当前进程的时间分配窗口,与 element 本身无关,只取决的于当前的进程时间 public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); // 获取元素应该位于窗口的开端 long start = TimeWindow.getWindowStartWithOffset(now, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } }
-
TumblingEventTimeWindows
TumblingEventTimeWindows 是翻转事件时间窗口分配器,根据元素的时间戳,将窗口元素放入窗口中,窗口不能重叠,assignWindows 中校验了 timestamp 是否大于 Long.MIN_VALUE,当采用 ProcessingTime 模式的时候,StreamRecord 的 ts 始终是 Long.MIN_VALUE,当采用 EventTime 模式的时候,如果 StreamSource 产生元素的时候没有调用 collectWithTimestamp,StreamRecord 最初的 ts 也是 Long.MIN_VALUE,需要调用 DataStream 的 assignTimestampsAndWatermark 方法来给 StreamRecord 设置 timestamp 以及生成 Watermark
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; // 窗口大小 private final long offset; // 窗口偏移 protected TumblingEventTimeWindows(long size, long offset) { if (Math.abs(offset) >= size) { throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size"); } this.size = size; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // 当 StreamRecord 没有 ts 的时候,getTimestamp 返回 Long.MIN_VALUE long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { // 说明 StreamRecord 没有被设置 ts,有空吗没有调用 assignTimestampsAndWatermarks throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } }
-
SlidingProcessingTimeWindows
SlidingProcessingTimeWindows 是滑动进程时间窗口分配器,根据机器的当前系统时间,将元素放入窗口中,窗口可以重叠,例如窗口 size 为 60,offset 为 0,slide 为 10,0 - 60 为第一个窗口,10 - 70 为第二个,依此类推。SlidingProcessingTimeWindows 中 offset 的绝对值不能大于 slide,因为在调用
TimeWindow.getWindowStartWithOffset
的时候,传入的 size 是 slidepublic class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private final long size; private final long offset; private final long slide; private SlidingProcessingTimeWindows(long size, long slide, long offset) { if (Math.abs(offset) >= slide || size <= 0) { throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " + "abs(offset) < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override /** * 分配元素属于的窗口 */ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); // 每一个元素都应该至少属于 size/slide 个滑动窗口,需要记住,时间窗口都是左闭右开的 List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); // 这里感觉能优化一下,start 有可能左溢出,比如 timestamp 为 3,size 为 60 for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } }
-
SlidingEventTimeWindows
SlidingEventTimeWindows 和 SlidingProcessingTimeWindows 分配窗口的方式相同,SlidingEventTimeWindows 依赖于元素的 timestamp,SlidingProcessingTimeWindows 依赖于当前的进程时间
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private final long size; private final long slide; private final long offset; protected SlidingEventTimeWindows(long size, long slide, long offset) { if (Math.abs(offset) >= slide || size <= 0) { throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " + "abs(offset) < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override /** * 依据元素的时间戳给元素分配窗口 */ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
-
MergingWindowAssigner
MergingWindowAssigner 是一个抽象类,定义了一种能够合并窗口的窗口分配器,MergingWindowAssigner 的实现类通过调用 TimeWindow 的 mergeWindows 实现 mergeWindows 方法
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> { /** * 决定了哪些窗口应该被合并 */ public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback); /** * mergeWindows 中使用的回调函数 */ public interface MergeCallback<W> { /** * toBeMerged 是被合并的窗口的集合 * mergeResult 是 toBeMerged 中窗口合并的结果 */ void merge(Collection<W> toBeMerged, W mergeResult); } }
-
ProcessingTimeSessionWindows
ProcessingTimeSessionWindows 是一种 MergingWindowAssigner,根据当前的进程时间和设置的 sessionTimeout 为元素分配窗口,ProcessingTimeSessionWindows 不会去找窗口的开端,而是将当前的进程时间作为开端,sessionTimeout 作为大小。ProcessingTimeSessionWindows 通过调用 TimeWindow 的 mergeWindows 实现 mergeWindows 方法(其他 MergingWindowAssigner 的代码中就不给出这个方法了)
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { protected long sessionTimeout; protected ProcessingTimeSessionWindows(long sessionTimeout) { if (sessionTimeout <= 0) { throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size"); } this.sessionTimeout = sessionTimeout; } @Override /** * 为元素分配窗口,窗口与元素本身无关,只与当前的进程时间和设置的 sessionTimeout 有关 */ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { long currentProcessingTime = context.getCurrentProcessingTime(); return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout)); } public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) { TimeWindow.mergeWindows(windows, c); } }
-
EventTimeSessionWindows
和 ProcessingTimeSessionWindows 类似,使用元素的时间戳作为窗口的开端
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { protected long sessionTimeout; /** * 构造函数 */ protected EventTimeSessionWindows(long sessionTimeout) { if (sessionTimeout <= 0) { throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size"); } this.sessionTimeout = sessionTimeout; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } }
-
DynamicProcessingTimeSessionWindows
DynamicProcessingTimeSessionWindows 和 ProcessingTimeSessionWindows 类似,都是通过当前的进程时间和一个 sessionTimeout 来分配端口,不过 DynamicProcessingTimeSessionWindows 调用
sessionWindowTimeGapExtractor.extract(element)
获取 sessionTimeout,针对不同的元素,有可能得到不同的 sessionTimeoutpublic class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> { protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor; protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor; } @Override public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) { long currentProcessingTime = context.getCurrentProcessingTime(); // 这个 sessionTimeout 和 ProcessingTimeSessionWindows.java 不一样 // 这个是根据 element 动态获取 sessionTimeout long sessionTimeout = sessionWindowTimeGapExtractor.extract(element); if (sessionTimeout <= 0) { throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap"); } return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout)); } }
-
DynamicEventTimeSessionWindows
DynamicEventTimeSessionWindows 和 DynamicProcessingTimeSessionWindows 类似,使用元素的时间戳作为窗口的开端
public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> { protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor; protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor; } @Override public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) { // sessionTimeout 是根据元素动态调整了 long sessionTimeout = sessionWindowTimeGapExtractor.extract(element); if (sessionTimeout <= 0) { throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap"); } return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } }
Trigger 是一个触发器,用来决定何时计算窗口 emit 该窗口部分的结果
-
Trigger
Trigger 是一个抽象类,定义了如下所示的多个方法,每个方法在代码块中都有详细的注释
public abstract class Trigger<T, W extends Window> implements Serializable { /** * 为窗格中的每个元素调用 onElement 方法,方法的返回值决定了 * 是否窗格被执行来输出结果 */ public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; /** * 使用 ctx 创建一个进程时间定时器,定时器触发的时候调用 */ public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; /** * 使用 ctx 来创建事件时间定时器,定时器触发的时候调用 */ public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; /** * 当 trigger 支持 trigger 状态合并的时候返回 true * 返回 true 代表可以与 MergingWindowAssigner 一起使用 * 同时也需要实现 onMerge 方法 */ public boolean canMerge() { return false; } /** * 多个窗口被合并成一个窗口的时候调用 */ public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 清除触发器为窗口保留的所有状态 * 当窗口被清除的时候调用这个方法 * 用 registerEventTimeTimer 或 registerProcessingTimeTimer 设置的定时器需要在这里被删除 * 用 getPartitionedState 获取的状态也需要被删除 */ public abstract void clear(W window, TriggerContext ctx) throws Exception; // ------------------------------------------------------------------------ /** * A context object that is given to {@link Trigger} methods to allow them to register timer * callbacks and deal with state. */ /** * 一个 trigger 上下文 * trigger 可以使用 ctx 来注册定时器以及处理 state */ public interface TriggerContext { /** * 返回当前的进程时间 */ long getCurrentProcessingTime(); /** * 返回当前的 watermark 时间 */ long getCurrentWatermark(); /** * 注册一个系统时间回调。如果当前的系统时间大于注册的时候 * onProcessingTime 被调用 */ void registerProcessingTimeTimer(long time); /** * 注册一个事件时间回调。如果当前的事件时间大于注册的时候 * onEventTime 被调用 */ void registerEventTimeTimer(long time); /** * 删除给定时间的进程时间触发器 */ void deleteProcessingTimeTimer(long time); /** * 删除给定时间的事件事件触发器 */ void deleteEventTimeTimer(long time); /** * 检索可用于与容错状态交互的状态对象,容错状态的作用域是当前触发器调用的窗口(窗口用作 namespace)和键 */ <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor); } /** * TriggerContext 的扩展 * 在 onMerge 的时候使用 */ public interface OnMergeContext extends TriggerContext { <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor); } }
-
ProcessingTimeTrigger
ProcessingTimeTrigger 是一种触发器,当机器的进程时间大于窗口的末端时触发,TriggerResult.CONTINUE 代表不触发,TriggerResult.Fire 代表触发
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private ProcessingTimeTrigger() {} // 以窗口的末端时间戳设置进程时间定时器 @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } // 当定时器触发的时候,表明窗口可以被触发了 @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // 只有当时间还没有超过合并窗口的末端时才注册定时器 long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } }
-
EventTimeTrigger
EventTimeTrigger 是一种触发器,当 Watermark 的 timestamp 大于窗口的末端时触发
public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private EventTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // 如果 watermark 已经超过窗口末端了,立即触发定时器 return TriggerResult.FIRE; } else { // 否则注册事件时间定时器,在 onEventTime 中触发ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // 只有当 watermark 还没有超过合并窗口的末端时才注册定时器 long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } }
-
DeltaTrigger
DeltaTrigger 是一种基于 DeltaFunction 和阈值的触发器,DeltaTrigger 计算上次触发的数据点和当前到达的数据点之间的增量,如果增量高于指定阈值,则触发。DeltaTrigger 不需要注册定时器,因此 onEventTime 方法和 onProcessingTime 方法直接返回 TriggerResult.CONTINUE
由于需要和上次触发的数据点比对,出于检查点的考虑,DeltaTrigger 在构造函数中实例化了一个 ValueStateDescriptor,可以把 ValueStateDescriptor 理解为一个 key,用
ctx.getPartitionedState(ValueStateDescriptor)
可以获取存储的 State,DeltaTrigger 用 State 来存储上一次触发的数据点DeltaTrigger 不能被 merge,所以没有实现 onMerge 方法
先来看看 DeltaFunction
public interface DeltaFunction<DATA> extends Serializable { // 计算给定的两个数据点之间的增量 double getDelta(DATA oldDataPoint, DATA newDataPoint); }
再看看 DeltaTrigger 的源码
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> { private final DeltaFunction<T> deltaFunction; private final double threshold; private final ValueStateDescriptor<T> stateDesc; private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { this.deltaFunction = deltaFunction; this.threshold = threshold; stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer); } @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc); // 获取上次的数据状态 // 如果没有上次的元素状态,更新元素状态 if (lastElementState.value() == null) { lastElementState.update(element); return TriggerResult.CONTINUE; } // 如果上次触发的数据点和当前到达的数据点之间的增量大于域值,触发 if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) { lastElementState.update(element); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } }
-
CountTrigger
CountTrigger 是一种计数触发器,当窗口中元素数量到达给定的数值的时候,窗口被触发,和 DeltaTrigger 类似,CountTrigger 同样不依赖定时器,CountTrigger 也需要一个 ReducingStateDescriptor 来保存当前窗口中的元素数量,ReducingStateDescriptor 接收 Sum 实例,在一个元素到来的时候,
count.add(1L)
会给 State 中的值加上一public class CountTrigger<W extends Window> extends Trigger<Object, W> { private final long maxCount; private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CountTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { // 获取当前的计数状态 ReducingState<Long> count = ctx.getPartitionedState(stateDesc); count.add(1L); // 当前的计数大于 maxCount 时候,触发 if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); } } private static class Sum implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } }
-
ContinuousProcessingTimeTrigger
ContinuousProcessingTimeTrigger 是一种根据给定的时间间隔连续触发的触发器,ContinuousProcessingTimeTrigger 使用 ReducingStateDescriptor 接收一个 Min 实例来保存触发时间中的最小值,onProcessingTime 中会再次注册进程时间定时器
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> { private final long interval; // 当合并的时候,我们选择所有触发时间中的最小值作为新的触发时间 private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); private ContinuousProcessingTimeTrigger(long interval) { this.interval = interval; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); timestamp = ctx.getCurrentProcessingTime(); // 第一次注册,之后都会在 onProcessingTime 的时候再次注册 if (fireTimestamp.get() == null) { long start = timestamp - (timestamp % interval); long nextFireTimestamp = start + interval; // 计算下一次触发的时间 ctx.registerProcessingTimeTimer(nextFireTimestamp); fireTimestamp.add(nextFireTimestamp); return TriggerResult.CONTINUE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); if (fireTimestamp.get().equals(time)) { fireTimestamp.clear(); fireTimestamp.add(time + interval); ctx.registerProcessingTimeTimer(time + interval); // 再次注册 return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); long timestamp = fireTimestamp.get(); ctx.deleteProcessingTimeTimer(timestamp); // 删除最后注册的定时器 fireTimestamp.clear(); // 清空状态 } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) { ctx.mergePartitionedState(stateDesc); } } private static class Min implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return Math.min(value1, value2); } }
-
ContinuousEventTimeTrigger
ContinuousEventTimeTrigger 和 ContinuousProcessingTimeTrigger 基本一样,这里就不介绍了,大家可以结合着代码自行看看
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long interval; // 当合并的时候,我们选择所有触发时间中的最小值作为新的触发时间 private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); private ContinuousEventTimeTrigger(long interval) { this.interval = interval; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately // 如果 watermark 已经大于窗口末端,立即触发 return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); } ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); if (fireTimestamp.get() == null) { long start = timestamp - (timestamp % interval); long nextFireTimestamp = start + interval; ctx.registerEventTimeTimer(nextFireTimestamp); fireTimestamp.add(nextFireTimestamp); } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { if (time == window.maxTimestamp()){ return TriggerResult.FIRE; } ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc); Long fireTimestamp = fireTimestampState.get(); if (fireTimestamp != null && fireTimestamp == time) { fireTimestampState.clear(); fireTimestampState.add(time + interval); ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); Long timestamp = fireTimestamp.get(); if (timestamp != null) { ctx.deleteEventTimeTimer(timestamp); fireTimestamp.clear(); } } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); if (nextFireTimestamp != null) { ctx.registerEventTimeTimer(nextFireTimestamp); } } } private static class Min implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return Math.min(value1, value2); } }
Evictor 用于在窗口函数执行前后删除部分窗格内的元素
-
Evictor
Evictor 是一个抽象类,定义了如下所示的多个方法,每个方法在代码块中都有详细的注释
public interface Evictor<T, W extends Window> extends Serializable { /** * 驱逐部分元素,在窗口函数执行前调用 */ void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); /** * 驱逐部分元素,在窗口函数执行后调用 */ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); /** * Evictor 方法的上下文对象 */ interface EvictorContext { /** * 返回当前的进程时间 */ long getCurrentProcessingTime(); /** * 返回当前的 watermark */ long getCurrentWatermark(); } }
-
TimeEvictor
TimeEvictor 针对元素的时间戳判断是否驱逐,TimeEvictor 有一个 windowSize 的属性,用于定时窗口最大能保存多少时间范围的元素,在 evict 方法中,TimeEvictor 从所有的元素中获取最大时间戳(currentTime),currentTime - windowSize 就是保留的元素的最低时间限制,然后 TimeEvictor 遍历所有元素,remove 所有时间戳小于 currentTime - windowSize 的元素
private final long windowSize; // 窗口大小 private final boolean doEvictAfter; // 在窗口函数执行前还是后执行驱逐操作 // Evictor 抽象类的 evictBefore 和 evictAfter 方法都会调用 evict 方法 private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { // 因为这个是 TimeEvictor,是需要根据 windowSize 和当前 elements 中最大的 ts 来决定 // 驱逐哪些元素的 if (!hasTimestamp(elements)) { return; } long currentTime = getMaxTimestamp(elements); long evictCutoff = currentTime - windowSize; // �currentTime 是元素集中时间戳最大的,windowSize 是窗口的大小 // evictCutoff 是能够存活的最低的界限 for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) { TimestampedValue<Object> record = iterator.next(); if (record.getTimestamp() <= evictCutoff) { iterator.remove(); } } }
-
DeltaEvictor
DeltaEvictor 通过 deltaFunction 计算每个元素与最后一个元素的差值,当差值大于 threshold 的时候,remove 该元素
DeltaFunction<T> deltaFunction; // 计算两个元素的差值的函数 private double threshold; // 窗口能接受最大的差值 private final boolean doEvictAfter; // 在窗口函数执行前还是后执行驱逐操作 // Evictor 抽象类的 evictBefore 和 evictAfter 方法都会调用 evict 方法 private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) { // 以 elements 中最后一个元素作为标杆,当 elements 中的元素与最后一个元素的 delta 数值 // 高于限制,remove 该元素 TimestampedValue<T> lastElement = Iterables.getLast(elements); for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){ TimestampedValue<T> element = iterator.next(); if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) { iterator.remove(); } } }
-
CountEvictor
CountEvictor 指代窗口最多能够保留 maxCount 数量的元素,如果元素的数量大于 maxCount,会从元素集合的开头开始 remove 元素,直到元素的数量等于 maxCount
private final long maxCount; // 最多保留多少数量的 record private final boolean doEvictAfter; // 在窗口函数执行前还是后执行驱逐操作 // 相当于一个滑动窗口,当大小超出 maxCount 的时候,从迭代器的开端 remove record private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { if (size <= maxCount) { return; } else { int evictedCount = 0; for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){ iterator.next(); evictedCount++; if (evictedCount > size - maxCount) { // 当驱逐数量够了的时候,break break; } else { iterator.remove(); } } } }
这篇文章给大家讲解了一下几个非常重要的类,在之后的文章中,大家可以看到,在窗口操作中,这些类扮演了非常重要的角色