Italian, pronounced: /ˈmare/. Noun: Sea.
MaRe (formerly EasyMapReduce) leverages the power of Docker and Spark to run and scale your serial tools in MapReduce fashion.
MaRe has been developed with scientific applications in mind. High-throughput methods have produced massive datasets over the past decades, and frameworks like Spark and Hadoop are a natural choice for analyzing them. Many scientific tools are highly optimized to model or detect phenomena that occur in a specific system, so research groups often cannot sustain the effort of reimplementing them in Spark or Hadoop. MaRe aims to provide the means to run existing serial tools in MapReduce fashion. Since many of the available scientific tools are trivially parallelizable, MapReduce is an excellent paradigm for parallelizing the computation.
Scientific tools often have many dependencies and, generally speaking, it is difficult for a system administrator to maintain software that may need to be installed on each node of the cluster in multiple versions. Therefore, instead of running commands directly on the compute nodes, MaRe starts a user-provided Docker image that wraps a specific tool and all of its dependencies, and runs the command inside the Docker container. The data goes from Spark through the Docker container and back to Spark after being processed, via Unix files. If the TMPDIR environment variable on the worker nodes points to a tmpfs, very little overhead should occur.
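The file-based round trip described above can be sketched in plain Scala. This is an illustration under stated assumptions, not MaRe's actual implementation: the object `FileRoundTrip` and its helpers `tmpBase`, `dockerRunArgs`, and `roundTrip` are hypothetical names, and the way the `docker run` bind mounts are laid out is assumed. The sketch writes the records of one partition to a temporary file under TMPDIR, lets a tool produce an output file, and reads the result back.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}

// Hypothetical sketch of the Spark -> file -> container -> file -> Spark flow.
object FileRoundTrip {
  // Base directory for temporary files: TMPDIR if set, else the JVM default.
  def tmpBase: Path =
    Paths.get(sys.env.getOrElse("TMPDIR", System.getProperty("java.io.tmpdir")))

  // Assumed layout of the `docker run` call: one bind mount for the input
  // file and one for the output file, then the command run through `sh -c`.
  def dockerRunArgs(image: String, command: String,
                    hostIn: Path, mountIn: String,
                    hostOut: Path, mountOut: String): Seq[String] =
    Seq("docker", "run", "--rm",
      "-v", s"${hostIn.toAbsolutePath}:$mountIn",
      "-v", s"${hostOut.toAbsolutePath}:$mountOut",
      image, "sh", "-c", command)

  // Write the records to a temp file, let `tool` fill the output file,
  // then read the output back as lines and clean up both files.
  def roundTrip(records: Seq[String])(tool: (Path, Path) => Unit): Seq[String] = {
    val in = Files.createTempFile(tmpBase, "mare-in-", ".txt")
    val out = Files.createTempFile(tmpBase, "mare-out-", ".txt")
    try {
      Files.write(in, records.mkString("\n").getBytes(UTF_8))
      tool(in, out)
      val text = new String(Files.readAllBytes(out), UTF_8)
      if (text.isEmpty) Nil else text.split("\n").toList
    } finally {
      Files.deleteIfExists(in)
      Files.deleteIfExists(out)
    }
  }
}
```

In a real run, `tool` would start the container with the arguments from `dockerRunArgs` (for example via `scala.sys.process`). When TMPDIR points to a tmpfs, both temporary files live in memory, which is why the overhead stays small.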
DNA can be represented as a string written in a language of 4 characters: a, t, g, c. Counting how many times g and c occur in a genome is a task that is often performed in genomics. In this example we use MaRe to perform this task in parallel with POSIX commands.
val rdd = sc.textFile("genome.txt")
val res = new MaRe(rdd)
  // Map: count the g and c characters in each partition
  .map(
    inputMountPoint = TextFile("/dna"),
    outputMountPoint = TextFile("/count"),
    imageName = "busybox:1",
    command = "grep -o '[gc]' /dna | wc -l > /count")
  // Reduce: sum the per-partition counts
  .reduce(
    inputMountPoint = TextFile("/counts"),
    outputMountPoint = TextFile("/sum"),
    imageName = "busybox:1",
    command = "awk '{s+=$1} END {print s}' /counts > /sum")
  .rdd.collect()(0)
println(s"The GC count is: $res")
In the previous example we work with a single text file (genome.txt), which is split line by line and partitioned across the executors. MaRe also supports working with multiple text or binary files. The following example does the GC count from a set of gzipped DNA strings.
val rdd = sc.binaryFiles("genome_*.gz")
  .map { case (path, data) => (path, data.toArray) }
val res = new MaRe(rdd)
  // Map: decompress each file and count its g and c characters
  .map(
    inputMountPoint = BinaryFiles("/zipped"),
    outputMountPoint = WholeTextFiles("/counts"),
    imageName = "busybox:1",
    command =
      """
      for filename in /zipped/*; do
        out=$(basename "${filename}" .gz)
        gunzip -c "${filename}" | grep -o '[gc]' | wc -l > "/counts/${out}.sum"
      done
      """)
  // Reduce: sum the per-file counts
  .reduce(
    inputMountPoint = WholeTextFiles("/counts"),
    outputMountPoint = WholeTextFiles("/sum"),
    imageName = "busybox:1",
    command = "awk '{s+=$1} END {print s}' /counts/*.sum > /sum/${RANDOM}.sum")
  .rdd.collect()(0)
println(s"The GC count is: $res")
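To sanity-check the output of the two examples above on a small input, a plain Scala reference with no Spark or Docker involved can be handy. The sketch below is not part of MaRe; `GcCountReference`, `mapCount`, `reduceSum`, and `gcCount` are hypothetical names, and `chunkSize` merely stands in for Spark's partitioning. It mirrors the same two steps: count g and c per chunk, then sum the counts.

```scala
// Hypothetical reference implementation, independent of MaRe and Spark.
object GcCountReference {
  // Map step: count lowercase g/c characters in one chunk of lines,
  // matching what `grep -o '[gc]' | wc -l` computes.
  def mapCount(chunk: Seq[String]): Long =
    chunk.map(_.count(ch => ch == 'g' || ch == 'c').toLong).sum

  // Reduce step: sum the per-chunk counts, as the awk command does.
  def reduceSum(counts: Seq[Long]): Long = counts.sum

  // Split the lines into chunks, map each chunk, then reduce.
  def gcCount(lines: Seq[String], chunkSize: Int): Long =
    reduceSum(lines.grouped(chunkSize).map(mapCount).toSeq)
}
```

For example, `GcCountReference.gcCount(Seq("gattaca", "ccgg", "atat"), 2)` evaluates to 6, and the result does not depend on the chunk size, which is the property that makes the task trivially parallelizable.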
MaRe comes as a Scala library that you can use in your Spark applications. Please keep in mind that when submitting MaRe applications, Docker needs to be installed and properly configured on each worker node of your Spark cluster. Also, the user that runs the Spark job needs to be in the docker group.
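A quick way to check these requirements on a node is to ask the Docker CLI whether it can reach the daemon. The following is a minimal sketch and not something MaRe ships: `DockerPreflight`, `succeeds`, and `dockerUsable` are hypothetical names. It relies only on `docker info`, which exits with a non-zero status when the daemon is unreachable or the current user lacks permission.

```scala
import scala.sys.process._
import scala.util.Try

// Hypothetical preflight check, not part of MaRe.
object DockerPreflight {
  // True if the command starts and exits with status 0; its output is discarded.
  // A missing executable throws an exception, which is mapped to false.
  def succeeds(cmd: Seq[String]): Boolean =
    Try(cmd.!(ProcessLogger((_: String) => ())) == 0).getOrElse(false)

  // `docker info` talks to the daemon, so it fails when Docker is missing,
  // the daemon is down, or the user may not access the Docker socket.
  def dockerUsable(): Boolean = succeeds(Seq("docker", "info"))
}
```

Running `DockerPreflight.dockerUsable()` as the same user that runs the Spark job gives an early signal before a job is submitted.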
MaRe is packaged and distributed with Maven; all you have to do is add its dependency to your pom.xml file:
<dependencies>
  ...
  <dependency>
    <groupId>se.uu.it</groupId>
    <artifactId>mare</artifactId>
    <version>0.4.0</version>
  </dependency>
  ...
</dependencies>
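For projects built with sbt instead of Maven, the same coordinates from the pom.xml snippet above would translate to the line below. This is a sketch under the assumption that the artifact resolves from the resolvers your build already uses; sbt usage is not described in the original instructions.

```scala
// build.sbt (sketch): same groupId, artifactId, and version as the pom.xml snippet.
// A single % is used because the artifactId carries no Scala-version suffix.
libraryDependencies += "se.uu.it" % "mare" % "0.4.0"
```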
API documentation is available here: https://mcapuccini.github.io/MaRe/scaladocs/.
Capuccini, Marco, et al. "MaRe: Processing Big Data with application containers on Apache Spark." GigaScience 9.5 (2020): giaa042.
@article{capuccini2020mare,
  title={MaRe: Processing Big Data with application containers on Apache Spark},
  author={Capuccini, Marco and Dahl{\"o}, Martin and Toor, Salman and Spjuth, Ola},
  journal={GigaScience},
  volume={9},
  number={5},
  pages={giaa042},
  year={2020},
  publisher={Oxford University Press}
}