# Sky Manager

Sky Manager's goal is to intelligently schedule jobs and deployments across an organization's clusters. It aims to eliminate the boundaries between clusters and create the notion of "one gigantic cluster".

Sky Manager consists of an API server and a controller manager. Organizations can easily add their clusters to Sky Manager (Kubernetes; Slurm support is a TODO).

Sky Manager supports the following object types:
- Clusters
- Jobs (Federated across clusters)
- Deployments (Federated across clusters)
- Namespaces (Federated across clusters)
- FilterPolicies (governance over where jobs/deployments can run)

## API Server

The API server supports CRUD operations over namespaced and global objects. These operations include:
- Create
- Get (Read)
- List
- Update
- Delete
- Watch (asynchronously watches objects and tracks their updates)
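A minimal sketch of how these operations might map onto HTTP requests for a global object type such as clusters. Only the GET and DELETE forms (`GET /clusters`, `GET /clusters/<name>`, `DELETE /clusters/<name>`) are confirmed by the `curl` examples in this notebook; mapping Create to `POST` and Update to `PUT` is an assumption based on common REST conventions, and the `rest_call` helper is hypothetical. Watch and namespaced paths are left out because their wire format isn't shown here.

```python
from typing import Optional, Tuple

# Hypothetical operation -> HTTP method mapping. GET and DELETE match the curl
# examples in this notebook; POST (create) and PUT (update) are assumptions.
OPERATION_TO_METHOD = {
    "create": "POST",
    "get": "GET",
    "list": "GET",
    "update": "PUT",
    "delete": "DELETE",
}


def rest_call(operation: str, plural: str, name: Optional[str] = None) -> Tuple[str, str]:
    """Return (HTTP method, path) for an operation on a global object type."""
    method = OPERATION_TO_METHOD[operation]
    if operation in ("list", "create"):
        # Collection-level operations act on e.g. /clusters.
        return method, f"/{plural}"
    if name is None:
        raise ValueError(f"'{operation}' requires an object name")
    # Object-level operations act on e.g. /clusters/cluster-0.
    return method, f"/{plural}/{name}"


print(rest_call("list", "clusters"))
print(rest_call("delete", "clusters", "cluster-0"))
```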

In [1]:
# Clean up any API server or controller manager processes left over from a previous run.
import os
import signal
import subprocess

# Get a snapshot of all running processes.
ps_output = subprocess.run(['ps', 'aux'], stdout=subprocess.PIPE, text=True).stdout
# Iterate over each running process.
for line in ps_output.splitlines():
    # Match the API server and controller manager launch scripts.
    if 'launch_server.py' in line or 'launch_sky_manager' in line:
        # The second column of `ps aux` is the process ID (PID).
        pid = int(line.split()[1])
        try:
            os.kill(pid, signal.SIGKILL)  # or signal.SIGTERM for a gentler shutdown
        except ProcessLookupError:
            pass  # The process already exited.

In [None]:

# Launch the API server in the background.
os.system('python ../../api_server/launch_server.py &')

Below are simple examples that query the API server's REST endpoints with `curl`:
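Because the API server is started in the background with `&`, the first request can race its startup. A minimal sketch of a readiness check, assuming it is enough to wait until something accepts TCP connections on the configured port; `wait_for_port` is a hypothetical helper, not part of Sky Manager.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: the server is not up yet.
            time.sleep(interval)
    return False
```

For example, `wait_for_port(api_server_ip, int(api_server_port))` could run before the first `curl`, using the host and port returned by `load_manager_config()`.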

In [None]:
# List clusters
from sky_manager.utils.utils import load_manager_config

api_server_ip, api_server_port = load_manager_config()

print('Listing all clusters:')
os.system(f'curl -X GET http://{api_server_ip}:{api_server_port}/clusters')

In [None]:
print('Get cluster cluster-0:')
os.system(f'curl -X GET http://{api_server_ip}:{api_server_port}/clusters/cluster-0')

In [None]:
print('DELETE cluster cluster-0:')
os.system(f'curl -X DELETE http://{api_server_ip}:{api_server_port}/clusters/cluster-0')

print("Cluster-0 should be gone.")
os.system(f'curl -X GET http://{api_server_ip}:{api_server_port}/clusters/cluster-0')

## Programmatic API and CLI

On top of the API server's REST API, Sky Manager layers a programmatic Python API and a CLI (`skym`).
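To illustrate the layering, here is a minimal sketch of a thin client that wraps the same REST endpoints used by the `curl` examples. `ThinClusterClient` is hypothetical and only mirrors the `list`/`get`/`delete` calls that `ClusterAPI` exposes below; it assumes the server replies with JSON and is not Sky Manager's actual client code. Note that `urllib` raises `HTTPError` for 4xx/5xx responses.

```python
import json
import urllib.request


class ThinClusterClient:
    """Hypothetical minimal client mirroring ClusterAPI's list/get/delete calls."""

    def __init__(self, host: str, port: int):
        self.base_url = f"http://{host}:{port}"

    def _request(self, method: str, path: str):
        req = urllib.request.Request(self.base_url + path, method=method)
        with urllib.request.urlopen(req) as resp:
            body = resp.read().decode("utf-8")
        # Assumes a JSON reply; fall back to raw text otherwise.
        try:
            return json.loads(body)
        except json.JSONDecodeError:
            return body

    def list(self):
        return self._request("GET", "/clusters")

    def get(self, name: str):
        return self._request("GET", f"/clusters/{name}")

    def delete(self, name: str):
        return self._request("DELETE", f"/clusters/{name}")
```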

In [None]:
from sky_manager.api_client import ClusterAPI

cluster_api = ClusterAPI()

print('API - List clusters.')
print(cluster_api.list())

print('API - Get cluster-1')
print(cluster_api.get('cluster-1'))

print('API - Delete cluster-1')
print(cluster_api.delete('cluster-1'))

In [None]:
print('CLI - List clusters.')
os.system('skym get clusters')

print('CLI - Get cluster-2.')
os.system('skym get cluster cluster-2')

print('CLI - Create cluster.')
os.system('skym create cluster skycluster --manager k8')

print('CLI - List clusters.')
os.system('skym get clusters')



In [None]:
from sky_manager.api_client import JobAPI

job_api = JobAPI(namespace='default')
print('List jobs.')
print(job_api.list())

print('Get job-0')
print(job_api.get('job-0'))

print('Delete job-0')
print(job_api.delete('job-0'))

print('Create job hello')
job_dict = {
    "kind": "Job",
    "metadata": {
      "name": "hello",
      "labels": {
        "testing": "hello"
      }
    },
    "spec": {
      "image": "gcr.io/sky-burst/skyburst:latest",
      "resources": {
        "cpu": 1,
      },
      "run": "echo Sky!"
    } 
}
print(job_api.create(job_dict))
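Before calling `job_api.create`, it can help to catch malformed manifests locally. The hypothetical `missing_job_fields` helper below checks only the fields used by the Job manifests in this notebook (`kind`, `metadata.name`, `spec.image`, `spec.resources`, `spec.run`); treating these as required is an assumption, and Sky Manager's real schema may differ.

```python
from typing import Any, Dict, List


def missing_job_fields(job: Dict[str, Any]) -> List[str]:
    """Return the dotted paths of expected Job fields that are absent."""
    # Hypothetical required fields, taken from the example manifests only.
    required = ["kind", "metadata.name", "spec.image", "spec.resources", "spec.run"]
    missing = []
    for path in required:
        node: Any = job
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                missing.append(path)
                break
            node = node[key]
    return missing
```

For the `hello` manifest above, `missing_job_fields(job_dict)` returns an empty list.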

In [None]:
print('CLI - List jobs.')
os.system('skym get jobs')

print('CLI - Get job hello.')
os.system('skym get job hello')



## Controller Manager

Under the hood, the controller manager runs:
- Scheduler Controller, which decides which cluster(s) each job runs on (e.g., spreading a job's replicas across clusters).
- Skylet Controller, which spawns a "Skylet" process for each cluster.
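To make "spreading replicas across clusters" concrete, here is a toy greedy placement: each replica goes to the cluster with the most free CPUs that can still fit it. This is a hypothetical illustration, not the Scheduler Controller's actual policy; `spread_replicas` and its free-CPU input are assumptions.

```python
from typing import Dict, Optional


def spread_replicas(free_cpus: Dict[str, float], replicas: int,
                    cpu_per_replica: float) -> Optional[Dict[str, int]]:
    """Greedily place each replica on the cluster with the most free CPUs.

    Returns a {cluster: replica_count} map, or None if some replica cannot fit.
    """
    remaining = dict(free_cpus)  # Copy so the caller's dict is not mutated.
    placement: Dict[str, int] = {}
    for _ in range(replicas):
        if not remaining:
            return None  # No clusters to place on.
        # Most remaining capacity wins; iterating in sorted order breaks ties by name.
        cluster = max(sorted(remaining), key=lambda c: remaining[c])
        if remaining[cluster] < cpu_per_replica:
            return None  # Even the emptiest cluster cannot fit this replica.
        remaining[cluster] -= cpu_per_replica
        placement[cluster] = placement.get(cluster, 0) + 1
    return placement
```

For example, `spread_replicas({"cluster-a": 2, "cluster-b": 3}, 4, 1)` splits four 1-CPU replicas as `{"cluster-a": 2, "cluster-b": 2}`. Greedy most-free placement balances load but ignores locality and cost, which a real scheduler would likely weigh.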

Diving deeper, the Skylet controller manages:
- Cluster Controller, similar to the Kubelet, which monitors a cluster's health and state.
- Job Controller, which monitors the state of a job's replicas submitted to the cluster.
- Flow Controller, which controls the flow of jobs into and out of the cluster (e.g., evicting a job if it has been waiting too long).
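As an illustration of the Flow Controller's eviction rule, here is a hypothetical timeout check: given when each job started waiting on a cluster, it returns the jobs that have exceeded a maximum wait. The function name, the `max_wait_s` threshold, and the input format are assumptions, not Sky Manager's implementation.

```python
import time
from typing import Dict, List, Optional


def jobs_to_evict(pending_since: Dict[str, float], max_wait_s: float,
                  now: Optional[float] = None) -> List[str]:
    """Return (sorted) names of jobs pending longer than max_wait_s seconds."""
    now = time.time() if now is None else now
    return sorted(name for name, since in pending_since.items()
                  if now - since > max_wait_s)
```

A flow controller built this way could hand the returned jobs back to the scheduler so they can be placed on another cluster.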


In [None]:
# Launch controller manager
os.system('python ../launch_sky_manager.py &')

In [None]:
# Sky Manager automatically detects all clusters in your kubeconfig file; the Skylet controller spawns a Skylet subprocess for each valid Kubernetes cluster.
!skym get clusters

## Job Submission Demo
This demo consists of three parts:
- Submitting a simple job: Sky Manager automatically chooses the cluster that executes the job.
- FilterPolicy (if the user has governance constraints): Sky Manager filters for the set of clusters allowed to execute the job.
- Multi-node jobs (i.e., multiple replicas): Sky Manager automatically spreads the job's replicas across clusters.

### Demo 1: Simple Job Submission

In [None]:
# Submit a 1 CPU job to Sky Manager
import time
import uuid

job_uuid = uuid.uuid4().hex[:8] # Get only the first 8 characters for a short version


os.system(f'skym create job sky-{job_uuid} --resources cpu 1 --run "echo Sky!; sleep 10"')

In [None]:
for _ in range(10):
    os.system(f'skym get job sky-{job_uuid}')
    time.sleep(0.5)
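The cell above polls a fixed ten times. A hypothetical alternative is to poll until the job reaches a terminal status or a timeout expires. Status names such as `COMPLETED`/`FAILED` are placeholders, since this notebook doesn't show Sky Manager's job status values, and `fetch_status` would need to extract the status from whatever `JobAPI.get` returns.

```python
import time
from typing import Callable, Collection, Optional


def poll_until(fetch_status: Callable[[], str], terminal: Collection[str],
               timeout_s: float = 60.0, interval_s: float = 0.5) -> Optional[str]:
    """Call fetch_status until it returns a terminal status; None on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            return None  # Timed out; the last status was non-terminal.
        time.sleep(interval_s)
```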

### Demo 2: Filter Policies

Filter policies constrain which clusters a user's jobs can be submitted to.
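A toy evaluation of a filter policy, to show how the `clusterFilter` and `labelsSelector` fields in the example below could combine. It assumes (a) a policy applies when every `labelsSelector` key/value pair appears in the job's labels and (b) the allowed clusters are `include` minus `exclude`; both are inferred from the example policy `remove-mluo-cloud`, and the real semantics may differ. `policy_matches` and `allowed_clusters` are hypothetical helpers.

```python
from typing import Dict, List, Set


def policy_matches(selector: Dict[str, str], labels: Dict[str, str]) -> bool:
    """A policy applies when every selector key/value pair is in the job's labels."""
    return all(labels.get(k) == v for k, v in selector.items())


def allowed_clusters(policy: dict, job_labels: Dict[str, str],
                     all_clusters: List[str]) -> Set[str]:
    spec = policy["spec"]
    if not policy_matches(spec.get("labelsSelector", {}), job_labels):
        # Policy does not apply: every cluster remains a candidate.
        return set(all_clusters)
    cluster_filter = spec.get("clusterFilter", {})
    include = set(cluster_filter.get("include", all_clusters))
    exclude = set(cluster_filter.get("exclude", []))
    # Assumed semantics: exclusions take precedence over inclusions.
    return (include - exclude) & set(all_clusters)
```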

In [None]:
# TODO: Filters on cluster labels (not just cluster name)
filter_policy = {
        'kind': 'FilterPolicy',
        'metadata': {
            'name': 'remove-mluo-cloud',
            'namespace': 'default',
        },
        'spec': {
            'clusterFilter': {
                'include': ['mluo-onprem', 'mluo-cloud', 'cloud-2'],
                'exclude': ['mluo-cloud'],
            },
            'labelsSelector': {
                'my_app': 'testing',
            }
        }
}

from sky_manager.api_client import FilterPolicyAPI

FilterPolicyAPI(namespace='default').create(filter_policy)

In [None]:
from sky_manager.api_client import JobAPI
job_api = JobAPI(namespace='default')


job_uuid = 'sky-' + str(uuid.uuid4().hex[:8]) # Get only the first 8 characters for a short version


job_dict = {
    "kind": "Job",
    "metadata": {
      "name": job_uuid,
      "labels": {
        "my_app": "testing"
      }
    },
    "spec": {
      "image": "gcr.io/sky-burst/skyburst:latest",
      "resources": {
        "cpu": 1,
      },
      "run": "sleep 30"
    } 
}
print(job_api.create(job_dict))

In [None]:
for _ in range(10):
    os.system(f'skym get job {job_uuid}')
    time.sleep(0.5)

### Demo 3: Spreading a job's or deployment's replicas across clusters

In [None]:
from sky_manager.api_client import JobAPI
job_api = JobAPI(namespace='default')


job_uuid = 'sky-' + str(uuid.uuid4().hex[:8]) # Get only the first 8 characters for a short version
num_replicas = 4

job_dict = {
    "kind": "Job",
    "metadata": {
      "name": job_uuid,
    },
    "spec": {
      "replicas": num_replicas,
      "image": "gcr.io/sky-burst/skyburst:latest",
      "resources": {
        "cpu": 1,
      },
      "run": "sleep 30"
    } 
}
print(job_api.create(job_dict))

In [None]:
!kubectl get pods --context mluo-cloud

In [None]:
for _ in range(10):
    os.system(f'skym get job {job_uuid}')
    time.sleep(0.5)

In [None]:
!kubectl get pods --context mluo-cloud

In [None]:
!kubectl get pods --context mluo-onprem

In [None]:
!skym get clusters