# Transforms with Ray Data

Ray Data (https://docs.ray.io/en/latest/data/data.html) is Ray's data processing library for machine learning workloads.  Since this workshop is centered around data transformation, we're going to skip a lot of the features that make it intriguing, such as serving data across a cluster for Distributed Data Parallel training, substituting for or augmenting the data loaders in libraries like PyTorch, and importing from sources such as Hugging Face and Iceberg.

We'll start simple and build on the hive-partitioned Parquet dataset we created back in section 3 (that section must have been run for this one to work).


In [None]:
import ray
from pathlib import Path

# In case /tmp is memory-backed, store Ray's temporary and spilled files on disk instead.
# This version writes to $HOME/ray; make sure to delete it afterwards.
# Feel free to change this to whatever you want; the default
# is usually /tmp/ray
ray_temp_dir = Path.home().resolve() / "ray"
# Initialize Ray... again.  Note: if you have not shut down the kernel from the previous notebook, its Ray instance
# will still be running and this call may connect to it, which is fine here but use caution.
# No `address` argument is passed here; pass `address="auto"` if you explicitly want to attach to a running cluster.
ray.init(_temp_dir=str(ray_temp_dir), include_dashboard=True, ignore_reinit_error=True)

Ray Data can read from various sources quite effectively, but generally can't tie them together the way DuckDB can.  Although Ray Data supports reading from SQL sources, a simple connection to DuckDB tries to materialize each query all at once, which can rapidly exhaust memory.  That is not what we want here, so we'll read the hive-partitioned files directly (which is supported).  I wouldn't be surprised to see a good interface for this soon.

It should be noted that the Ray creators view Ray Data as "last mile" processing rather than a generic ETL engine (see https://discuss.ray.io/t/ray-is-not-meant-as-general-etl-tool/9826/11), and I think that was the original intent.  But Spark can be tricky to set up and manage, and several libraries can be built or used on top of Ray (Modin, Dask); even Spark itself works under Ray (see the RayDP project, https://github.com/oap-project/raydp).  These are all worthwhile things to investigate; I wanted to look at Ray Data basics solely because it was tractable, used a minimal library set for a workshop, and provided a very different interface from DuckDB.  Amazon recently replaced Spark with Ray for some tasks such as database compaction (https://aws.amazon.com/blogs/opensource/amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-amazon-ec2/) to great effect.

The Ray and Anyscale teams have also begun devoting more resources to ramping up their data engines, so I expect movement in this area.

In [None]:
# note: Ray Data assumes hive partitioning by default when given a directory.  Other partitioning schemes are available!
base_dataset = ray.data.read_parquet("../3_etl_with_duckdb_part_2/minilob")
base_dataset.show(limit=1)
dataset = base_dataset.limit(100_000)

By default, Ray Data streams its data, keeps the blocks in Ray's object store, and presents each row as a dictionary.  These are not the only options, of course; it can present batches in Arrow format, pandas format, and more, and we can leverage that.  In this case we'll use pandas because it's very standard and widespread, but conversion to Arrow and Polars can be extremely effective as well.

Let's imagine a very simple ETL conversion:
* Convert the nanosecond timestamp to a cyclical time of day
* Calculate a midline price halfway between the buy and sell sides
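
Before involving Ray at all, the two conversions in the list above can be sketched in plain Python.  This is only an illustration under stated assumptions: the timestamp is taken to be nanoseconds counted from a midnight-aligned epoch, and the helper names `cyclical_time_of_day` and `midline` are made up for this example.

```python
import math

NS_PER_DAY = 24 * 3600 * 10**9  # 86,400 seconds/day * 1e9 ns/s


def cyclical_time_of_day(time_stamp_ns: int) -> tuple[float, float]:
    """Map a nanosecond timestamp to a (cos, sin) point on the unit circle."""
    fraction = (time_stamp_ns % NS_PER_DAY) / NS_PER_DAY  # position within the day, in [0, 1)
    angle = 2 * math.pi * fraction                        # one full turn per day
    return math.cos(angle), math.sin(angle)


def midline(buyside_price: float, sellside_price: float) -> float:
    """Price halfway between the buy and sell sides."""
    return (buyside_price + sellside_price) / 2


midnight = cyclical_time_of_day(0)
noon = cyclical_time_of_day(NS_PER_DAY // 2)
just_before_midnight = cyclical_time_of_day(NS_PER_DAY - 10**9)  # 23:59:59

print("midnight:", midnight)
print("noon:", noon)
print("23:59:59:", just_before_midnight)
print("midline:", midline(100.0, 101.0))
```

The point of the encoding is that 23:59:59 lands right next to midnight on the circle, while noon lands on the opposite side; a raw time-of-day number would put 23:59:59 and 00:00:00 as far apart as possible.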

*Note* One reason Ray Data is unsuitable as a general ETL engine is that it is not very efficient at queries--not *remotely* as efficient as DuckDB or any other database engine would be.  It has robust filtering and transformation capabilities, but a Ray Data `Dataset` will iterate through every object in its data pool.  If you need to touch every row, it can be very effective; if not, other options are better!  In one case, a data ingestion and ETL job that took seven hours with one set of tools took 30 minutes with Ray Data on a large cluster of cheap machines.  Tailor your strategy to your use case.

In this case, we are limiting our dataset to the first 100,000 items for demonstration purposes.  That means we're not going to get all the data--not even all the symbols.  Let's see why.

In [None]:
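# First, a very simple aggregation to show that only one symbol appears in the first rows of the data.
# Note that this cell scans the first 10,000,000 rows of `base_dataset`, not the 100,000-row `dataset`.
# We'll cut and paste our little timing context from earlier.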
import time

# let's make a simple little profiler
class Print_time:
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.end = time.perf_counter()
        self.elapsed = self.end - self.start
        print(f"Time taken: {self.elapsed:.4f} seconds")

def square(x: int)->int:
    time.sleep(1)
    return x*x

with Print_time():
    symbols = base_dataset.limit(10_000_000).aggregate(ray.data.aggregate.Unique("symbol"))
    print(symbols)

In [None]:
# and compare to duckdb
import duckdb

with Print_time():
    conn = duckdb.connect(":memory:")
    rel=conn.sql("SELECT DISTINCT symbol FROM read_parquet(['../3_etl_with_duckdb_part_2/minilob/**/*parquet'], hive_partitioning=true)")
    print(rel.df())

There are some *very* interesting things going on there.  First, note that Ray is much slower even on a tiny subset of data; in its element, *duckdb is incredible*.  Less obvious is that Ray Data does not process its data in a deterministic order, and the resulting dataset is not ordered unless you specifically `sort` it.

On the surface, that makes Ray Data the underdog for most operations.  Let's do our ETL transformation anyway just for fun.

In [None]:
import numpy as np
Symbols = dict(NFLX=0, LSTR=1, SHLS=2, SOFI=3, CSCO=4, WING=5)

def encode_timestamp(r: dict)->dict:
    ns_per_day = 24 * 3600 * 10**9  # 86,400 seconds/day * 1e9 ns/s
    ns_in_day = r['time_stamp'] % ns_per_day
    fraction = ns_in_day / ns_per_day
    return r | dict(ts_cos = np.cos(2 * np.pi * fraction), ts_sin = np.sin(2*np.pi * fraction))

def add_midline(r: dict)->dict:
    # parenthesize the sum so the division applies to both prices, not just the sell side
    return r | dict(midline=(r['buyside_price'] + r['sellside_price']) / 2)

def encode_symbol(r: dict)->dict:
    r['symbol'] = Symbols[r['symbol']]
    return r

with Print_time():
    (dataset
    .map(encode_timestamp)
    .map(add_midline)
    .map(encode_symbol)
    ).show(limit=1)

If you're thinking "that performance is lousy; surely the row-by-row conversion to dictionaries is incredibly slow", well... you're right.  But it's useful to see how Ray Data deals with some forms of data in the background.  Luckily, there's a more efficient way to deal with data, and that is batching: Ray lets you iterate over batches and transform a whole batch at a time, and a batch can itself be presented in various formats.  Let's look again at how to handle this sort of transformation.

In [None]:
import pandas as pd
def transform_pandas_batch(df: pd.DataFrame)->pd.DataFrame:
    ns_per_day = 24 * 3600* 10**9
    ns_in_day = df['time_stamp'] % ns_per_day
    fraction = ns_in_day / ns_per_day
    
    df['ts_cos'] = np.cos(2*np.pi*fraction)
    df['ts_sin'] = np.sin(2*np.pi*fraction)
    df['midline'] = (df['buyside_price']+df['sellside_price'])/2
    df['symbol'] = df['symbol'].map(Symbols)
    return df

with Print_time():
    (dataset
    .map_batches(transform_pandas_batch, batch_format="pandas")
    ).show(limit=1)

That's much, much better.  Just for reference, let's try a third way.  Polars (http://pola.rs) is a relatively new kid on the block that leverages the Arrow format and, being written in Rust, is extremely performant.  Let's see how it stacks up.

In [None]:
import polars as pl
import pyarrow

def transform_polars_batch(t: pyarrow.Table)->pyarrow.Table:
    ns_per_day = 24 * 3600 * 10**9
    df = pl.from_arrow(t)
    # angle around the daily cycle in radians, matching the pandas version above
    angle = 2 * np.pi * ((pl.col("time_stamp") % ns_per_day) / ns_per_day)
    result = (df
          .with_columns(
              ((pl.col("buyside_price")+pl.col("sellside_price"))/2).alias("midline"),
              angle.cos().alias("ts_cos"),
              angle.sin().alias("ts_sin"),
              # unknown symbols become null rather than raising
              pl.col("symbol").map_elements(lambda x: Symbols.get(x, None), return_dtype=pl.Int64).alias("symbol")
          ))
    return result.to_arrow()

with Print_time():
    (dataset
     .map_batches(transform_polars_batch, batch_format="pyarrow")
    ).show(limit=1)

That difference in performance is interesting; at this scale the Polars version can sometimes take longer, but generally it is faster, and the gap can be quite significant, particularly as batch sizes increase and depending on the data conversion necessary.  Polars is an incredible dataframe library, superior to pandas in almost all respects; it behooves you to get familiar with it!

One last thing: why is Ray Data described as a last-mile data transformation library?  Because it can present itself easily as a data loader in many machine learning frameworks:

In [None]:
iterable = (
    dataset
    .map_batches(transform_polars_batch, batch_format="pyarrow")
    .iterator()
    .iter_torch_batches(batch_size=10)
)
next(iter(iterable))


At first blush, this doesn't look very interesting, but in the background it has transparently mapped the data to torch tensors in batches.  Conversion to a torch dataloader is trivial, and when utilizing the Ray training infrastructure, distribution of the dataset and appropriate sharding can be done in the background very elegantly.  This seems to be slightly in flux (Ray has recently removed the trivial `to_torch` conversion) but remains a useful way to shard datasets across a cluster for training or tuning, where `ray.train.get_dataset_shard` can retrieve the "correct" shard of data for an individual node's (or process's) training data.

In some ways, Ray Data as an ETL engine is a bit clunky; however, it scales *extremely* well horizontally across relatively cheap nodes, making it a very suitable engine for last-mile data transformations or transformations of a total data set.  Its lack of SQL capabilities does limit it, though; for those tasks, seek solutions like Spark on Ray (RayDP) or ... in some rare cases... smallpond.