A library for reading and writing data from and to Redis with Apache Spark
Spark-Redis provides access to all of Redis' data structures - String, Hash, List, Set and Sorted Set - from Spark as RDDs. The library can be used with both standalone and clustered Redis databases. When used with a Redis cluster, Spark-Redis is aware of the cluster's partitioning scheme and adjusts in response to resharding and node failure events.
Spark-Redis also provides Spark-Streaming support.
You'll need the following to use Spark-Redis:

- Apache Spark v1.4.0
- Scala v2.10.4
- Jedis v2.9
- Redis v2.8.12 or v3.0.3
- Java, Python and R API bindings are not provided at this time
- The package was only tested with the following stack:
  - Apache Spark v1.4.0
  - Scala v2.10.4
  - Jedis v2.7 and v2.8 pre-release (see below for details)
  - Redis v2.8.12 and v3.0.3
This library is a work in progress, so the API may change before the official release.
You can download the library's source and build it:
```sh
git clone https://github.com/RedisLabs/spark-redis.git
cd spark-redis
mvn clean package -DskipTests
```
Jedis' current version - v2.7 - does not support reading from a Redis cluster's slave nodes. This functionality will only be included in its upcoming version, v2.8.

To use Spark-Redis with a Redis cluster's slave nodes, the library's source includes a pre-release of Jedis v2.8 under the `with-slaves` branch. Switch to that branch by entering the following before running `mvn clean install`:

```sh
git checkout with-slaves
```
Add Spark-Redis to Spark with the `--jars` command line option. For example, to use it from spark-shell, include it in the following manner:
```
$ bin/spark-shell --jars <path-to>/spark-redis-<version>.jar,<path-to>/jedis-<version>.jar

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_79)
...
```
The following sections contain code snippets that demonstrate the use of Spark-Redis. To use the sample code, you'll need to replace `your.redis.server` and `6379` with your Redis database's IP address or hostname and port, respectively.
Below is an example configuration of a SparkContext with Redis settings:

```scala
import com.redislabs.provider.redis._
...

val sc = new SparkContext(new SparkConf()
  .setMaster("local")
  .setAppName("myApp")
  // initial redis host - can be any node in cluster mode
  .set("redis.host", "localhost")
  // initial redis port
  .set("redis.port", "6379")
  // optional redis AUTH password
  .set("redis.auth", "")
)
```
The supported configuration keys are:

- `redis.host` - host or IP of the initial node we connect to. The connector will read the cluster topology from the initial node, so there is no need to provide the rest of the cluster nodes.
- `redis.port` - the initial node's TCP redis port.
- `redis.auth` - the initial node's AUTH password.
- `redis.db` - optional DB number. Avoid using this, especially in cluster mode.
Since data access in Redis is based on keys, to use Spark-Redis you'll first need a keys RDD. The following example shows how to read key names from Redis into an RDD:
```scala
import com.redislabs.provider.redis._

val keysRDD = sc.fromRedisKeyPattern("foo*", 5)
val keysRDD = sc.fromRedisKeys(Array("foo", "bar"), 5)
```
The above example populates the keys RDD either by retrieving the key names from Redis that match the given pattern (`foo*`) or from an explicit Array of key names. Furthermore, it overrides the default setting of 3 partitions in the RDD with a new value of 5 - each partition consists of a set of Redis cluster hashslots that contain the matched key names.
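Since the keys RDD holds plain key names, standard Spark operations apply to it directly. For example, a minimal sketch (assuming `keysRDD` is the `RDD[String]` created above):

```scala
// Inspect the matched key names with ordinary Spark actions.
println(keysRDD.count())
keysRDD.collect().foreach(println)
```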
Each of Redis' data types can be read to an RDD. The following snippet demonstrates reading Redis Strings.
```scala
import com.redislabs.provider.redis._

val stringRDD = sc.fromRedisKV("keyPattern*")
val stringRDD = sc.fromRedisKV(Array("foo", "bar"))
```
Once run, `stringRDD: RDD[(String, String)]` will contain the name-value pairs of all String keys whose names are provided by the key pattern or the `Array[String]`.
```scala
val hashRDD = sc.fromRedisHash("keyPattern*")
val hashRDD = sc.fromRedisHash(Array("foo", "bar"))
```
This will populate `hashRDD: RDD[(String, String)]` with the fields and values of the Redis Hashes whose names are provided by the key pattern or the `Array[String]`.
```scala
val listRDD = sc.fromRedisList("keyPattern*")
val listRDD = sc.fromRedisList(Array("foo", "bar"))
```
The contents (members) of the Redis Lists whose names are provided by the key pattern or the `Array[String]` will be stored in `listRDD: RDD[String]`.
```scala
val setRDD = sc.fromRedisSet("keyPattern*")
val setRDD = sc.fromRedisSet(Array("foo", "bar"))
```
The Redis Sets' members will be written to `setRDD: RDD[String]`.
```scala
val zsetRDD = sc.fromRedisZSetWithScore("keyPattern*")
val zsetRDD = sc.fromRedisZSetWithScore(Array("foo", "bar"))
```
Using `fromRedisZSetWithScore` will store in `zsetRDD: RDD[(String, Double)]` the members and their scores from the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
```scala
val zsetRDD = sc.fromRedisZSet("keyPattern*")
val zsetRDD = sc.fromRedisZSet(Array("foo", "bar"))
```
Using `fromRedisZSet` will store in `zsetRDD: RDD[String]` the members of the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
```scala
val startPos: Int = _
val endPos: Int = _
val zsetRDD = sc.fromRedisZRangeWithScore("keyPattern*", startPos, endPos)
val zsetRDD = sc.fromRedisZRangeWithScore(Array("foo", "bar"), startPos, endPos)
```
Using `fromRedisZRangeWithScore` will store in `zsetRDD: RDD[(String, Double)]` the members and their scores, for members whose ranks fall within [startPos, endPos] of their own Sorted Set, from the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
```scala
val startPos: Int = _
val endPos: Int = _
val zsetRDD = sc.fromRedisZRange("keyPattern*", startPos, endPos)
val zsetRDD = sc.fromRedisZRange(Array("foo", "bar"), startPos, endPos)
```
Using `fromRedisZRange` will store in `zsetRDD: RDD[String]` the members whose ranks fall within [startPos, endPos] of their own Sorted Set, from the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
```scala
val min: Double = _
val max: Double = _
val zsetRDD = sc.fromRedisZRangeByScoreWithScore("keyPattern*", min, max)
val zsetRDD = sc.fromRedisZRangeByScoreWithScore(Array("foo", "bar"), min, max)
```
Using `fromRedisZRangeByScoreWithScore` will store in `zsetRDD: RDD[(String, Double)]` the members and their scores, for members whose scores are within [min, max], from the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
```scala
val min: Double = _
val max: Double = _
val zsetRDD = sc.fromRedisZRangeByScore("keyPattern*", min, max)
val zsetRDD = sc.fromRedisZRangeByScore(Array("foo", "bar"), min, max)
```
Using `fromRedisZRangeByScore` will store in `zsetRDD: RDD[String]` the members whose scores are within [min, max], from the Redis Sorted Sets whose keys are provided by the key pattern or the `Array[String]`.
To write data from Spark to Redis, you'll need to prepare an RDD that matches the data type you want to store it in.
For String values, your RDD should consist of the key-value pairs that are to be written. Assuming that the strings RDD is called `stringRDD`, use the following snippet for writing it to Redis:
```scala
...
sc.toRedisKV(stringRDD)
```
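For instance, a minimal sketch of preparing and writing such an RDD (the keys and values are made-up examples):

```scala
// Build an RDD of (key, value) pairs; each pair is written as a Redis String.
val stringRDD = sc.parallelize(Seq(("foo", "1"), ("bar", "2")))
sc.toRedisKV(stringRDD)
```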
To store a Redis Hash, the RDD should consist of its field-value pairs. If the RDD is called `hashRDD`, the following should be used for storing it in the key name specified by `hashName`:
```scala
...
sc.toRedisHASH(hashRDD, hashName)
```
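As a brief sketch with made-up names, a two-field hash could be written like this:

```scala
// Each element is a (field, value) pair of the Hash stored at the given key.
val hashRDD = sc.parallelize(Seq(("field1", "v1"), ("field2", "v2")))
sc.toRedisHASH(hashRDD, "myHash") // "myHash" is an example key name
```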
Use the following to store an RDD in a Redis List:

```scala
...
sc.toRedisLIST(listRDD, listName)
```

Use the following to store an RDD in a fixed-size Redis List:

```scala
...
sc.toRedisFixedLIST(listRDD, listName, listSize)
```
The `listRDD` is an RDD that contains all of the list's string elements in order, and `listName` is the list's key name. `listSize` is an integer that specifies the size of the Redis list; it is optional and defaults to an unlimited size.
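For example, a hypothetical sketch that keeps a list capped at five elements (the list name and size are assumptions):

```scala
// Push three elements to "events:latest"; the fixed size means at most
// 5 elements are retained in the list.
val listRDD = sc.parallelize(Seq("a", "b", "c"))
sc.toRedisFixedLIST(listRDD, "events:latest", 5)
```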
For storing data in a Redis Set, use `toRedisSET` as follows:
```scala
...
sc.toRedisSET(setRDD, setName)
```
Where `setRDD` is an RDD with the set's string elements and `setName` is the name of the key for that set.
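A minimal sketch with a made-up key name; since Redis Sets hold distinct values, duplicate RDD elements collapse into a single set member:

```scala
// "b" appears twice in the RDD but is stored only once in the Set.
val setRDD = sc.parallelize(Seq("a", "b", "b", "c"))
sc.toRedisSET(setRDD, "mySet")
```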
```scala
...
sc.toRedisZSET(zsetRDD, zsetName)
```
The above example demonstrates storing data in a Redis Sorted Set. The `zsetRDD` in the example should contain pairs of members and their scores, whereas `zsetName` is the name for that key.
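For instance, a sketch with made-up members and scores (this assumes the connector accepts scores as strings, mirroring Redis' wire format):

```scala
// (member, score) pairs for the Sorted Set stored at "leaderboard".
val zsetRDD = sc.parallelize(Seq(("alice", "95"), ("bob", "87")))
sc.toRedisZSET(zsetRDD, "leaderboard")
```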
Spark-Redis supports streaming data from a Redis instance/cluster. Currently, streamed data is fetched from Redis Lists with the `blpop` command. Users are required to provide an array of the names of all the Lists they are interested in. The storage level is `MEMORY_AND_DISK_SER_2` by default; you can change it to suit your needs.
`createRedisStream` will create a `(listName, value)` stream, but if you don't care about which list feeds a value, you can use `createRedisStreamWithoutListname` to get a plain `value` stream.
Use the following to get a `(listName, value)` stream from the `foo` and `bar` lists:
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._

val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
// the streaming computation must be started before awaiting termination
ssc.start()
ssc.awaitTermination()
```
Use the following to get a `value` stream from the `foo` and `bar` lists:
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._

val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
// the streaming computation must be started before awaiting termination
ssc.start()
ssc.awaitTermination()
```
```scala
/**
 * @param mp Map RDD of redis command, key, field, value, ttl....
 */
def toRedisCmdMap(mp: RDD[Map[String, String]])
```

Each map contains the Redis command, its key and value, and whatever other parameters the command needs:
| key | type | supported values | required |
|---|---|---|---|
| cmd | string | hincrby, decrby, incrby, set, eval | Y |
| key | string | | Y |
| value | string | | Y |
| field | string | | N |
| ttl | string | | N |
| script | string | | N |
For example:

```scala
...
val seq = Seq(
  Map("cmd" -> "set", "key" -> "key1", "value" -> "value1"),
  Map("cmd" -> "set", "key" -> "key2", "value" -> "value2"))
val rdd = sc.parallelize(seq)
sc.toRedisCmdMap(rdd)
```
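Per the table above, some commands take extra parameters. As a hypothetical sketch (the key names, values, and the assumption that `ttl` is given in seconds are made up for illustration):

```scala
// "hincrby" requires a "field"; "set" may carry an optional "ttl".
val extra = Seq(
  Map("cmd" -> "hincrby", "key" -> "myHash", "field" -> "counter", "value" -> "2"),
  Map("cmd" -> "set", "key" -> "session:1", "value" -> "abc", "ttl" -> "60"))
sc.toRedisCmdMap(sc.parallelize(extra))
```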
If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance in that block:

```scala
def twoEndpointExample(sc: SparkContext) = {
  val redisConfig1 = new RedisConfig(new RedisEndpoint("127.0.0.1", 6379, "passwd"))
  val redisConfig2 = new RedisConfig(new RedisEndpoint("127.0.0.1", 7379))

  val rddFromEndpoint1 = {
    // endpoint("127.0.0.1", 6379) is the default connection in this block
    implicit val c = redisConfig1
    sc.fromRedisKV("*")
  }

  val rddFromEndpoint2 = {
    // endpoint("127.0.0.1", 7379) is the default connection in this block
    implicit val c = redisConfig2
    sc.fromRedisKV("*")
  }
}
```
You're encouraged to contribute to the open source Spark-Redis project. There are two ways you can do so.
If you encounter an issue while using the Spark-Redis library, please report it at the project's issues tracker.
Code contributions to the Spark-Redis project can be made using pull requests. To submit a pull request:
- Fork this project.
- Make and commit your changes.
- Submit your changes as a pull request.