# Connect Spark with Redis Cloud
Spark and Redis complement each other: Spark provides distributed processing, and Redis provides low-latency data structures that applications can read directly. This guide demonstrates how to integrate the two for analytics, real-time processing, and machine learning applications.

In this hands-on notebook, you'll learn how to make efficient use of Redis data structures alongside Spark's distributed computing framework. You'll see firsthand how to extract data from Redis, process it in Spark, and write results back to Redis for application use. Key topics include:
1. Setting up the Spark-Redis connector in Databricks
2. Writing data to Redis from Spark
3. Reading data from Redis for application access

## Databricks Cluster Setup with Redis Connector

1. Set up a new Databricks cluster
1. Go to the cluster's **Libraries** section
1. Select **Install New**
1. Choose **Maven** as your source and click **Search Packages**
1. Enter `redis-spark-connector` and select `com.redis:redis-spark-connector:x.y.z`
1. Finalize by clicking **Install**

Want to explore the connector's full capabilities? Check the [detailed documentation](https://redis-field-engineering.github.io/redis-spark).


## Setting Up Redis Cloud Environment

Redis Cloud offers a fully-managed Redis service ideal for this integration. Follow these steps:

1. Register for a [Redis Cloud account](https://redis.io/cloud/)
1. Follow the [quickstart guide](https://redis.io/docs/latest/operate/rc/rc-quickstart/) to create a free tier database
1. From your Redis Cloud database dashboard, open the **Configuration** tab and click **Import**. In the dialog, select **Google Cloud Storage** as the source type, paste `gs://jrx/nobels.rdb` as the source path, and click **Import**.
1. Examine the imported keys using **Redis Insight**, Redis' visual data browser: from your Redis Cloud database dashboard, click **Redis Insight** and explore the data imported in the previous step.

## Configuring Spark with Redis Connection Details

1. From your Redis Cloud database dashboard, find your connection endpoint under **Connect**. The string follows this pattern: `redis://<user>:<pass>@<host>:<port>`
1. In Databricks, open your cluster settings and expand **Advanced Options**. Under **Spark**, add your Redis connection string to the **Spark config** text area twice: once as `spark.redis.read.connection.uri redis://...` and once as `spark.redis.write.connection.uri redis://...`. This configuration applies to all notebooks attached to this cluster. Because the URI contains credentials, storing it in a secret is recommended. Refer to the [Redis Spark documentation](https://redis-field-engineering.github.io/redis-spark/#_databricks) for more details.
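
If your password contains characters such as `@`, `:`, or `/`, the connection string can be hard to assemble by hand. The following is a minimal sketch that builds the URI and the two Spark config lines. The helper names and the sample host, port, and password are hypothetical, and it assumes the connector accepts standard percent-encoded credentials and that your database uses the `default` Redis user.

```python
from urllib.parse import quote


def build_redis_uri(host, port, user="default", password=""):
    """Assemble redis://<user>:<pass>@<host>:<port>, percent-encoding the credentials."""
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"redis://{credentials}@{host}:{port}"


def spark_config_lines(uri):
    """Return the two lines to paste into the cluster's Spark config text area."""
    return [
        f"spark.redis.read.connection.uri {uri}",
        f"spark.redis.write.connection.uri {uri}",
    ]


# Hypothetical placeholder values, not a real endpoint.
uri = build_redis_uri("redis-12345.example.com", 12345, password="p@ss:word/1")
for line in spark_config_lines(uri):
    print(line)
```

Run this locally rather than in a shared notebook, so the password does not end up in the notebook's saved output.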




## Reading from Redis

To read data from Redis, run the following code:


In [0]:
df = spark.read.format("redis").option("keyType", "hash").load()
display(df)

## Reading from Redis in Streaming Mode

The following code reads from the Redis stream `stream:nobels` and appends incoming entries to an in-memory table named `nobels`, which is then queried with SQL.

In [0]:
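import time

# Read the Redis stream `stream:nobels` as a streaming DataFrame.
streamDf = spark.readStream.format("redis").option("type", "stream").option("streamKey", "stream:nobels").load()
# Append incoming rows to an in-memory table named `nobels`.
query = streamDf.writeStream.format("memory").trigger(continuous="1 second").queryName("nobels").start()
# Give the stream a few seconds to receive data before querying the table.
time.sleep(3)
streamDs = spark.sql("select * from nobels")
display(streamDs)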
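
The fixed `time.sleep(3)` above may be too short on a slow cluster and longer than needed on a fast one. As one possible alternative, the sketch below polls until rows have arrived or a timeout expires. `wait_for_rows` is a hypothetical helper, not part of Spark or the connector, and the default timeout values are arbitrary.

```python
import time


def wait_for_rows(count_rows, timeout_s=30.0, poll_s=0.5):
    """Poll count_rows() until it returns a positive number or the timeout expires.

    count_rows is any zero-argument callable that returns a row count.
    Returns the last observed count (0 if nothing arrived in time).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        n = count_rows()
        if n > 0 or time.monotonic() >= deadline:
            return n
        time.sleep(poll_s)
```

In the notebook, this could replace the sleep with something like `wait_for_rows(lambda: spark.sql("select * from nobels").count())`, assuming the `nobels` in-memory table is already registered once the query has started.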

## Writing to Redis

1. Run the code below to write the `df` DataFrame we read earlier back to Redis as JSON.
1. Refresh **Redis Insight** and notice the new JSON keys prefixed with `spark:write`.

In [0]:
df.write.format("redis").option("type", "json").option("keyspace", "spark:write:nobel").option("key", "id").mode("append").save()
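
To check the result of the write programmatically rather than in Redis Insight, you could count the keys under the `spark:write:nobel` prefix. The sketch below is one way to do that. `count_keys` is a hypothetical helper, and `client` is assumed to be a redis-py `redis.Redis` instance, which requires the `redis` package to be installed wherever this runs.

```python
def count_keys(client, prefix, batch=1000):
    """Count keys whose names start with prefix.

    Uses SCAN (via scan_iter) rather than KEYS so the server is not blocked
    while iterating over a large keyspace.
    """
    return sum(1 for _ in client.scan_iter(match=f"{prefix}*", count=batch))
```

For example, `count_keys(redis.Redis.from_url("redis://..."), "spark:write:nobel")` should return the number of rows in `df` if every row was written under a distinct `id`.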

## Writing to Redis in Streaming Mode

We can also write to Redis in streaming mode.

1. Replace `<catalog>`, `<schema>`, and `<volume>` with names for a Unity Catalog volume and run the code below.
1. In **Redis Insight**, refresh the database and notice the new hash keys prefixed with `spark:writeStream`.


In [0]:
# Unity Catalog volume that holds the input file and the streaming checkpoint.
catalog = "<catalog>"
schema = "<schema>"
volume = "<volume>"

path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
# Copy the sample Nobel data file (JSON Lines) into the volume.
dbutils.fs.cp("http://storage.googleapis.com/jrx/nobels.jsonl", f"{path_volume}/nobels.json")

# Structured Streaming uses the checkpoint directory to track its progress.
checkpoint_dir = f"{path_volume}/checkpoint"
dbutils.fs.mkdirs(checkpoint_dir)

# Read the JSON files as a stream and write each row to a Redis hash keyed by `id`.
spark.readStream.schema("id INT, year INT, category STRING, share INT, firstName STRING, lastName STRING, motivation STRING").json(f"{path_volume}/*.json") \
     .writeStream.format("redis").outputMode("append") \
                 .option("type", "hash") \
                 .option("keyspace", "spark:writeStream:nobel") \
                 .option("key", "id") \
                 .option("checkpointLocation", checkpoint_dir) \
                 .start()
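
Streaming queries started in this notebook keep running until they are stopped. The sketch below shows one way to stop them when you are done. It relies on PySpark's `spark.streams.active` list and each query's `name` and `stop()`. `stop_streams` itself is a hypothetical helper.

```python
def stop_streams(spark, names=None):
    """Stop active streaming queries and return the names of those stopped.

    If names is None, every active query is stopped; otherwise only queries
    whose name is in names. Queries started without queryName have name None.
    """
    stopped = []
    for query in spark.streams.active:
        if names is None or query.name in names:
            query.stop()
            stopped.append(query.name)
    return stopped
```

Calling `stop_streams(spark)` stops everything, while `stop_streams(spark, names={"nobels"})` would stop only the in-memory read stream from the earlier section.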

