In [11]:
#sf api setup
import time
import asyncio
import logging
from IPython.display import clear_output

from os.path import join

from sfapi_client         import Client, AsyncClient
from sfapi_client.compute import Machine
from sfapi_client.jobs    import JobState

from sfapi_connector import KeyManager, OsSFAPI, OsWrapper, LOGGER
import json
import re


In [12]:
import os
from sshtunnel import SSHTunnelForwarder
import time

def ssh_to_dask(username, hostname, private_key_path, local_ports, dask_ip=None):
    # Expand user directory if needed
    private_key_path = os.path.expanduser(private_key_path)

    try:
        # Set up the SSH tunnel
        with SSHTunnelForwarder(
            (hostname, 22),
            ssh_username=username,
            ssh_private_key=private_key_path,
            local_bind_addresses=[('localhost', local_port) for local_port, _, _ in local_ports],
            remote_bind_addresses=[(remote_host, remote_port) for _, remote_host, remote_port in local_ports]
        ) as tunnel:
            
           

            # Keep the connection alive
            input("Press Enter to close the SSH tunnel.")
            
    except Exception as e:
        print(f"Failed to establish SSH tunnel: {e}")

In [13]:
# sf api send jobscript

N = 10000

target = "./sfapi_test"

job_global = None
    
job_script = f"""#!/bin/bash
#SBATCH -q debug
#SBATCH -A m669
#SBATCH -N 1
#SBATCH -n 12              # Number of tasks (64 tasks, 32 per node)
#SBATCH -C cpu
#SBATCH -t 00:30:00
#SBATCH -J sfapi-demo
#SBATCH --exclusive
#SBATCH --output={target}/sfapi-demo-%j.out
#SBATCH --error={target}/sfapi-demo-%j.error

# Print each command for debugging
set -x

# Load necessary modules, for example Python and Dask
module load python dask

echo "Starting scheduler..."

scheduler_file=$SCRATCH/scheduler_file.json
rm -f $scheduler_file

module load python

# Start scheduler
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
dask-scheduler \
    --interface hsn0 \
    --scheduler-file $scheduler_file &

dask_pid=$!

# Wait for the scheduler to start
sleep 5
until [ -f $scheduler_file ]
do
     echo "Waiting for scheduler to start..."
     sleep 5
done
echo "Scheduler started"

echo "Starting workers..."

# Start workers and redirect their output to a log file for better visibility
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
srun dask worker \
    --scheduler-file $scheduler_file \
    --interface hsn0 \
    --nworkers 1 > $SCRATCH/worker_log.out 2>&1 &

echo "Workers started. Check $SCRATCH/worker_log.out for details."

# Wait a bit to ensure workers are started
echo "Sleeping..."
sleep 10

# Check number of workers
echo "Verifying number of Dask workers..."
python -c "from dask.distributed import Client; client = Client(scheduler_file='$scheduler_file'); print('Number of workers:', len(client.scheduler_info()['workers']))"


echo "hostname: $(hostname -f)"


echo "waiting for client connection..."
wait



"""


# job_script = f"""#!/bin/bash
# #SBATCH -q debug
# #SBATCH -A m669
# #SBATCH -N 1
# #SBATCH -C cpu
# #SBATCH -t 00:01:00
# #SBATCH -J sfapi-demo
# #SBATCH --exclusive
# #SBATCH --output={target}/sfapi-demo-%j.out
# #SBATCH --error={target}/sfapi-demo-%j.error

# module load python
# # Prints N random numbers to form a normal distrobution
# python -c "import numpy as np; numbers = np.random.normal(size={N}); [print(n) for n in numbers]"
#     """ 

# os = OsWrapper()
# job_stript_path = join(target, "job_script.sh")

# with os.open(job_stript_path, "w", mk_target_dir=False) as f:
#     f.write(job_script)

km = KeyManager()

