# Use Ray in PySpark

Spark + AI Summit 2020 talk, 
[Dean Wampler](mailto:dean@anyscale.com)

This notebook demonstrates one way to integrate Ray and PySpark applications, where Ray is embedded in a _UDF_. The use case simulates the requirement for _data governance_, where we want to trace each record processed by a PySpark job. 

Another, more conventional way to meet this requirement is to run a separate webservice and make remote calls to it (usually over HTTP). This approach is demonstrated in the `ray-serve` directory. (See the [README](README.md) for details.)

This notebook embeds Ray in a UDF, where the Ray cluster is co-resident on the same nodes as PySpark. We'll actually just use a single machine, but the results generalize to real cluster deployments with minor changes (noted where applicable).

Why use this approach instead of the standalone system? Here are the pros and cons:

**Pros:**
* Avoiding a network/HTTP call may be more efficient in many cases.
* Fewer services to manage. Once PySpark and Ray clusters are setup, you can allow them to do all the scaling and distribution required. Spark handles the data partitions, Ray handles distribution of the other tasks and object graphs (for distributed state).

**Cons:**
* You might prefer explicitly separate services for runtime visibility and independent management. For example, it's easier to upgrade a separate web service behind a router, whereas in the example here the PySpark and Data Governance "hook" are more closely linked.

You can learn more about Ray [here](http://ray.io).

> **Note:** Requires Java 8!

In [1]:
!java -version

java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)


In [2]:
import json, time
import pyspark
import ray
from ray.util import named_actors

In [3]:
from pyspark.sql.types import DataType, BooleanType, NullType, IntegerType, StringType, MapType

In [4]:
from pyspark.sql.functions import udf

Define a `DataGovernanceSystem` Ray actor that represents our governance system. (This is also defined in the file `data_governance_system.py`.) All it does is add each reported `id` to an internal collection. 

