In [0]:
%pip install -qU databricks-feature-engineering mlflow ray[default] ray[data] databricks-sql-connector sqlalchemy-databricks
# NOTE(review): `prophet` is imported later in this notebook but is not installed here;
# presumably it is provided by the Databricks ML runtime — confirm, or add it to the list above.

# Restart Python so the packages installed above can be imported by the following cells.
dbutils.library.restartPython()

In [0]:
# Unity Catalog location of the source time-series table and of the forecast output table.
catalog, schema = "jon_cheung", "ray_gtm_examples"
table = "data_synthetic_timeseries_mini"     # input: one row per (group_name, ds)
write_table = "prophet_forecasts"            # output: per-group forecast horizon

# Target column name (Prophet expects the target in a column called `y`).
label = "y"

## Optional: Generate a massive time-series dataset

In [0]:
%run ./generate_timeseries_data

In [0]:
# Synthetic data generation: materialise `id_sdf` as the source table the first time this
# notebook runs; an existing table is left untouched.
# NOTE(review): `id_sdf` is presumably defined by the %run of ./generate_timeseries_data above.
import pandas as pd

source_table_fqn = f"{catalog}.{schema}.{table}"
source_table_missing = not spark.catalog.tableExists(source_table_fqn)
if source_table_missing:
    id_sdf.write.mode('overwrite').saveAsTable(source_table_fqn)
    print("... OK!")

## Ray Core with SQL Connection


In [0]:
# Collect every distinct group_name; each one becomes a separate Ray training task below.
distinct_groups_sql = f"SELECT DISTINCT group_name FROM {catalog}.{schema}.{table}"
uniques = spark.sql(distinct_groups_sql).toPandas()
group_names = [name for name in uniques["group_name"]]

In [0]:
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Start a Ray-on-Spark cluster on this cluster's Spark workers. Recommended configuration:
# - num_cpus_worker_node: the CPU count of each Spark worker node, so that each Spark worker node
#   launches one Ray worker node that fully uses that node's CPUs.
# - min_worker_nodes / max_worker_nodes: the lower and upper bounds on the total number of Ray
#   worker nodes; setting both enables autoscaling between them.
# - num_gpus_worker_node: GPUs per Ray worker node (0 for this CPU-only workload).
# `num_cpus_worker_node` replaces the deprecated `num_cpus_per_node` argument and matches the
# `num_gpus_worker_node` naming used in the same call.
setup_ray_cluster(
    min_worker_nodes=2,
    max_worker_nodes=8,
    num_cpus_worker_node=16,
    num_gpus_worker_node=0,
    collect_log_to_path="/dbfs/Users/jon.cheung@databricks.com/ray_collected_logs",
    # NOTE(review): kept from the original; presumably intended to disable Ray's memory
    # monitor — confirm that setup_ray_cluster honours this keyword.
    RAY_memory_monitor_refresh_ms=0,
)

In [0]:
import pandas as pd
from databricks import sql
import pandas.io.sql as psql
from prophet import Prophet
import pickle
import mlflow
import os
from mlflow.utils.databricks_utils import get_databricks_env_vars

# MLflow experiment that holds the driver's parent run and every per-group child run.
experiment_name = "/Users/jon.cheung@databricks.com/ray_core_prophet"
mlflow.set_experiment(experiment_name)

# Snapshot the Databricks credentials as environment variables so the Ray tasks can reuse them
# (each task applies them with os.environ.update before it logs to MLflow).
mlflow_db_creds = get_databricks_env_vars("databricks")

def _warehouse_connection():
    """Open a new connection to the Databricks SQL warehouse configured via environment variables.

    The values come from the warehouse's Connection Details > Python page and are expected in
    DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH and DATABRICKS_TOKEN (set under the
    Advanced section of the compute cluster). Use it as a context manager so it is closed.
    """
    return sql.connect(
        server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
        http_path=os.getenv("DATABRICKS_HTTP_PATH"),
        access_token=os.getenv("DATABRICKS_TOKEN"),
    )