with Client(key=km.key) as client:
    perlmutter = client.compute(Machine.perlmutter)


    job = perlmutter.submit_job(job_script)
    job_global = job
    print(f"Submitted_job: {job.jobid}")
    job_id = job.jobid

    while True:
        job.update()
        clear_output(wait=True)

        print(f"The job state is: {job.state} ({type(job.state)})")
        if job.state not in [JobState.PENDING, JobState.RUNNING, JobState.COMPLETING]:
            if job.state == JobState.FAILED:
                print("Job failed")
            elif job.state == JobState.COMPLETED:
                print("Job completed")
                output_file = perlmutter.ls(f"/global/homes/s/sanjeevc/sfapi_test/sfapi-demo-{job_id}.out")
                output_file = output_file[0]
                with output_file.open("r") as f:
                    print(f.read())
            elif job.state == JobState.TIMEOUT:
                print("Job timeout")
                output_file = perlmutter.ls(f"/global/homes/s/sanjeevc/sfapi_test/sfapi-demo-{job_id}.out")
                output_file = output_file[0]
                with output_file.open("r") as f:
                    print(f.read())    
            break
        try: 
            output_file = perlmutter.ls(f"/global/homes/s/sanjeevc/sfapi_test/sfapi-demo-{job_id}.out")
            output_file = output_file[0]
            with output_file.open("r") as f:
                file_content = f.read()
                print(file_content)

            # Parsing the hostname using a regex pattern
            hostname_match = re.search(r"hostname: (.+)", file_content)
            if hostname_match:
                hostname = hostname_match.group(1)
                print(f"Hostname parsed: {hostname}")
                print(f"---------------------------------")
                print(f"Searching for Dask IP address...")
                output_file = perlmutter.ls(f"/pscratch/sd/s/sanjeevc/scheduler_file.json")
                output_file = output_file[0]
                with output_file.open("r") as f:
                    file_content = f.read()
                    print(file_content)
                
                                # Parse the JSON content to extract the Dask IP
                scheduler_info = json.loads(file_content)
                dask_address = scheduler_info.get("address", "")
                daskip = re.search(r"tcp://([\d.]+):", dask_address)

                # Extract and print the IP address
                if daskip:
                    daskip = daskip.group(1)
                    print(f"Dask IP address: {daskip}")
                    
                    
                    
                    # Example usage
                    username = "sanjeevc"
                    hostname = "perlmutter.nersc.gov"
                    private_key_path = "~/.ssh/nersc"
                    local_ports = [(8786, daskip, 8786), (8787, daskip, 8787)]
                    ssh_to_dask(username, hostname, private_key_path, local_ports)
                    print("----------\n SSH tunnel should be established \n------------")
                    input("Cancel job?")
                    job.cancel()
                    print("Job cancelled")
                    
                    
                    
                else:
                    print("Dask IP address not found.")
                
                
                
                break
            else:
                print("Hostname not found in the file.")
        except Exception as e:
            print(f"error: {e}")
        time.sleep(10)
        
        


The job state is: JobState.RUNNING (<enum 'JobState'>)
Starting scheduler...
Waiting for scheduler to start...
Waiting for scheduler to start...
Scheduler started
Starting workers...
Workers started. Check /pscratch/sd/s/sanjeevc/worker_log.out for details.
Sleeping...
Verifying number of Dask workers...
Number of workers: 12
hostname: x1006c1s3b0n1h0.chn.perlmutter.nersc.gov
waiting for client connection...

Hostname parsed: x1006c1s3b0n1h0.chn.perlmutter.nersc.gov
---------------------------------
Searching for Dask IP address...
b'{\n  "type": "Scheduler",\n  "id": "Scheduler-4fd6abf3-7936-4c0b-ba40-082efaac1377",\n  "address": "tcp://10.249.3.101:8786",\n  "services": {\n    "dashboard": 8787\n  },\n  "started": 1731392536.2255898,\n  "workers": {}\n}'
Dask IP address: 10.249.3.101
Dask IP address not found.
----------
 SSH tunnel should be established 
------------
Job cancelled


In [14]:
km = KeyManager()

with Client(key=km.key) as client:
    job_global.cancel()

RuntimeError: Cannot send a request, as the client has been closed.