Rheem is an efficient and scalable distributed data processing framework developed by the data analytics group at Qatar Computing Research Institute. It has three distinctive features:
- it lets users specify their jobs through easy-to-use interfaces,
- it provides developers with opportunities to optimize performance in different ways, and
- it can run on any execution platform, such as Spark or MapReduce, or on a combination of platforms.
- You can download the latest version (v0.1) from here (with Spark 1.6 included) or here (without Spark).
- Version v0.1 provides the platform independence feature. Looping and cross-platform optimization are coming very soon, stay tuned!
- The source code can be found here.
To be able to run a Rheem application, the following software is needed:
- Java 1.8
- Apache Maven
- Include the Rheem jar files in your project.
- If Spark is needed: Spark (v1.6 or later) and Hadoop (v2.2 to v2.6.2)
- Java (standalone JVM)
- Apache Spark
- Coming soon:
- Include the Rheem jar as a library in your application.
- Steps for writing a Rheem application:
- Define a Rheem plan using Rheem operators. For a list of all currently supported Rheem operators, check the API documentation.
- Create a Rheem context.
- Register the required platforms with the Rheem context. You might want to include an "app.properties" file in the root directory of your application to set the platform-specific properties (see below).
- Execute the Rheem plan.
# app.properties file
spark.master = local
spark.appName = myapp
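As a rough illustration of what the two keys above carry, the sketch below parses the same text with the standard `java.util.Properties` loader. How Rheem itself locates and reads `app.properties` is not described here, so the class name `AppPropertiesSketch` and the `parse` helper are hypothetical and use no Rheem API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Rheem: shows how the standard Java
// properties loader interprets the keys from app.properties.
final class AppPropertiesSketch {

    // Parse properties-format text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "# app.properties file\n"
                + "spark.master = local\n"
                + "spark.appName= myapp\n";
        Properties props = parse(text);
        // Whitespace around '=' is ignored by the loader.
        System.out.println(props.getProperty("spark.master"));  // prints "local"
        System.out.println(props.getProperty("spark.appName")); // prints "myapp"
    }
}
```

In a real application the text would presumably come from the file itself (for example via a `FileReader`) rather than from a string literal.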
You can download the complete source of the examples from here.
In this simple example, we take a text file, iterate through its stream of lines, apply String's toUpperCase operation to each line, and write the result to standard output. To define a Rheem plan for this example, we (i) instantiate a new RheemPlan, (ii) create the operator instances (the text file source, the map operator, and the sink), (iii) connect the operators together, and (iv) add the sink to the RheemPlan instance. Additionally, we create a Rheem context and register the required platforms. Notice that for this simple example we only register the standalone JVM platform. Finally, we execute the RheemPlan instance via the RheemContext.
// Build a RheemPlan that reads from a text file source, converts every
// line to upper case, and writes the result to a LocalCallbackSink.
// Create a plan
RheemPlan rheemPlan = new RheemPlan();
// Define the operators.
TextFileSource textFileSource = new TextFileSource(inputFileUrl);
MapOperator<String, String> upperOperator = new MapOperator<>(
        String::toUpperCase, String.class, String.class
);
LocalCallbackSink<String> stdoutSink = LocalCallbackSink.createStdoutSink(String.class);
// Connect the operators together.
textFileSource.connectTo(0, upperOperator, 0);
upperOperator.connectTo(0, stdoutSink, 0);
// Add the sink to the Rheem plan.
rheemPlan.addSink(stdoutSink);
// Instantiate Rheem context and register the backends.
RheemContext rheemContext = new RheemContext();
rheemContext.register(JavaPlatform.getInstance());
// Execute the plan.
rheemContext.execute(rheemPlan);
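For comparison, here is a minimal plain-Java sketch of what this plan computes: each input line is mapped to upper case and printed. It uses only the standard library, not the Rheem API. The class `UpperCaseSketch` and the in-memory input list are assumptions made for illustration (the Rheem example reads its lines from `inputFileUrl` instead).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the source -> map -> sink pipeline above,
// written with Java 8 streams instead of Rheem operators.
final class UpperCaseSketch {

    // Plays the role of the MapOperator: one output line per input line.
    static List<String> toUpperCase(List<String> lines) {
        return lines.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // In-memory lines stand in for the TextFileSource.
        List<String> lines = Arrays.asList("hello rheem", "second row");
        // Printing each element stands in for the stdout sink.
        toUpperCase(lines).forEach(System.out::println);
    }
}
```

The difference from the Rheem version is that this pipeline always runs in the local JVM, whereas a Rheem plan leaves the choice of execution platform to the registered backends.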
In this WordCount example, we first use a FlatMap operator to split each line into words and then a Map operator to transform each word into lowercase and output a pair of the form (word, 1). Then, we use a ReduceBy operator to group the pairs using the word as the key and add up their occurrences. The operators are then connected via the connectTo() function to form a Rheem plan, and the plan is executed. Note that the same example could be written without the Map operator; we include it here to show its use. Also note that in this example we register two platforms (lines 3-4), which means that, to minimize execution time, the Rheem optimizer chooses between the two platforms for each operator.
// Instantiate the Rheem context and register both backends.
RheemContext rheemContext = new RheemContext();
rheemContext.register(JavaPlatform.getInstance());
rheemContext.register(SparkPlatform.getInstance());
TextFileSource textFileSource = new TextFileSource(inputFileUrl);
// For each input line, output the list of its words.
FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<>(
        new FlatMapDescriptor<>(line -> Arrays.asList(line.split(" ")), String.class, String.class));
// Transform each word to lowercase and output a key-value pair (word, 1).
MapOperator<String, Tuple2> mapOperator = new MapOperator<>(
        new TransformationDescriptor<>(word -> new Tuple2<>(word.toLowerCase(), 1),
                String.class,
                Tuple2.class));
// Group by the key (word) and add up the values (frequency).
ReduceByOperator<Tuple2, String> reduceByOperator = new ReduceByOperator<>(
        new TransformationDescriptor<>(pair -> ((Tuple2<String, Integer>) pair).field0, Tuple2.class, String.class),
        new ReduceDescriptor<>(
                (a, b) -> {
                    ((Tuple2<String, Integer>) a).field1 += ((Tuple2<String, Integer>) b).field1;
                    return a;
                }, Tuple2.class));
// write results to a sink
List<Tuple2> results = new ArrayList<>();
LocalCallbackSink<Tuple2> sink = LocalCallbackSink.createCollectingSink(results, Tuple2.class);
// Build the Rheem plan by connecting output slot 0 of each operator
// to input slot 0 of the next one.
textFileSource.connectTo(0, flatMapOperator, 0);
flatMapOperator.connectTo(0, mapOperator, 0);
mapOperator.connectTo(0, reduceByOperator, 0);
reduceByOperator.connectTo(0, sink, 0);
// Have Rheem execute the plan and print results.
rheemContext.execute(new RheemPlan(sink));
System.out.println("results:" + results);
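To make the data flow of this plan concrete, the following sketch reproduces the same three steps (flat-map into words, map to (word, 1) pairs, reduce by key) with plain Java 8 streams. It is not Rheem code: the class `WordCountSketch`, the use of `SimpleEntry` in place of `Tuple2`, and the sorted `TreeMap` result are all assumptions chosen for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical plain-Java counterpart of the WordCount plan above.
final class WordCountSketch {

    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // FlatMap step: split each line into words.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Map step: lowercase each word and pair it with the count 1.
                .map(word -> new SimpleEntry<String, Integer>(word.toLowerCase(), 1))
                // ReduceBy step: group by word and add up the counts.
                // A TreeMap keeps the result sorted by word.
                .collect(Collectors.toMap(
                        SimpleEntry::getKey,
                        SimpleEntry::getValue,
                        Integer::sum,
                        TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b A", "b c");
        System.out.println(countWords(lines)); // prints {a=2, b=2, c=1}
    }
}
```

As in the Rheem example, splitting on a single space means that consecutive spaces would produce empty "words"; a production job would likely filter those out.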
Unless explicitly stated otherwise, all files in this repository are licensed under the Apache Software License 2.0.
Copyright 2016 Qatar Computing Research Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.