# Locally interact wiht remote objects 
If we connect to the remote cluster from a notebook, we can interact with the remote objects we created in the training script in parallel 

In [23]:
import runhouse as rh 

num_nodes = 3
cluster_name = f"rh-{num_nodes}-dask-gcp-1_3_jupyter"

# The environment for the remote cluster
img = rh.Image(name="dask-img").install_packages([
        "dask-ml",
        "dask[distributed]",
        "dask[dataframe]",
        "gcsfs",
        "lightgbm",
    ])

cluster = rh.compute(
    name=cluster_name,
    instance_type= 'CPU:2+',
    memory = "12+",
    num_nodes=num_nodes,
    provider="gcp",
    region = "us-east1",
    image=img,
).up_if_not()
#cluster.teardown()

INFO | 2024-11-14 22:47:39 | runhouse.resources.secrets.secret:285 | Saving config for /paul/ssh-sky-key to Den
INFO | 2024-11-14 22:47:39 | runhouse.resources.secrets.secret:301 | Saving secrets for /paul/ssh-sky-key to Vault
INFO | 2024-11-14 22:47:39 | runhouse.rns.rns_client:502 | Saving config for /paul/dask-env to RNS
INFO | 2024-11-14 22:47:40 | runhouse.rns.rns_client:502 | Saving config for /paul/py-3-dask-gcp-1_2_jupyter to RNS
INFO | 2024-11-14 22:47:40 | runhouse.resources.hardware.on_demand_cluster:554 | Launching cluster with Den
INFO | 2024-11-14 22:47:40 | runhouse.resources.hardware.launcher_utils:91 | Checking cluster status
INFO | 2024-11-14 22:47:44 | runhouse.resources.hardware.launcher_utils:86 | Launching cluster
INFO | 2024-11-14 22:51:45 | runhouse.resources.hardware.launcher_utils:91 | Successfully launched cluster
INFO | 2024-11-14 22:51:46 | runhouse.resources.hardware.launcher_utils:101 | Successfully ran cluster operation via Den
verbose {'resource_subtype

In [3]:
# Use the return_model_details class on the remote object
trainer = cluster.get("my_trainer", remote = True, default = None)
print(trainer.return_model_details())

INFO | 2024-11-12 22:35:08 | runhouse.resources.hardware.ssh_tunnel:250 | Port 32300 is already in use. Trying next port.
INFO | 2024-11-12 22:35:08 | runhouse.resources.hardware.ssh_tunnel:250 | Port 32301 is already in use. Trying next port.
INFO | 2024-11-12 22:35:08 | runhouse.resources.hardware.ssh_tunnel:91 | Running forwarding command: ssh -T -L 32302:localhost:32300 -i ~/.ssh/sky-key -o Port=22 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ConnectTimeout=30s -o ForwardAgent=yes -o ControlMaster=auto -o ControlPath=/tmp/skypilot_ssh_6ba66db0/4b487bd908/%C -o ControlPersist=300s gcpuser@34.74.23.61
INFO | 2024-11-12 22:35:09 | runhouse.servers.http.http_client:439 | Calling my_trainer.return_model_details


INFO:httpx:HTTP Request: GET http://127.0.0.1:32302/logs/my_trainer/my_trainer_return_model_details@20241112@223509_408821/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-12 22:35:09 | runhouse.servers.http.http_client:504 | Time to call my_trainer.return_model_details: 0.08 seconds
{'features': ['passenger_count', 'trip_distance', 'fare_amount', 'day', 'month', 'dayofweek', 'hour'], 'mse': np.float64(10.173834328554335), 'mae': np.float64(1.9945495515063427)}


In [4]:
# Run an inference on the remote model 
import numpy as np 
import dask.array as da
input_data = [3, 5, 110, 5, 11, 3, 6]

trainer.predict(input_data)

INFO | 2024-11-12 22:35:13 | runhouse.servers.http.http_client:439 | Calling my_trainer.predict


INFO:httpx:HTTP Request: GET http://127.0.0.1:32302/logs/my_trainer/my_trainer_predict@20241112@223513_267805/pickle "HTTP/1.1 200 OK"


------------
[36mpy-2-new-gcp[0m
------------
[36mdask.array<_predict_part, shape=(1,), dtype=float32, chunksize=(1,), chunktype=numpy.ndarray>
[0m[36m[10.87070356]
[0mINFO | 2024-11-12 22:35:13 | runhouse.servers.http.http_client:504 | Time to call my_trainer.predict: 0.58 seconds


10.870703558582909

In [7]:
cluster.teardown()

INFO | 2024-11-12 22:40:33 | runhouse.resources.hardware.on_demand_cluster:618 | Tearing down cluster locally via Sky.
INFO | 2024-11-12 22:40:59 | runhouse.resources.secrets.secret:285 | Saving config for /paul/ssh-sky-key to Den
INFO | 2024-11-12 22:40:59 | runhouse.resources.secrets.secret:301 | Saving secrets for /paul/ssh-sky-key to Vault
INFO | 2024-11-12 22:40:59 | runhouse.rns.rns_client:502 | Saving config for /paul/dask-env to RNS
INFO | 2024-11-12 22:40:59 | runhouse.rns.rns_client:502 | Saving config for /paul/py-2-new-gcp to RNS


In [24]:
from lightgbm_training import LightGBMModelTrainer
remote_dask_trainer = rh.cls(LightGBMModelTrainer).to(cluster)
dask_trainer = remote_dask_trainer(
    name="my_trainer"
).distribute('dask') 

INFO | 2024-11-14 22:59:57 | runhouse.resources.hardware.ssh_tunnel:91 | Running forwarding command: ssh -T -L 32300:localhost:32300 -i ~/.ssh/sky-key -o Port=22 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ConnectTimeout=30s -o ForwardAgent=yes -o ControlMaster=auto -o ControlPath=/tmp/skypilot_ssh_6ba66db0/05075989d7/%C -o ControlPersist=300s gcpuser@34.74.153.65
INFO | 2024-11-14 22:59:59 | runhouse.servers.http.http_client:439 | Calling dask-env._run_setup_cmds


INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/dask-env/dask-env__run_setup_cmds@20241114@225959_101481/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 22:59:59 | runhouse.servers.http.http_client:504 | Time to call dask-env._run_setup_cmds: 0.55 seconds
INFO | 2024-11-14 22:59:59 | runhouse.resources.module:558 | Sending module LightGBMModelTrainer of type <class 'runhouse.resources.module.LightGBMModelTrainer'> to py-3-dask-gcp-1_2_jupyter
INFO | 2024-11-14 23:00:00 | runhouse.resources.secrets.secret:285 | Saving config for /paul/ssh-sky-key to Den
INFO | 2024-11-14 23:00:00 | runhouse.resources.secrets.secret:301 | Saving secrets for /paul/ssh-sky-key to Vault
INFO | 2024-11-14 23:00:00 | runhouse.rns.rns_client:502 | Saving config for /paul/dask-env to RNS
INFO | 2024-11-14 23:00:00 | runhouse.rns.rns_client:502 | Saving config for /paul/py-3-dask-gcp-1_2_jupyter to RNS
INFO | 2024-11-14 23:00:00 | runhouse.rns.rns_client:502 | Saving config for /paul/dask-env to RNS
INFO | 2024-11-14 23:00:00 | runhouse.rns.rns_client:502 | Saving config for /paul/LightGBMModelTrainer to RNS
INFO | 2024-11-14 23:00:00 | runhous

INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/LightGBMModelTrainer/LightGBMModelTrainer_name@20241114@230000_695718/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 23:00:00 | runhouse.servers.http.http_client:504 | Time to call LightGBMModelTrainer.name: 0.07 seconds
INFO | 2024-11-14 23:00:01 | runhouse.resources.secrets.provider_secrets.ssh_secret:90 | Secrets already exist in ~/.ssh/sky-key. Skipping.
INFO | 2024-11-14 23:00:01 | runhouse.servers.http.http_client:439 | Calling my_trainer._remote_init


INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/my_trainer/my_trainer__remote_init@20241114@230001_492882/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 23:00:01 | runhouse.servers.http.http_client:504 | Time to call my_trainer._remote_init: 0.08 seconds
INFO | 2024-11-14 23:00:01 | runhouse.servers.http.http_client:439 | Calling my_trainer.distribute


INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/my_trainer/my_trainer_distribute@20241114@230001_618842/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 23:00:03 | runhouse.servers.http.http_client:504 | Time to call my_trainer.distribute: 1.15 seconds


In [25]:
dataset_path = "gs://rh-demo-external/*.parquet"  # 2024 NYC Taxi Data
X_vars = ["passenger_count", "trip_distance", "fare_amount"]
y_var = "tip_amount"


dask_trainer.load_client()
dask_trainer.load_data(dataset_path)


INFO | 2024-11-14 23:00:05 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/dask_my_trainer/dask_my_trainer_forward@20241114@230005_559762/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 23:00:10 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 5.19 seconds
INFO | 2024-11-14 23:00:10 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32300/logs/dask_my_trainer/dask_my_trainer_forward@20241114@230010_835596/pickle "HTTP/1.1 200 OK"


-------------------------
[36mpy-3-dask-gcp-1_2_jupyter[0m
-------------------------
[36mIndex(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
[0m[36m       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
[0m[36m       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
[0m[36m       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
[0m[36m       'total_amount', 'congestion_surcharge', 'Airport_fee'],
[0m[36m      dtype='object')
[0mINFO | 2024-11-14 23:00:12 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 1.6 seconds


In [9]:
new_date_columns = dask_trainer.preprocess(date_column="tpep_pickup_datetime")
X_vars = X_vars + new_date_columns
dask_trainer.train_test_split(target_var=y_var, features=X_vars)


INFO | 2024-11-14 22:21:27 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32301/logs/dask_my_trainer/dask_my_trainer_forward@20241114@222127_143301/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 22:21:27 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 0.55 seconds
INFO | 2024-11-14 22:21:27 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32301/logs/dask_my_trainer/dask_my_trainer_forward@20241114@222127_776523/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 22:21:28 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 0.72 seconds


In [10]:
# Train, test, and save the model
dask_trainer.train_model()


INFO | 2024-11-14 22:21:36 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32301/logs/dask_my_trainer/dask_my_trainer_forward@20241114@222136_959772/pickle "HTTP/1.1 200 OK"


[36mFinding random open ports for workers
[0m[36mModel trained successfully. DaskLGBMRegressor(client=<Client: 'tcp://10.142.0.28:8786' processes=6 threads=6, memory=46.85 GiB>,
[0m[36m                  num_threads=1, time_out=120, tree_learner='data')
[0mINFO | 2024-11-14 22:21:50 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 13.13 seconds


In [15]:
cluster.kill_dask() 
cluster.connect_dask()
dask_trainer.load_client()
dask_trainer.test_model()


INFO | 2024-11-14 22:27:52 | runhouse.resources.hardware.cluster:1760 | Running command on py-3-dask-gcp-1_1_jupyter: pkill -f 'dask scheduler'
INFO | 2024-11-14 22:27:53 | runhouse.resources.hardware.cluster:1760 | Running command on py-3-dask-gcp-1_1_jupyter: pkill -f 'dask worker'
INFO | 2024-11-14 22:27:53 | runhouse.resources.hardware.cluster:1760 | Running command on py-3-dask-gcp-1_1_jupyter: pkill -f 'dask worker'
INFO | 2024-11-14 22:27:54 | runhouse.resources.hardware.cluster:1760 | Running command on py-3-dask-gcp-1_1_jupyter: pkill -f 'dask worker'
INFO | 2024-11-14 22:27:54 | runhouse.resources.hardware.ssh_tunnel:91 | Running forwarding command: ssh -T -L 8786:localhost:8786 -i ~/.ssh/sky-key -o Port=22 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ConnectTimeout=30s -o ForwardAgent=yes -o ControlMaster=auto -o ControlPath=/tmp/skypilot_ss


+-------------+-----------+-----------+---------+
| Package     | Client    | Scheduler | Workers |
+-------------+-----------+-----------+---------+
| dask        | 2024.10.0 | 2024.11.2 | None    |
| distributed | 2024.10.0 | 2024.11.2 | None    |
+-------------+-----------+-----------+---------+
INFO:httpx:HTTP Request: GET http://127.0.0.1:32301/logs/dask_my_trainer/dask_my_trainer_forward@20241114@222757_613573/pickle "HTTP/1.1 200 OK"


INFO | 2024-11-14 22:27:58 | runhouse.servers.http.http_client:504 | Time to call dask_my_trainer.forward: 0.83 seconds
INFO | 2024-11-14 22:27:58 | runhouse.servers.http.http_client:439 | Calling dask_my_trainer.forward


INFO:httpx:HTTP Request: GET http://127.0.0.1:32301/logs/dask_my_trainer/dask_my_trainer_forward@20241114@222758_528702/pickle "HTTP/1.1 200 OK"


KeyboardInterrupt: 

In [27]:
#dask_trainer.save_model("model.pkl")
cluster.notebook()

#cluster.teardown() # Optionally, automatically teardown the cluster after training


INFO | 2024-11-14 23:08:42 | runhouse.resources.hardware.on_demand_cluster:607 | Tearing down cluster with Den.
INFO | 2024-11-14 23:08:44 | runhouse.resources.hardware.launcher_utils:91 | Tearing down cluster, keeping configs in Den
INFO | 2024-11-14 23:08:44 | runhouse.resources.hardware.launcher_utils:86 | Tearing down cluster
INFO | 2024-11-14 23:09:12 | runhouse.resources.hardware.launcher_utils:91 | Cluster successfully torn down
INFO | 2024-11-14 23:09:13 | runhouse.resources.hardware.launcher_utils:101 | Successfully ran cluster operation via Den


In [17]:
cluster.config()

{'name': '/paul/py-3-dask-gcp-1_1_jupyter',
 'resource_type': 'cluster',
 'resource_subtype': 'OnDemandCluster',
 'visibility': 'private',
 'ips': ['34.139.63.210', '34.138.212.133', '35.243.243.241'],
 'server_port': 32300,
 'server_connection_type': 'ssh',
 'den_auth': False,
 'ssh_port': 22,
 'client_port': 32301,
 'ssh_properties': {'ssh_user': 'gcpuser',
  'ssh_private_key': '~/.ssh/sky-key',
  'ssh_control_name': 'py-3-dask-gcp-1-1-jupyter-afc0',
  'ssh_proxy_command': None},
 'creds': '/paul/ssh-sky-key',
 'api_server_url': 'https://api.run.house',
 'default_env': '/paul/dask-env',
 'instance_type': 'CPU:2+',
 'num_nodes': 3,
 'provider': 'gcp',
 'open_ports': [],
 'use_spot': False,
 'region': 'us-east1',
 'stable_internal_external_ips': [('10.142.0.28', '34.139.63.210'),
  ('10.142.0.29', '34.138.212.133'),
  ('10.142.0.30', '35.243.243.241')],
 'memory': '12+',
 'sky_kwargs': {},
 'launched_properties': {'cloud': 'gcp',
  'instance_type': 'n2-highmem-2',
  'region': 'us-east1