In a more realistic implementation, this class would be a "hook" that forwards the ids and other useful metadata asynchronously to a real governance system, like [Apache Atlas](http://atlas.apache.org/#/). 

In [5]:
@ray.remote
class DataGovernanceSystem:
    def __init__(self, name = 'DataGovernanceSystem'):
        self.name = name
        self.ids = []
        self.start_time = time.time()

    def log(self, id_to_log):
        """
        Log record ids that have been processed.
        Returns the new count.
        """
        self.ids.append(id_to_log)
        return self.get_count()

    def get_ids(self):
        """Return the ids logged. Don't call this if the list is long!"""
        return self.ids

    def get_count(self):
        """Return the count of ids logged."""
        return len(self.ids)

    def reset(self):
        """Forget all ids that have been logged."""
        self.ids = []

    def get_start_time(self):
        return self.start_time

    def get_up_time(self):
        return time.time() - self.start_time

Define a simple `Record` type with a `record_id` field, used for logging to `DataGovernanceSystem`, and an opaque `data` field with everything else.

In [6]:
class Record:
    def __init__(self, record_id, data):
        self.record_id = record_id
        self.data = data
    def __str__(self):
        return f'Record(record_id={self.record_id},data={self.data})'

Now initialize Ray. We'll run it locally, but in a Ray cluster, passing `address='auto'` tells Ray to connect to the cluster. (If this node isn't part of that cluster, i.e., Ray isn't already running on this node, then pass the correct server address and port.)

In [7]:
ray.init(ignore_reinit_error=True) # The `ignore_reinit_error=True` lets us rerun this cell without error...

2020-05-18 07:35:17,052	INFO resource_spec.py:212 -- Starting Ray with 4.3 GiB memory available for workers and up to 2.17 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-05-18 07:35:17,419	INFO services.py:1170 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


{'node_ip_address': '192.168.1.149',
 'raylet_ip_address': '192.168.1.149',
 'redis_address': '192.168.1.149:57936',
 'object_store_address': '/tmp/ray/session_2020-05-18_07-35-17_042246_77727/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-05-18_07-35-17_042246_77727/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-05-18_07-35-17_042246_77727'}

In [8]:
print(f'Click here to open the Ray Dashboard: http://{ray.get_webui_url()}')

Click here to open the Ray Dashboard: http://localhost:8265


In [9]:
actor_name = 'dgs'
gov = DataGovernanceSystem.remote(actor_name)
named_actors.register_actor(actor_name, gov)
gov

Actor(DataGovernanceSystem, 45b95b1c0100)

In [10]:
test_records = [Record(i, f'data: {i}') for i in range(3)] 
for record in test_records:
    print(record)
    gov.log.remote(record.record_id)

Record(record_id=0,data=data: 0)
Record(record_id=1,data=data: 1)
Record(record_id=2,data=data: 2)


In [11]:
def gov_status():
    gov = named_actors.get_actor(name='dgs')
    print(f'count:   {ray.get(gov.get_count.remote())}')
    print(f'ids:     {ray.get(gov.get_ids.remote())}')
    print(f'up time: {ray.get(gov.get_up_time.remote())}')

In [12]:
gov_status()

count:   3
ids:     [0, 1, 2]
up time: 7.880530118942261


Reset the server:

In [13]:
gov.reset.remote()
gov_status()

count:   0
ids:     []
up time: 9.221529960632324


In [14]:
def log_record(id):
    """
    This function will become a UDF for Spark. Since each Spark task runs in a separate process, 
    we'll initialize Ray, connecting to the running cluster, if it is not already initialized.
    """
    from ray.util import named_actors
    did_initialization = 0
    if not ray.is_initialized():
        ray.init(address='auto', redis_password='5241590000000000')
        did_initialization = 1
        
    gov = named_actors.get_actor(name='dgs')
    count_id = gov.log.remote(id)   # Runs asynchronously, returning an object id for a future.
    count = ray.get(count_id)       # But this blocks!
    return {'initialized': did_initialization, 'count': count}

In [15]:
spark = pyspark.sql.SparkSession.builder \
    .master("local[*]") \
    .appName("Data Governance Example") \
    .getOrCreate()

In [16]:
log_record_udf = udf(lambda id: log_record(id), MapType(StringType(), IntegerType()))

In [17]:
num_records=50

In [18]:
records = [Record(i, f'str: {i}') for i in range(num_records)] 

In [19]:
df = spark.createDataFrame(records, ['id', 'data'])

In [20]:
df_ray = df.select('id', 'data', log_record_udf('id').alias('logged'))

In [21]:
display(df_ray)

DataFrame[id: string, data: bigint, logged: map<string,int>]

In [22]:
%time df_ray.show(n=num_records, truncate=False)

+-------+----+-------------------------------+
|id     |data|logged                         |
+-------+----+-------------------------------+
|str: 0 |0   |[count -> 1, initialized -> 1] |
|str: 1 |1   |[count -> 2, initialized -> 0] |
|str: 2 |2   |[count -> 3, initialized -> 0] |
|str: 3 |3   |[count -> 4, initialized -> 0] |
|str: 4 |4   |[count -> 5, initialized -> 0] |
|str: 5 |5   |[count -> 6, initialized -> 0] |
|str: 6 |6   |[count -> 13, initialized -> 1]|
|str: 7 |7   |[count -> 14, initialized -> 0]|
|str: 8 |8   |[count -> 15, initialized -> 0]|
|str: 9 |9   |[count -> 16, initialized -> 0]|
|str: 10|10  |[count -> 18, initialized -> 0]|
|str: 11|11  |[count -> 21, initialized -> 0]|
|str: 12|12  |[count -> 20, initialized -> 1]|
|str: 13|13  |[count -> 23, initialized -> 0]|
|str: 14|14  |[count -> 25, initialized -> 0]|
|str: 15|15  |[count -> 27, initialized -> 0]|
|str: 16|16  |[count -> 29, initialized -> 0]|
|str: 17|17  |[count -> 30, initialized -> 0]|
|str: 18|18  

As you can see in the `logged` column, there are several PySpark processes (four on my laptop), each of which initializes Ray once.

You probably also see that the `count` values are out of order, because updates happen asynchronously from several PySpark tasks to the single `DataGovernanceSystem` actor, but Ray's actor model handles thread-safe updates, so that the final count is correct! 

In [23]:
gov_status()

count:   50
ids:     ['str: 0', 'str: 1', 'str: 2', 'str: 3', 'str: 4', 'str: 5', 'str: 18', 'str: 19', 'str: 20', 'str: 21', 'str: 22', 'str: 23', 'str: 6', 'str: 7', 'str: 8', 'str: 9', 'str: 24', 'str: 10', 'str: 25', 'str: 12', 'str: 11', 'str: 26', 'str: 13', 'str: 27', 'str: 14', 'str: 28', 'str: 15', 'str: 29', 'str: 16', 'str: 17', 'str: 30', 'str: 31', 'str: 36', 'str: 42', 'str: 32', 'str: 37', 'str: 43', 'str: 33', 'str: 38', 'str: 44', 'str: 34', 'str: 39', 'str: 45', 'str: 35', 'str: 40', 'str: 46', 'str: 41', 'str: 47', 'str: 48', 'str: 49']
up time: 150.6628451347351


In [24]:
gov.reset.remote()
gov_status()

count:   0
ids:     []
up time: 151.60465502738953