def _sql_string_literal(value) -> str:
    """Return ``value`` rendered as a single-quoted Databricks SQL string literal.

    Backslashes and single quotes are backslash-escaped, so a value such as ``O'Brien`` cannot
    terminate the literal early and break (or alter) the surrounding statement.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _forecast_values_sql(rows: pd.DataFrame) -> str:
    """Render forecast rows as the tuples of a SQL ``INSERT ... VALUES`` clause.

    Args:
        rows: DataFrame with ``group_name``, ``ds`` and ``yhat`` columns.

    Returns:
        Comma-joined tuples, e.g. ``('A', DATE'2024-01-15', 12.5),('A', DATE'2024-01-16', 13.1)``.

    Every value is converted explicitly instead of relying on Python reprs
    (``str(tuple(row))`` yields ``Timestamp('...')`` and Python string quoting) happening to
    parse as SQL. ``ds`` becomes a DATE literal matching the output table's ``ds date`` column,
    and a non-finite ``yhat`` (NaN/inf) becomes NULL instead of an invalid bare ``nan``.
    """
    import math

    tuples = []
    for group, ds, yhat in rows[["group_name", "ds", "yhat"]].itertuples(index=False, name=None):
        yhat_value = float(yhat)
        yhat_sql = repr(yhat_value) if math.isfinite(yhat_value) else "NULL"
        tuples.append(
            f"({_sql_string_literal(group)}, DATE'{pd.Timestamp(ds):%Y-%m-%d}', {yhat_sql})"
        )
    return ",".join(tuples)


# Here we turn the training code into a Ray task: the @ray.remote decorator makes it schedulable
# on the Ray cluster, and each task logs a child MLflow run nested under the driver's parent run.
@ray.remote
def train_and_inference_prophet(group_name: str,
                                horizon: int,
                                parent_run_id: str
                                ):
    """Fit a Prophet model for one group, log it to MLflow, and append its forecast horizon to a table.

    Shipping a slice of the data from the driver to every task is inefficient, so each task reads
    its own group's rows directly from a Databricks SQL warehouse (see ``_warehouse_connection``).

    Args:
        group_name: ``group_name`` value whose rows are loaded and modelled.
        horizon: Number of future periods to forecast (``make_future_dataframe(periods=...)``);
            the last ``horizon`` forecast rows are appended to ``{catalog}.{schema}.{write_table}``.
        parent_run_id: MLflow run id under which this task's run is nested.

    Returns:
        The full Prophet forecast DataFrame (history plus horizon) with a ``group_name`` column added.
    """
    # Short-lived connections for the read and the write, each closed on exit, so no warehouse
    # session is left open (the original connection was never closed) or held during training.
    query = (f"SELECT * FROM {catalog}.{schema}.{table} "
             f"WHERE group_name = {_sql_string_literal(group_name)}")
    with _warehouse_connection() as connection:
        selected_data = psql.read_sql(query, connection)

    # Set mlflow credentials and active MLflow experiment within each Ray task
    os.environ.update(mlflow_db_creds)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=f"{group_name}", parent_run_id=parent_run_id):
        dataset = mlflow.data.from_pandas(selected_data)
        mlflow.log_input(dataset)

        m = Prophet(daily_seasonality=True)
        m.fit(selected_data)
        future = m.make_future_dataframe(periods=horizon)
        forecast = m.predict(future)
        mlflow.prophet.log_model(pr_model=m, artifact_path="prophet_model")

    # Write the horizon results to the database using our SQL warehouse.
    forecast['group_name'] = group_name
    # tail() instead of iloc[-horizon:]: iloc[-0:] selects every row, which would write the
    # whole history when horizon == 0. An empty selection is skipped (empty VALUES is invalid SQL).
    to_write = forecast[['group_name', 'ds', 'yhat']].tail(max(horizon, 0))
    if not to_write.empty:
        target = f"{catalog}.{schema}.{write_table}"
        with _warehouse_connection() as connection, connection.cursor() as cursor:
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {target} (group_name string, ds date, yhat float)")
            cursor.execute(f"INSERT INTO {target} VALUES {_forecast_values_sql(to_write)}")

    return forecast

# Each .remote() call below schedules a Ray task and immediately returns an object reference.
# With 8 workers of 16 cores / 64 GB each (e.g. AWS m5.4xlarge), up to 128 single-CPU tasks
# can run in parallel.
forecast_horizon = 14  # number of future periods forecast (and written) per group

# Create the output table once, up front, so it already exists before the parallel tasks start
# writing to it.
spark.sql(
    f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{write_table} "
    "(group_name string, ds date, yhat float)"
)

# Start the parent run on the driver; every task nests its own child run under it.
with mlflow.start_run(run_name="prophet_models_250224") as parent_run:
    parent_run_id = parent_run.info.run_id
    forecasts_obj_ref = [
        train_and_inference_prophet
        .options(num_cpus=1)
        .remote(group_name=group,
                horizon=forecast_horizon,
                parent_run_id=parent_run_id)
        for group in group_names
    ]

    # ray.get() is a blocking call: it returns only once every task has finished, with the
    # results in the same order as the submitted object references.
    forecasts = ray.get(forecasts_obj_ref)

In [0]:
# Shut down the Ray cluster that was started earlier in this notebook with setup_ray_cluster().
shutdown_ray_cluster()