Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow different window assigners / time windows in WindowGraphAggregation #25

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.apache.flink.graph.Edge;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,18 +32,26 @@
public class WindowGraphAggregation<K, EV, S extends Serializable, T> extends GraphAggregation<K, EV, S, T> {

private static final long serialVersionUID = 1L;
private long timeMillis;
private WindowAssigner windowAssigner;


public WindowGraphAggregation(EdgesFold<K, EV, S> updateFun, ReduceFunction<S> combineFun, MapFunction<S, T> transformFun, S initialVal, long timeMillis, boolean transientState) {
super(updateFun, combineFun, transformFun, initialVal, transientState);
this.timeMillis = timeMillis;
this(updateFun, combineFun, transformFun, initialVal, TumblingEventTimeWindows.of(Time.milliseconds(timeMillis)), transientState);
}

/**
 * Creates a windowed graph aggregation that has no transform function and is
 * windowed by a duration given in milliseconds.
 * <p>
 * Delegates to the constructor that also takes a transform function, passing
 * {@code null} for it.
 *
 * @param updateFun      the edge fold applied to the edges of each window
 * @param combineFun     the reduce function used to combine partial aggregates
 * @param initialVal     the initial value of the fold accumulator
 * @param timeMillis     the window duration in milliseconds
 * @param transientState forwarded unchanged to the delegated constructor
 */
public WindowGraphAggregation(EdgesFold<K, EV, S> updateFun, ReduceFunction<S> combineFun, S initialVal, long timeMillis, boolean transientState) {
this(updateFun, combineFun, null, initialVal, timeMillis, transientState);
}

/**
 * Creates a windowed graph aggregation that has no transform function and uses
 * a caller-supplied window assigner.
 * <p>
 * Delegates to the constructor that also takes a transform function, passing
 * {@code null} for it.
 *
 * @param updateFun         the edge fold applied to the edges of each window
 * @param combineFun        the reduce function used to combine partial aggregates
 * @param initialVal        the initial value of the fold accumulator
 * @param newWindowAssigner the window assigner to use for the keyed edge stream
 * @param transientState    forwarded unchanged to the delegated constructor
 */
public WindowGraphAggregation(EdgesFold<K, EV, S> updateFun, ReduceFunction<S> combineFun, S initialVal, WindowAssigner newWindowAssigner, boolean transientState) {
this(updateFun, combineFun, null, initialVal, newWindowAssigner, transientState);
}

/**
 * Creates a windowed graph aggregation with a caller-supplied window assigner.
 * <p>
 * The aggregation functions, initial value and transient-state flag are handed
 * to the superclass constructor; the window assigner is stored in this instance.
 *
 * @param updateFun         the edge fold applied to the edges of each window
 * @param combineFun        the reduce function used to combine partial aggregates
 * @param transformFun      the map function applied to the aggregate; the
 *                          overloads without this parameter pass {@code null}
 * @param initialVal        the initial value of the fold accumulator
 * @param newWindowAssigner the window assigner to store; it is not validated here
 * @param transientState    forwarded unchanged to the superclass constructor
 */
public WindowGraphAggregation(EdgesFold<K, EV, S> updateFun, ReduceFunction<S> combineFun, MapFunction<S, T> transformFun, S initialVal, WindowAssigner newWindowAssigner, boolean transientState) {
super(updateFun, combineFun, transformFun, initialVal, transientState);
// Keep the assigner so it is available when the windowed stream is built.
windowAssigner = newWindowAssigner;
}

@SuppressWarnings("unchecked")
@Override
public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
Expand All @@ -54,7 +64,7 @@ public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
DataStream<S> partialAgg = edgeStream
.map(new InitialMapper<K, EV>()).returns(typeInfo)
.keyBy(0)
.timeWindow(Time.of(timeMillis, TimeUnit.MILLISECONDS))
.window(windowAssigner)
.fold(getInitialValue(), new PartialAgg<>(getUpdateFun())).flatMap(getAggregator(edgeStream)).setParallelism(1);

if (getTrasform() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public void merge(DisjointSet<R> other) {

/**
 * Renders this disjoint set as the string form of the map produced by
 * {@link #buildMap()}.
 *
 * @return the {@code toString()} of the map returned by {@code buildMap()}
 */
@Override
public String toString() {
    final Map<R, List<R>> components = buildMap();
    return components.toString();
}

public Map<R, List<R>> buildMap() {
Map<R, List<R>> comps = new HashMap<>();

for (R vertex : getMatches().keySet()) {
Expand All @@ -149,7 +153,7 @@ public String toString() {
comps.put(parent, cc);
}
}
return comps.toString();
return comps;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.flink.graph.streaming.WindowGraphAggregation;
import org.apache.flink.graph.streaming.example.IterativeConnectedComponents;
import org.apache.flink.graph.streaming.example.util.DisjointSet;
import org.apache.flink.types.NullValue;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;


import java.io.Serializable;

Expand Down Expand Up @@ -55,6 +56,19 @@ public ConnectedComponents(long mergeWindowTime) {
super(new UpdateCC(), new CombineCC(), new DisjointSet<K>(), mergeWindowTime, false);
}

/**
 * Creates a ConnectedComponents aggregation that uses a caller-supplied window assigner.
 * <p>
 * To compute connected components, pass the created object to the aggregate function
 * of the {@link org.apache.flink.graph.streaming.GraphStream} class.
 * The constructor hands the superclass an {@link UpdateCC} edge fold, a {@link CombineCC}
 * reduce function, an empty {@link DisjointSet} as the initial value, the given window
 * assigner, and {@code false} for the transient-state flag.
 *
 * @param windowAssigner the custom window assigner passed to the superclass.
 */
public ConnectedComponents(WindowAssigner windowAssigner) {
super(new UpdateCC(), new CombineCC(), new DisjointSet<K>(), windowAssigner, false);
}

/**
* Implements EdgesFold Interface, applies foldEdges function to
* a vertex neighborhood
Expand All @@ -65,7 +79,7 @@ public ConnectedComponents(long mergeWindowTime) {
*
* @param <K> the vertex ID type
*/
public final static class UpdateCC<K extends Serializable> implements EdgesFold<K, NullValue, DisjointSet<K>> {
public final static class UpdateCC<K extends Serializable, EV> implements EdgesFold<K, EV, DisjointSet<K>> {

/**
* Implements foldEdges method of EdgesFold interface for combining
Expand All @@ -82,7 +96,7 @@ public final static class UpdateCC<K extends Serializable> implements EdgesFold<
* @throws Exception
*/
@Override
public DisjointSet<K> foldEdges(DisjointSet<K> ds, K vertex, K vertex2, NullValue edgeValue) throws Exception {
public DisjointSet<K> foldEdges(DisjointSet<K> ds, K vertex, K vertex2, EV edgeValue) throws Exception {
ds.union(vertex, vertex2);
return ds;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.streaming.library;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.graph.streaming.EdgesFold;
import org.apache.flink.graph.streaming.GraphWindowStream;
import org.apache.flink.graph.streaming.WindowGraphAggregation;
import org.apache.flink.graph.streaming.example.IterativeConnectedComponents;
import org.apache.flink.graph.streaming.example.util.DisjointSet;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;


import java.io.Serializable;

/**
* The Connected Components library method assigns a component ID to each vertex in the graph.
* Vertices that belong to the same component have the same component ID.
* This algorithm computes _weakly_ connected components, i.e. edge direction is ignored.
* <p>
* This is a window-specific connected components implementation, with a no-op reduce step.
* Each window's connected components are emitted independently, un-merged with earlier windows.
*
* @param <K> the vertex ID type
* @param <EV> the edge value type
*/
public class WindowConnectedComponents<K extends Serializable, EV> extends WindowGraphAggregation<K, EV, DisjointSet<K>, DisjointSet<K>> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a WindowConnectedComponents aggregation windowed by a duration.
     * <p>
     * To compute the connected components of each window, pass the created object to the
     * aggregate function of the {@link org.apache.flink.graph.streaming.GraphStream} class.
     * The constructor hands the superclass an {@link UpdateCC} edge fold, a no-op
     * {@link CombineCC} reduce function, an empty {@link DisjointSet} as the initial value,
     * the window time, and {@code false} for the transient-state flag.
     *
     * @param mergeWindowTime the window time in milliseconds.
     */
    public WindowConnectedComponents(long mergeWindowTime) {
        // Explicit type arguments instead of raw types: avoids unchecked conversions.
        super(new UpdateCC<K, EV>(), new CombineCC<K>(), new DisjointSet<K>(), mergeWindowTime, false);
    }

    /**
     * Creates a WindowConnectedComponents aggregation that uses a caller-supplied window assigner.
     * <p>
     * To compute the connected components of each window, pass the created object to the
     * aggregate function of the {@link org.apache.flink.graph.streaming.GraphStream} class.
     * The constructor hands the superclass an {@link UpdateCC} edge fold, a no-op
     * {@link CombineCC} reduce function, an empty {@link DisjointSet} as the initial value,
     * the window assigner, and {@code false} for the transient-state flag.
     *
     * @param windowAssigner the custom window assigner passed to the superclass.
     */
    public WindowConnectedComponents(WindowAssigner windowAssigner) {
        // Explicit type arguments instead of raw types: avoids unchecked conversions.
        super(new UpdateCC<K, EV>(), new CombineCC<K>(), new DisjointSet<K>(), windowAssigner, false);
    }

    /**
     * Implements the EdgesFold interface by folding every edge of a window into a
     * {@link DisjointSet}.
     * <p>
     * For each edge, the two endpoints are united in the accumulator, so vertices
     * joined by an edge end up in the same component. The edge value is ignored.
     *
     * @param <K>  the vertex ID type
     * @param <EV> the edge value type
     */
    public final static class UpdateCC<K extends Serializable, EV> implements EdgesFold<K, EV, DisjointSet<K>> {

        /**
         * Unites the two endpoints of an edge in the given disjoint set.
         *
         * @param ds        the accumulator; it is modified in place and returned
         * @param vertex    the vertex ID
         * @param vertex2   the neighbor's ID
         * @param edgeValue the edge value (unused)
         * @return the same {@code ds} instance after the union
         * @throws Exception declared by the interface; this implementation adds no
         *                   exception handling of its own
         */
        @Override
        public DisjointSet<K> foldEdges(DisjointSet<K> ds, K vertex, K vertex2, EV edgeValue) throws Exception {
            ds.union(vertex, vertex2);
            return ds;
        }
    }

    /**
     * Implements the ReduceFunction interface as a no-op combine step.
     * <p>
     * Unlike a merging combiner, this one does not union the two states: it returns
     * its first argument and discards the second, so the result is not merged with
     * other state.
     *
     * @param <K> the vertex ID type
     */
    public static class CombineCC<K extends Serializable> implements ReduceFunction<DisjointSet<K>> {

        /**
         * Returns the first state unchanged and ignores the second.
         *
         * See {@link org.apache.flink.graph.streaming.GraphAggregation.Merger#flatMap}
         * for the order of arguments.
         *
         * @param s1 the state that is kept (presumably the newer one; confirm against
         *           the Merger's argument order)
         * @param s2 the state that is discarded
         * @return {@code s1}, unmodified
         * @throws Exception declared by the interface; this implementation never throws
         */
        @Override
        public DisjointSet<K> reduce(DisjointSet<K> s1, DisjointSet<K> s2) throws Exception {
            // return the new state, discard the old
            return s1;
        }
    }
}







Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.flink.graph.streaming.example.test;

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.streaming.GraphStream;
import org.apache.flink.graph.streaming.SimpleEdgeStream;
import org.apache.flink.graph.streaming.example.util.DisjointSet;
import org.apache.flink.graph.streaming.library.ConnectedComponents;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.types.NullValue;
import org.junit.Assert;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectedComponentsSlidingWindowTest extends StreamingProgramTestBase {

    /** Expected contents of the text sink, one line per emitted aggregate. */
    public static final String Connected_RESULT =
            "{1=[1, 2, 3]}\n"
            + "{1=[1, 2, 3, 5]}\n"
            + "{1=[1, 2, 3, 5], 7=[6, 7]}\n"
            + "{1=[1, 2, 3, 5], 7=[6, 7], 9=[8, 9]}\n"
            + "{1=[1, 2, 3, 5], 7=[6, 7], 9=[8, 9]}";

    /** Output path of the job; assigned in {@link #preSubmit()}. */
    protected String resultPath;

    /** Wraps the fixed edge list in a data stream on the given environment. */
    @SuppressWarnings("serial")
    private static DataStream<Edge<Long, NullValue>> getGraphStream(StreamExecutionEnvironment env) {
        final List<Edge<Long, NullValue>> input = getEdges();
        return env.fromCollection(input);
    }

    /**
     * Builds the test graph: six edges over the vertices 1-3 and 5-9.
     *
     * @return a new mutable list of edges carrying {@link NullValue} values
     */
    public static final List<Edge<Long, NullValue>> getEdges() {
        // {source, target} pairs, in the order they are added to the list
        final long[][] endpoints = {
            {1L, 2L}, {1L, 3L}, {2L, 3L}, {1L, 5L}, {6L, 7L}, {8L, 9L}
        };
        final List<Edge<Long, NullValue>> edges = new ArrayList<>();
        for (long[] pair : endpoints) {
            edges.add(new Edge<Long, NullValue>(pair[0], pair[1], NullValue.getInstance()));
        }
        return edges;
    }

    @Override
    protected void preSubmit() throws Exception {
        // needed to ensure total ordering for windows
        setParallelism(1);
        resultPath = getTempDirPath("output");
    }

    /** Reads the job output and compares it line by line with {@link #Connected_RESULT}. */
    @Override
    protected void postSubmit() throws Exception {
        final List<String> expectedLines = new ArrayList<String>();
        if (!Connected_RESULT.isEmpty()) {
            expectedLines.addAll(Arrays.asList(Connected_RESULT.split("\n")));
        }

        final List<String> actualLines = new ArrayList<String>();
        readAllResultLines(actualLines, resultPath, new String[0], false);

        Assert.assertEquals("Different number of lines in expected and obtained result.", expectedLines.size(), actualLines.size());
        Assert.assertEquals(expectedLines, actualLines);
    }

    /**
     * Runs connected components over sliding event-time windows of size 4 ms with a
     * slide of 2 ms, using each edge's target vertex ID as its timestamp, and writes
     * the results as text to {@link #resultPath}.
     */
    @Override
    protected void testProgram() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStream<Edge<Long, NullValue>> edges = getGraphStream(env);

        // The target vertex ID doubles as the event timestamp of an edge.
        final AscendingTimestampExtractor<Edge<Long, NullValue>> targetAsTimestamp =
            new AscendingTimestampExtractor<Edge<Long, NullValue>>() {
                @Override
                public long extractAscendingTimestamp(Edge<Long, NullValue> edge) {
                    return edge.getTarget();
                }
            };
        final GraphStream<Long, NullValue, NullValue> graph =
            new SimpleEdgeStream<>(edges, targetAsTimestamp, env);

        final SlidingEventTimeWindows windowAssigner =
            SlidingEventTimeWindows.of(Time.milliseconds(4), Time.milliseconds(2));
        final DataStream<DisjointSet<Long>> cc =
            graph.aggregate(new ConnectedComponents<Long, NullValue>(windowAssigner));

        cc.writeAsText(resultPath);
        env.execute("Streaming Connected ComponentsCheck");
    }
}

Loading