diff --git a/src/model-proxy/build/model-proxy.common.dockerfile b/src/model-proxy/build/model-proxy.common.dockerfile new file mode 100644 index 00000000..4dea6ec5 --- /dev/null +++ b/src/model-proxy/build/model-proxy.common.dockerfile @@ -0,0 +1,18 @@ +# Build stage +FROM golang:1.25.0 AS builder +WORKDIR /app + +COPY ./src /app/model-proxy + +RUN cd /app/model-proxy && go mod tidy && \ + CGO_ENABLED=0 GOOS=linux go build -o /app/bin/modelproxy + +# Final stage +FROM ubuntu:latest +WORKDIR /app + +RUN apt-get update + +RUN apt-get upgrade -y + +COPY --from=builder /app/bin/modelproxy /app/bin/modelproxy diff --git a/src/model-proxy/config/model-proxy.yaml b/src/model-proxy/config/model-proxy.yaml new file mode 100644 index 00000000..443e0967 --- /dev/null +++ b/src/model-proxy/config/model-proxy.yaml @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +service_type: "common" + +port: 9999 +retry: 5 +modelkey: "123" \ No newline at end of file diff --git a/src/model-proxy/config/model_proxy.py b/src/model-proxy/config/model_proxy.py new file mode 100644 index 00000000..3549c705 --- /dev/null +++ b/src/model-proxy/config/model_proxy.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import copy + +class ModelProxy(object): + def __init__(self, cluster_conf, service_conf, default_service_conf): + self.cluster_conf = cluster_conf + self.service_conf = service_conf + self.default_service_conf = default_service_conf + + def get_master_ip(self): + for host_conf in self.cluster_conf["machine-list"]: + if "pai-master" in host_conf and host_conf["pai-master"] == "true": + return host_conf["hostip"] + + def validation_pre(self): + return True, None + + def run(self): + result = copy.deepcopy(self.default_service_conf) + result.update(self.service_conf) + result["host"] = self.get_master_ip() + result["url"] = "http://{0}:{1}".format(self.get_master_ip(), result["port"]) + return result + + def validation_post(self, conf): + port = conf["model-proxy"].get("port") + if type(port) != int: + msg = "expect port in model-proxy to be int but get %s with type %s" % \ + (port, type(port)) + return False, msg + return True, None diff --git a/src/model-proxy/deploy/delete.sh b/src/model-proxy/deploy/delete.sh new file mode 100644 index 00000000..fd6a77ef --- /dev/null +++ b/src/model-proxy/deploy/delete.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +pushd $(dirname "$0") > /dev/null + +echo "Call stop script to stop all service first" +/bin/bash stop.sh || exit $? + + +popd > /dev/null \ No newline at end of file diff --git a/src/model-proxy/deploy/model-proxy.yaml.template b/src/model-proxy/deploy/model-proxy.yaml.template new file mode 100644 index 00000000..f54e67ab --- /dev/null +++ b/src/model-proxy/deploy/model-proxy.yaml.template @@ -0,0 +1,58 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: model-proxy-ds +spec: + selector: + matchLabels: + app: model-proxy + template: + metadata: + labels: + app: model-proxy + spec: + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + imagePullSecrets: + - name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }} + containers: + - name: model-proxy + image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}model-proxy:{{ cluster_cfg['cluster']['docker-registry']['tag'] }} + imagePullPolicy: Always + command: ["/app/bin/modelproxy"] + args: + - "--port={{ cluster_cfg['model-proxy']['port'] }}" + - "--retry={{ cluster_cfg['model-proxy']['retry'] }}" + - "--modelkey={{ cluster_cfg['model-proxy']['modelkey'] }}" + - "--logdir=/usr/local/ltp/model-proxy/logs" + volumeMounts: + {%- if cluster_cfg['model-proxy']['log_pvc'] %} + - name: model-proxy-log-storage + mountPath: /usr/local/ltp/model-proxy + {%- else %} + - name: model-proxy-log + mountPath: /usr/local/ltp/model-proxy + {%- endif %} + ports: + - containerPort: {{ cluster_cfg["model-proxy"]["port"] }} + hostPort: {{ cluster_cfg["model-proxy"]["port"] }} + name: model-proxy + livenessProbe: + httpGet: + path: '/healthz' + port: model-proxy + initialDelaySeconds: 10 + periodSeconds: 60 + volumes: + - name: model-proxy-log + hostPath: + path: /var/log/model-proxy + {%- if cluster_cfg['model-proxy']['log_pvc'] %} + - name: model-proxy-log-storage + persistentVolumeClaim: + claimName: {{ cluster_cfg['model-proxy']['log_pvc'] }} + {%- endif %} diff --git a/src/model-proxy/deploy/refresh.sh b/src/model-proxy/deploy/refresh.sh new file mode 100644 index 00000000..bd50ad3d --- /dev/null +++ b/src/model-proxy/deploy/refresh.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +pushd $(dirname "$0") > /dev/null + +bash stop.sh +bash start.sh + +popd > /dev/null diff --git a/src/model-proxy/deploy/service.yaml b/src/model-proxy/deploy/service.yaml new file mode 100644 index 00000000..a2fe7ea9 --- /dev/null +++ b/src/model-proxy/deploy/service.yaml @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +cluster-type: + - yarn + - k8s + +prerequisite: + - rest-server + +template-list: + - model-proxy.yaml + +start-script: start.sh +stop-script: stop.sh +delete-script: delete.sh +refresh-script: refresh.sh + + +deploy-rules: + - in: pai-master diff --git a/src/model-proxy/deploy/start.sh b/src/model-proxy/deploy/start.sh new file mode 100644 index 00000000..27720c02 --- /dev/null +++ b/src/model-proxy/deploy/start.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +pushd $(dirname "$0") > /dev/null + +kubectl apply --overwrite=true -f model-proxy.yaml || exit $? + +# Wait until the service is ready. +PYTHONPATH="../../../deployment" python -m k8sPaiLibrary.monitorTool.check_pod_ready_status -w -k app -v model-proxy || exit $? + +popd > /dev/null diff --git a/src/model-proxy/deploy/stop.sh b/src/model-proxy/deploy/stop.sh new file mode 100644 index 00000000..9a19435e --- /dev/null +++ b/src/model-proxy/deploy/stop.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +kubectl delete --ignore-not-found --now "daemonset/model-proxy-ds" diff --git a/src/model-proxy/src/.gitignore b/src/model-proxy/src/.gitignore new file mode 100644 index 00000000..23b58a97 --- /dev/null +++ b/src/model-proxy/src/.gitignore @@ -0,0 +1,20 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +bin +Traces + +venv \ No newline at end of file diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md new file mode 100644 index 00000000..36ca0387 --- /dev/null +++ b/src/model-proxy/src/README.md @@ -0,0 +1,41 @@ +# Instruction of model-proxy + +## Overview + +Model-proxy is a proxy service to forward requests from clients to different model jobs in LTP cluster. With one base url, client can access different models by specifying different model name in the request path, `model-proxy` service will forward the request to corresponding model job. If there are multiple jobs which are serving the same model, `model-proxy` will do load balancing among these jobs. + +Workflow: + +1. Client sends request to `model-proxy` service to list all models by `/v1/models` endpoint. + - During the list request, `model-proxy` will query LTP REST server to get all model serving jobs which the user can access, and then list all models which are being served by these jobs. + +2. Client sends request to `model-proxy` service to access a specific model by openai sepc api request format, e.g. `POST /v1/chat/completions` with request body containing model name. + - During the access request, `model-proxy` will query LTP REST server to get all model serving jobs which are serving the requested model, and then forward the request to one of these jobs. If there are multiple jobs, `model-proxy` will do load balancing among these jobs. + +## Configuration + +### Requirements + +- LTP model serving jobs should be deployed in the LTP cluster, and names of these jobs must include `model-serving`. + +- LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. + * The endpoints should use the first taskrole's ip and port. + * The api key which is configured in model-proxy service should be supported by these endpoints. So if users want to make their model serving jobs accessible by model-proxy, they need to configure the same api key in their jobs, which will be provided by the cluster admin. + +### Binary configuration + +Model-proxy binary can be configured by flags: + +- `--port`: the port that model-proxy service listens on, default is 9999 +- `--retry`: the retry times when forwarding request to model job, default is 5 +- `--logdir`: the directory to store log files, default is `./logs` +- `--modelkey`: the key which is used to request model serving jobs in the LTP cluster. + +### Service configuration + +```yaml +model-proxy: + port: 9999 + retry: 5 + modelkey: "ABCD1234" # the api key to access model serving jobs +``` diff --git a/src/model-proxy/src/go.mod b/src/model-proxy/src/go.mod new file mode 100644 index 00000000..41b67d82 --- /dev/null +++ b/src/model-proxy/src/go.mod @@ -0,0 +1,3 @@ +module modelproxy + +go 1.25 diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go new file mode 100644 index 00000000..9a921296 --- /dev/null +++ b/src/model-proxy/src/main.go @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package main + +import ( + "flag" + + "modelproxy/proxy" + "modelproxy/trace" + "modelproxy/types" +) + +var ( + port int + maxRetries int = 5 // default value + logFileDir string + modelKey string +) + +func init() { + flag.IntVar(&port, "port", 9999, "port for the proxy server") + flag.IntVar(&maxRetries, "retry", 5, "max retries for the request to the model server") + flag.StringVar(&logFileDir, "logdir", "./logs", "path to the log file directory") + flag.StringVar(&modelKey, "modelkey", "", "model key for requesting model serving jobs") +} + +func main() { + flag.Parse() + + config := types.Config{ + Server: &types.Server{ + Host: "0.0.0.0", + Port: port, + MaxRetries: maxRetries, + ModelKey: modelKey, + }, + Log: &types.Log{ + LogStorage: &types.LogStorage{ + LocalFolder: logFileDir, + AzureStorage: nil, + }, + TraceRelatedKeys: []string{}, + }, + } + ph := proxy.NewProxyHandler(&config) + traceLogger := trace.NewJsonFileLogger(logFileDir) + + ph.StartProxy(traceLogger) + +} diff --git a/src/model-proxy/src/proxy/authenticator.go b/src/model-proxy/src/proxy/authenticator.go new file mode 100644 index 00000000..b57b1006 --- /dev/null +++ b/src/model-proxy/src/proxy/authenticator.go @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package proxy + +import ( + "log" + "net/http" + "strings" +) + +// obfuscateToken returns a truncated identifier for safely logging tokens. +func obfuscateToken(token string) string { + if len(token) <= 6 { + return "" + } + return token[:3] + "***" + token[len(token)-3:] +} + +type RestServerAuthenticator struct { + // rest-server token => model names => model urls + tokenToModels map[string]map[string][]string + modelKey string +} + +func NewRestServerAuthenticator(tokenToModels map[string]map[string][]string, modelKey string) *RestServerAuthenticator { + if tokenToModels == nil { + tokenToModels = make(map[string]map[string][]string) + } + return &RestServerAuthenticator{ + tokenToModels: tokenToModels, + modelKey: modelKey, + } +} + +func (ra *RestServerAuthenticator) UpdateTokenModels(token string, model2Url map[string][]string) { + if ra.tokenToModels == nil { + ra.tokenToModels = make(map[string]map[string][]string) + } + ra.tokenToModels[token] = model2Url +} + +// Check if the request is authenticated and return the available model urls +func (ra *RestServerAuthenticator) AuthenticateReq(req *http.Request, reqBody map[string]interface{}) (bool, []string) { + token := req.Header.Get("Authorization") + token = strings.Replace(token, "Bearer ", "", 1) + // read request body + model, ok := reqBody["model"].(string) + if !ok { + log.Printf("[-] Error: 'model' field missing or not a string in request body") + return false, []string{} + } + availableModels, ok := ra.tokenToModels[token] + if !ok { + // request to RestServer to get the models + log.Printf("[-] Error: token %s not found in the authenticator\n", obfuscateToken(token)) + availableModels, err := GetJobModelsMapping(req, ra.modelKey) + if err != nil { + log.Printf("[-] Error: failed to get models for token %s: %v\n", obfuscateToken(token), err) + return false, []string{} + } + ra.tokenToModels[token] = availableModels + } + if len(availableModels) == 0 { + log.Printf("[-] Error: no models found") + return false, []string{} + } + if model == "" { + log.Printf("[-] Error: model is empty") + return false, []string{} + } + for m, v := range availableModels { + if m == model { + return true, v + } + } + return false, []string{} +} diff --git a/src/model-proxy/src/proxy/load_balancer.go b/src/model-proxy/src/proxy/load_balancer.go new file mode 100644 index 00000000..789686d9 --- /dev/null +++ b/src/model-proxy/src/proxy/load_balancer.go @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package proxy + +import ( + "math/rand" + "net/url" + + "modelproxy/types" +) + +// UrlPoller can generate the destination url by polling the provided base url list +type UrlPoller struct { + OriUrl string + BSL types.BaseSpecList + Seed int +} + +func NewUrlPoller(url string, bsl types.BaseSpecList) *UrlPoller { + if len(bsl) == 0 { + return nil + } + return &UrlPoller{ + OriUrl: url, + BSL: bsl, + Seed: rand.Intn(len(bsl)), + } +} + +func NewUrlPollerWithKey(url string, modelUrls []string, modelKey string) *UrlPoller { + if len(modelUrls) == 0 { + return nil + } + bsl := make(types.BaseSpecList, 0, len(modelUrls)) + for _, v := range modelUrls { + bsl = append(bsl, &types.BaseSpecStatistic{ + BaseSpec: &types.BaseSpec{ + URL: v, + Key: modelKey, + }, + }) + } + return NewUrlPoller(url, bsl) +} + +// GetUrlAndKey will return the new url and the key of the base spec +func (ug *UrlPoller) GetUrlAndKey() (*url.URL, string) { + baseSpec := ug.BSL[ug.Seed%len(ug.BSL)] + newUrl := ReplaceBaseURL(ug.OriUrl, baseSpec.BaseSpec) + ug.Seed += 1 + return newUrl, baseSpec.Key +} diff --git a/src/model-proxy/src/proxy/model_server.go b/src/model-proxy/src/proxy/model_server.go new file mode 100644 index 00000000..a2ff5027 --- /dev/null +++ b/src/model-proxy/src/proxy/model_server.go @@ -0,0 +1,268 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package proxy + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" +) + +// target job tag to identify model serving jobs +const TARGET_JOB_TAG = "model-serving" + +// REST server and Job server path segments in the URL +const REST_SERVER_PATH = "rest-server" +const JOB_SERVER_PATH = "job-server" + +var httpClient = &http.Client{Timeout: 120 * time.Second} + +// ListModelServingJobs returns a list of model serving jobs with the given request +func ListModelServingJobs(restServerUrl string, restServerToken string) ([]string, error) { + url := fmt.Sprintf("%s/api/v2/jobs?state=RUNNING", restServerUrl) + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + restServerToken = strings.TrimPrefix(restServerToken, "Bearer ") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", restServerToken)) + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to GET jobs from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read jobs response: %w", err) + } + + // Expected: array of jobs with fields { name, username } + var jobs []struct { + Name string `json:"name"` + Username string `json:"username"` + } + if err := json.Unmarshal(body, &jobs); err != nil { + // Try to provide a helpful error if the shape is unexpected + return nil, fmt.Errorf("failed to parse jobs JSON: %w", err) + } + + result := make([]string, 0, len(jobs)) + for _, j := range jobs { + if j.Name == "" { + continue + } + if strings.Contains(j.Name, TARGET_JOB_TAG) { + // Use the same identifier as TS: username~name + jobId := fmt.Sprintf("%s~%s", j.Username, j.Name) + result = append(result, jobId) + } + } + + return result, nil +} + +// return the job server url +func GetJobServerUrl(restServerUrl string, restServerToken string, jobId string) (string, error) { + if restServerUrl == "" { + return "", fmt.Errorf("empty restServerUrl") + } + if jobId == "" { + return "", fmt.Errorf("empty jobId") + } + + // Ensure restServerUrl doesn't end with slash + restServerUrl = strings.TrimRight(restServerUrl, "/") + url := fmt.Sprintf("%s/api/v2/jobs/%s", restServerUrl, jobId) + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + restServerToken = strings.TrimPrefix(restServerToken, "Bearer ") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", restServerToken)) + + resp, err := httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed to GET job details from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read job details response: %w", err) + } + + // Parse expected job details: + // { taskRoles: { roleName: { taskStatuses: [ { containerIp: "...", containerPorts: { "http": "port" } } ] } } } + var details struct { + TaskRoles map[string]struct { + TaskStatuses []struct { + ContainerIp string `json:"containerIp"` + ContainerPorts map[string]string `json:"containerPorts"` + } `json:"taskStatuses"` + } `json:"taskRoles"` + } + if err := json.Unmarshal(body, &details); err != nil { + return "", fmt.Errorf("failed to parse job details JSON: %w", err) + } + + if len(details.TaskRoles) == 0 { + return "", fmt.Errorf("no taskRoles found for job %s", jobId) + } + + jobServerPath := strings.Replace(restServerUrl, REST_SERVER_PATH, JOB_SERVER_PATH, 1) + // Pick first role, first taskStatus + for _, role := range details.TaskRoles { + if len(role.TaskStatuses) == 0 { + continue + } + ts := role.TaskStatuses[0] + if ts.ContainerIp == "" { + return "", fmt.Errorf("no containerIp found for job %s", jobId) + } + // prefer http port + port, ok := ts.ContainerPorts["http"] + if !ok || port == "" { + return "", fmt.Errorf("no http port found for job %s", jobId) + } + jobServerUrl := fmt.Sprintf("%s/%s:%s", jobServerPath, ts.ContainerIp, port) + return jobServerUrl, nil + } + + return "", fmt.Errorf("no taskStatuses found for job %s", jobId) +} + +// return model names list +func listModels(jobServerUrl string, token string) ([]string, error) { + if jobServerUrl == "" { + return nil, fmt.Errorf("empty jobServerUrl") + } + // ensure no trailing slash + jobServerUrl = strings.TrimRight(jobServerUrl, "/") + url := fmt.Sprintf("%s/v1/models", jobServerUrl) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request for %s: %w", url, err) + } + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to GET models from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read models response: %w", err) + } + + // Try the expected shape: { data: [{ id: "model1" }, ...] } + var wrapper struct { + Data []struct { + ID string `json:"id"` + } `json:"data"` + } + if err := json.Unmarshal(body, &wrapper); err == nil && len(wrapper.Data) > 0 { + result := make([]string, 0, len(wrapper.Data)) + for _, m := range wrapper.Data { + if m.ID != "" { + result = append(result, m.ID) + } + } + return result, nil + } + + // Fallback: maybe the endpoint returns an array of objects or strings + var arr []map[string]interface{} + if err := json.Unmarshal(body, &arr); err == nil && len(arr) > 0 { + result := []string{} + for _, item := range arr { + if idv, ok := item["id"].(string); ok && idv != "" { + result = append(result, idv) + continue + } + if namev, ok := item["name"].(string); ok && namev != "" { + result = append(result, namev) + continue + } + } + if len(result) > 0 { + return result, nil + } + } + + // Fallback: maybe it's an array of strings + var strArr []string + if err := json.Unmarshal(body, &strArr); err == nil && len(strArr) > 0 { + return strArr, nil + } + + // No models found + return nil, fmt.Errorf("no models found at %s", url) +} + +// return JobURL => models +func GetJobModelsMapping(req *http.Request, modelToken string) (map[string][]string, error) { + mapping := make(map[string][]string) + + if req == nil || req.Host == "" { + return mapping, fmt.Errorf("invalid request or empty host") + } + restBase := fmt.Sprintf("https://%s/rest-server", req.Host) + + restServerToken := req.Header.Get("Authorization") + jobIDs, err := ListModelServingJobs(restBase, restServerToken) + if err != nil { + return mapping, fmt.Errorf("failed to list model serving jobs: %w", err) + } + + for _, jobId := range jobIDs { + jobServerUrl, err := GetJobServerUrl(restBase, restServerToken, jobId) + if err != nil { + // skip this job but continue with others + log.Printf("[-] Error: failed to get job server URL for job %s: %s\n", jobId, err) + continue + } + // + models, err := listModels(jobServerUrl, modelToken) + if err != nil { + // skip if cannot list models + log.Printf("[-] Error: failed to list models for job %s: %s\n", jobId, err) + continue + } + for _, model := range models { + // map model name -> [jobServerUrl] + if _, ok := mapping[model]; !ok { + mapping[model] = []string{} + } + mapping[model] = append(mapping[model], jobServerUrl) + } + } + + return mapping, nil +} diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go new file mode 100644 index 00000000..b1f50164 --- /dev/null +++ b/src/model-proxy/src/proxy/proxy.go @@ -0,0 +1,259 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package proxy + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "log" + "net/http" + "net/http/httputil" + "sort" + "strconv" + "strings" + "time" + + "modelproxy/trace" + "modelproxy/types" +) + +// hookWriter is a wrapper of http.ResponseWriter, which can get the response body and status code, and does not dis +type hookWriter struct { + http.ResponseWriter + status int + body []string +} + +// Implement the Write method of http.ResponseWriter. It can record the response body +func (w *hookWriter) Write(data []byte) (int, error) { + w.body = append(w.body, string(data)) + // Disable nginx buffering (do this before first write) + if len(w.body) == 1 { // First write + w.Header().Set("X-Accel-Buffering", "no") + } + + n, err := w.ResponseWriter.Write(data) + // Flush the response writer immediately if it implements http.Flusher + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } + return n, err +} + +// Implement the WriteHeader method of http.ResponseWriter. It can record the status code +func (w *hookWriter) WriteHeader(statusCode int) { + w.status = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +// ProxyHandler is the key struct for proxy server +type ProxyHandler struct { + authenticator *RestServerAuthenticator + port int + maxRetries int + // traceRelatedKeys is the keys that will be logged in trace, but will be filtered in the api request + traceRelatedKeys map[string]struct{} +} + +// NewProxyHandler create a new ProxyHandler according to the config +func NewProxyHandler(config *types.Config) *ProxyHandler { + traceRelatedKeys := make(map[string]struct{}) + for _, key := range config.Log.TraceRelatedKeys { + traceRelatedKeys[key] = struct{}{} + } + + return &ProxyHandler{ + authenticator: NewRestServerAuthenticator(nil, config.Server.ModelKey), + port: config.Server.Port, + maxRetries: config.Server.MaxRetries, + traceRelatedKeys: traceRelatedKeys, + } +} + +// ReverseProxyHandler act as a reverse proxy, it will redirect the request to the destination website and return the response +func (ph *ProxyHandler) ReverseProxyHandler(w http.ResponseWriter, r *http.Request) (string, []string, bool) { + + // handle /healthz + if r.URL.Path == "/healthz" { + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("ok")); err != nil { + log.Printf("[-] Error: failed to write healthz response: %v\n", err) + } + return "", nil, false + } + + // handle /v1/models + if r.URL.Path == "/v1/models" { + model2Url, err := GetJobModelsMapping(r, ph.authenticator.modelKey) + if err != nil { + log.Printf("[-] Error: failed to get models mapping: %v\n", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return "", nil, false + } + // Update the ph.authenticator + token := r.Header.Get("Authorization") + token = strings.Replace(token, "Bearer ", "", 1) + ph.authenticator.UpdateTokenModels(token, model2Url) + + // convert models list to OpenAI style list and write it to w + ids := make([]string, 0, len(model2Url)) + for id := range model2Url { + ids = append(ids, id) + } + sort.Strings(ids) + + list := map[string]interface{}{ + "object": "list", + "data": make([]map[string]interface{}, 0, len(ids)), + } + for _, id := range ids { + item := map[string]interface{}{ + "id": id, + "object": "model", + // intentionally not including created or owned_by + } + list["data"] = append(list["data"].([]map[string]interface{}), item) + } + + out, err := json.Marshal(list) + if err != nil { + log.Printf("[-] Error: failed to marshal models list: %v\n", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return "", nil, false + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if _, err := w.Write(out); err != nil { + log.Printf("[-] Error: failed to write response: %v\n", err) + } + // We've handled the response, do not continue proxying this request + return "", nil, false + } + log.Printf("[*] receive a request from %s\n", r.RemoteAddr) + + // filter the request body and check whether the request should be traced + rawReqBody, newReqBody, data, shouldTraced := ph.requestBodyFilter(r) + + // check the key of the request + ok, modelUrls := ph.authenticator.AuthenticateReq(r, data) + + if !ok { + log.Printf("[-] Error: unauthorized request from %s\n", r.RemoteAddr) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return "", nil, false + } + + // get the url poller to generate and poll the destination url + urlPoller := NewUrlPollerWithKey(r.URL.String(), modelUrls, ph.authenticator.modelKey) + if urlPoller == nil { + log.Printf("[-] Error: cannot get the url poller: \n\trawReqBody: %s\n\tURL: %s\n", rawReqBody, r.URL.String()) + proxy := &httputil.ReverseProxy{Director: func(req *http.Request) {}} + proxy.ServeHTTP(w, r) + return rawReqBody, nil, false + } + + responseWriter := &hookWriter{ResponseWriter: w, body: make([]string, 0, 1)} + director := func(req *http.Request) { + // get the new destination url and the related key + newUrl, curkey := urlPoller.GetUrlAndKey() + log.Printf("[*] redirect to %s\n", newUrl.String()) + + if newUrl == nil { + log.Println("[-] Error: cannot get the destinaton url") + return + } + req.Header.Set("Content-Length", strconv.Itoa(len(newReqBody))) + // key setting for openai spec endpoints + req.Header.Set("Authorization", "Bearer "+curkey) + // key setting for azure spec endpoints + req.Header.Set("Api-key", curkey) + + req.Host = newUrl.Host + req.URL = newUrl + + // the request body has been read out, so we need to reset the request body + req.Body = io.NopCloser(bytes.NewBufferString(newReqBody)) + req.ContentLength = int64(len(newReqBody)) + } + + // create the reverse proxy + proxy := &httputil.ReverseProxy{Director: director} + retries := 0 + modifyResponse := func(resp *http.Response) error { + // Handle error response here + // If the response is an error response, we will retry the request to another destination url + if resp.StatusCode >= 400 && retries < ph.maxRetries { + log.Printf("[-] receive error response %s\n", resp.Status) + retries++ + time.Sleep(100 * time.Millisecond) + // clear the response body and status code in the responseWriter + responseWriter.body = make([]string, 0, 1) + proxy.ServeHTTP(responseWriter, r) + return errors.New("retry") + } else { + return nil + } + + } + proxy.ModifyResponse = modifyResponse + proxy.ErrorHandler = func(http.ResponseWriter, *http.Request, error) {} + proxy.ServeHTTP(responseWriter, r) + + log.Printf("[*] receive the destination website response\n") + + return rawReqBody, responseWriter.body, shouldTraced +} + +// requestBodyFilter filter the request body, return the original request body, the filtered request body and whether the request should be traced (only trace chat request) +func (ph *ProxyHandler) requestBodyFilter(r *http.Request) (string, string, map[string]interface{}, bool) { + reqBody, _ := io.ReadAll(r.Body) + // check whether the request is a completion request + if !strings.Contains(r.URL.Path, "completions") { + log.Printf("[-] Request from %s is not a completion request\n", r.URL.Path) + return string(reqBody), string(reqBody), nil, false + } + + var data map[string]interface{} + + // parse request body + if err := json.Unmarshal(reqBody, &data); err != nil { + log.Printf("[-] Filter Error: %s\n", err) + return string(reqBody), string(reqBody), nil, false + } + for k := range data { + if _, ok := ph.traceRelatedKeys[k]; ok { + delete(data, k) + } + } + filtedBody, _ := json.Marshal(data) + + // check whether the request is a chat request + _, ok := data["messages"] + return string(reqBody), string(filtedBody), data, ok +} + +// ProxyHandlerWithLogger return a http.HandlerFunc, which can be used to start the proxy server and call the logger.Record method +func (ph *ProxyHandler) ProxyHandlerWithLogger(logger trace.TraceLogger) http.HandlerFunc { + res := func(w http.ResponseWriter, r *http.Request) { + log.Println("-----------------------------------------------") + req, resp, shouldTraced := ph.ReverseProxyHandler(w, r) + if shouldTraced { + log.Printf("[*] Debug: response: %s\n", resp) + logger.Record(req, resp) + } + } + return res +} + +// StartProxy start the proxy server +func (ph *ProxyHandler) StartProxy(logger trace.TraceLogger) { + log.Printf("[*] Start proxy at port %d\n", ph.port) + handler := ph.ProxyHandlerWithLogger(logger) + if err := http.ListenAndServe(":"+strconv.Itoa(ph.port), handler); err != nil { + log.Fatal("[*] Server error: " + err.Error()) + } +} diff --git a/src/model-proxy/src/proxy/utils.go b/src/model-proxy/src/proxy/utils.go new file mode 100644 index 00000000..a22d8f33 --- /dev/null +++ b/src/model-proxy/src/proxy/utils.go @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package proxy + +import ( + "log" + "net/http" + "net/url" + "path" + "strings" + + "modelproxy/types" +) + +// parse the RESTFul-style url and get the value of the given key +// e.g. .../deployments/abc/predictions -> GetArgsFromRestfulUrl(url, "deployments") -> abc +func GetArgsFromRestfulUrl(url, key string) string { + splitURL := strings.Split(url, "/") + for i, part := range splitURL { + if part == key && i+1 < len(splitURL) { + return splitURL[i+1] + } + } + return "" +} + +// ReplaceBaseURL will replace the base url of the original url with the new base url +// e.g. original url: http://localhost:8999/openai/deployments/gpt-4-32k/chat/completions?api-version=placeholder +// new base url: https://XXX.openai.azure.com/openai/deployments/gpt-4-32k/chat/completions?api-version=2023-08-01-preview +func ReplaceBaseURL(originalURL string, baseSpec *types.BaseSpec) *url.URL { + parsedURL, err := url.Parse(originalURL) + if err != nil { + log.Printf("Failed to parse original URL: %v", err) + return nil + } + + newURL, err := url.Parse(baseSpec.URL) + if err != nil { + log.Printf("Failed to parse new base URL: %v", err) + return nil + } + log.Printf("****************************************** %s\n", parsedURL.Path) + + newURL.Path = path.Join(newURL.Path, parsedURL.Path) + log.Printf("****************************************** %s\n", newURL.Path) + //replace the api-version + queryParams := newURL.Query() + if baseSpec.Version != "" { + queryParams.Set("api-version", baseSpec.Version) + // update into newURL + newURL.RawQuery = queryParams.Encode() + } + return newURL +} + +func GetKeyFromRequest(request *http.Request) string { + azureKey := request.Header.Get("Authorization") + azureKey = strings.Replace(azureKey, "Bearer ", "", 1) + if azureKey != "" { + return azureKey + } + openaiKey := request.Header.Get("Api-key") + openaiKey = strings.Replace(openaiKey, "Bearer ", "", 1) + return openaiKey +} diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go new file mode 100644 index 00000000..458dff9e --- /dev/null +++ b/src/model-proxy/src/trace/trace.go @@ -0,0 +1,137 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package trace + +import ( + "bytes" + "log" + "os" + "path" + "strings" + "sync" + "time" + + "modelproxy/types" +) + +const maxNumChoice = 10 // Reasonable upper limit for number of choices per request. Adjust as needed. + +// TraceLogger is the interface for trace logger +type TraceLogger interface { + // Record the trace + Record(req string, resp []string) +} + +// JsonFileLogger is a logger that log the trace into json files, it implements TraceLogger interface +// The log file is named by date, and the log file will be uploaded to Azure Blob Storage everyday +type JsonFileLogger struct { + localFolderPath string + currentDay string + lock sync.Mutex +} + +// NewJsonFileLogger create a new JsonFileLogger +func NewJsonFileLogger(folderPath string) *JsonFileLogger { + if _, err := os.Stat(folderPath); os.IsNotExist(err) { + if err := os.Mkdir(folderPath, os.ModePerm); err != nil { + log.Printf("Failed to create directory %s: %v", folderPath, err) + } + } + date := time.Now().Format("2006-01-02") + return &JsonFileLogger{localFolderPath: folderPath, currentDay: date} +} + +func (j *JsonFileLogger) Record(req string, resp []string) { + go j.record(req, resp) +} + +// record the trace +func (j *JsonFileLogger) record(req string, resp []string) { + if req == "" || len(resp) == 0 { + return + } + + reqStruct := &types.Request{} + if err := reqStruct.Unmarshal([]byte(req)); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + + numChoice := reqStruct.Choices + if numChoice <= 0 { + numChoice = 1 + } + if numChoice > maxNumChoice { + numChoice = maxNumChoice + } + + respStr := make([]string, numChoice) + modelName := "" + if len(resp) > 1 { + // If len(resp) > 1, the resp is a 'stream' + respConcat := make([]bytes.Buffer, numChoice) + for _, r := range resp { + lines := strings.Split(r, "\n") + for _, line := range lines { + if strings.ReplaceAll(line, " ", "") == "" { + continue + } + line = strings.TrimPrefix(line, "data: ") + + if line == "[DONE]" { + continue + } + responseChunk := types.ResponseChunk{} + if err := responseChunk.Unmarshal([]byte(line)); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + if modelName == "" { + modelName = responseChunk.Model + } + for choice := range responseChunk.Choices { + respConcat[choice].WriteString(responseChunk.Choices[choice].Delta.Content) + } + } + } + for i := range respConcat { + respStr[i] = respConcat[i].String() + } + } else { + // If len(resp) == 1, the resp is not a 'stream' + response := types.Response{} + if err := response.Unmarshal([]byte(resp[0])); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + modelName = response.Model + for choice := range response.Choices { + respStr[choice] = response.Choices[choice].Message.Content + } + } + if modelName != "" { + reqStruct.Model = modelName + } + trace := types.ConvertReqResp2Trace(reqStruct, respStr) + traceStr, err := trace.Marshal() + if err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + + j.lock.Lock() + defer j.lock.Unlock() + + date := time.Now().Format("2006-01-02") + j.currentDay = date + + filePath := path.Join(j.localFolderPath, date+".jsonl") + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + panic(err) + } + defer f.Close() + // append the trace to the file + f.WriteString(string(traceStr) + "\n") +} diff --git a/src/model-proxy/src/types/config_types.go b/src/model-proxy/src/types/config_types.go new file mode 100644 index 00000000..cb05f5e5 --- /dev/null +++ b/src/model-proxy/src/types/config_types.go @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "encoding/json" + "os" +) + +// Config is the struct for config.json +type Config struct { + Server *Server `json:"server"` + Log *Log `json:"log"` +} + +// Server is the struct for proxy server config +type Server struct { + // Host is the host of the server + Host string `json:"host"` + // Port is the port of the server + Port int `json:"port"` + // MaxRetries is the max retries for the request + MaxRetries int `json:"retry"` + // ModelKey is the model key for the proxy server to access the model server + ModelKey string `json:"model_key"` +} + +// Log is the config for log +type Log struct { + // LogStorage is the log storage config + LogStorage *LogStorage `json:"storage"` + // TraceRelatedKeys is the keys that will be logged in trace, + // which will be used to identify the trace and filtered in the api request + TraceRelatedKeys []string `json:"trace_related_keys"` +} + +// LogStorage is the config for log storage +type LogStorage struct { + LocalFolder string `json:"local_folder,omitempty"` + AzureStorage *AzureStorage `json:"azure_storage,omitempty"` +} + +type AzureStorage struct { + // URL is the url of the azure blob storage, including the sas token + URL string `json:"url"` + Container string `json:"container"` + Path string `json:"path"` +} + +// BaseSpec is the base spec for azure and openai +type BaseSpec struct { + URL string `json:"url"` + Key string `json:"key"` + Version string `json:"version,omitempty"` +} + +// ParseConfig parse the config file into Config struct +func ParseConfig(path string) (*Config, error) { + file, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var config Config + err = json.Unmarshal(file, &config) + if err != nil { + return nil, err + } + + return &config, nil +} + +// BaseSpecStatistic is the struct for statistic +type BaseSpecStatistic struct { + *BaseSpec + ValidRequestCount int + SuccessCount int + SuccessRate float64 +} + +type BaseSpecList []*BaseSpecStatistic + +// deploy_name -> BaseSpecList +type ModelToBase map[string]BaseSpecList + +// Two reverse maps for chat and embedding +type ReverseMap struct { + Chat ModelToBase + Embedding ModelToBase +} + +// two ReverseMap for Azure and OpenAI +type AllReverseMap struct { + Azure *ReverseMap + OpenAI *ReverseMap +} diff --git a/src/model-proxy/src/types/request_response_types.go b/src/model-proxy/src/types/request_response_types.go new file mode 100644 index 00000000..3ad970a8 --- /dev/null +++ b/src/model-proxy/src/types/request_response_types.go @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" +) + +type Message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type Source struct { + SourceName string `json:"source_name"` + Location string `json:"location,omitempty"` +} + +// Request is the struct for request of openai chat. It is used to parse the request body +type Request struct { + Messages []Message `json:"messages"` + Model string `json:"model"` + Temperature float64 `json:"temperature,omitempty"` + Choices int `json:"n,omitempty"` + PresencePenalty float64 `json:"presence_penalty,omitempty"` + TopP float64 `json:"top_p,omitempty"` + FrequencyPenalty float64 `json:"frequency_penalty,omitempty"` + Stream bool `json:"stream,omitempty"` + Stop string `json:"stop,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + LogitBias map[string]float64 `json:"logit_bias,omitempty"` + User string `json:"user,omitempty"` + + Source *Source `json:"source,omitempty"` + CategoryInfo *map[string]string `json:"category_info,omitempty"` + Label *[]string `json:"labels,omitempty"` + OtherMetadata string `json:"other_metadata,omitempty"` +} + +func (r *Request) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} + +// Response is the struct for response of openai chat +type Response struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + FinishReason string `json:"finish_reason"` + Message Message `json:"message"` + } `json:"choices"` + Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` + } `json:"usage"` +} + +func (r *Response) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} + +// ResponseChunk is the struct for response of openai chat stream +type ResponseChunk struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + FinishReason string `json:"finish_reason"` + Delta struct { + Content string `json:"content"` + } `json:"delta"` + } `json:"choices"` +} + +func (rc *ResponseChunk) Unmarshal(data []byte) error { + return json.Unmarshal(data, rc) +} + +// TraceMessage is the struct for each message in a trace +type TraceMessage struct { + Role string `json:"from"` + Content string `json:"value"` + Score float64 `json:"score"` + ParentID int `json:"parent_id"` + ID int `json:"id"` +} + +type ModelInfo struct { + ModelName string `json:"model_name"` + Temperature float64 `json:"temperature,omitempty"` + Choices int `json:"n,omitempty"` + PresencePenalty float64 `json:"presence_penalty,omitempty"` + TopP float64 `json:"top_p,omitempty"` + FrequencyPenalty float64 `json:"frequency_penalty,omitempty"` + Stream bool `json:"stream,omitempty"` + Stop string `json:"stop,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + LogitBias map[string]float64 `json:"logit_bias,omitempty"` + User string `json:"user,omitempty"` +} + +// Trace is the struct for one trace. It is recorded in the json file +type Trace struct { + ID string `json:"id"` + Source *Source `json:"source,omitempty"` + CategoryInfo *map[string]string `json:"category_info,omitempty"` + Other_metadata string `json:"other_metadata,omitempty"` + ModelInfo *ModelInfo `json:"model_info"` + Conversations []*TraceMessage `json:"conversation"` +} + +func (t *Trace) Marshal() ([]byte, error) { + return json.Marshal(t) +} + +// convert a request and response to a trace +func ConvertReqResp2Trace(req *Request, response []string) *Trace { + conversations := make([]*TraceMessage, 0, len(req.Messages)+len(response)) + for i, msg := range req.Messages { + conversations = append(conversations, &TraceMessage{ + Role: msg.Role, + Content: msg.Content, + Score: -1, + ParentID: i - 1, + ID: i, + }) + } + for _, resp := range response { + conversations = append(conversations, &TraceMessage{ + Role: "assistant", + Content: resp, + Score: -1, + ParentID: len(req.Messages) - 1, + ID: len(conversations), + }) + } + + id, _ := generateID() + + trace := &Trace{ + ID: id, + Source: req.Source, + CategoryInfo: req.CategoryInfo, + Other_metadata: req.OtherMetadata, + ModelInfo: &ModelInfo{ + ModelName: req.Model, + Temperature: req.Temperature, + Choices: req.Choices, + PresencePenalty: req.PresencePenalty, + TopP: req.TopP, + FrequencyPenalty: req.FrequencyPenalty, + Stream: req.Stream, + Stop: req.Stop, + MaxTokens: req.MaxTokens, + LogitBias: req.LogitBias, + User: req.User, + }, + Conversations: conversations, + } + + return trace +} + +// generate a 16bytes id +func generateID() (string, error) { + buf := make([]byte, 16) + _, err := rand.Read(buf) + if err != nil { + return "", err + } + res := hex.EncodeToString(buf) + return res, nil +} diff --git a/src/pylon/deploy/pylon-config/location.conf.template b/src/pylon/deploy/pylon-config/location.conf.template index ad9cb86b..02b9e940 100644 --- a/src/pylon/deploy/pylon-config/location.conf.template +++ b/src/pylon/deploy/pylon-config/location.conf.template @@ -63,6 +63,14 @@ location ~ ^/copilot/api/operation(.*)$ { proxy_send_timeout 2m; } +# Model proxy backend +location ~ ^/model-proxy/(.*)$ { + proxy_pass {{MODEL_PROXY_URI}}/$1$is_args$args; + proxy_connect_timeout 60m; + proxy_read_timeout 60m; + proxy_send_timeout 60m; +} + # # Other web portals # diff --git a/src/pylon/deploy/pylon.yaml.template b/src/pylon/deploy/pylon.yaml.template index bccf392d..75b561cb 100644 --- a/src/pylon/deploy/pylon.yaml.template +++ b/src/pylon/deploy/pylon.yaml.template @@ -66,7 +66,11 @@ spec: {% if "copilot-chat" in cluster_cfg %} - name: COPILOT_CHAT_URI value: {{ cluster_cfg['copilot-chat']['url'] }} -{% endif %} +{% endif %} +{% if "model-proxy" in cluster_cfg %} + - name: MODEL_PROXY_URI + value: {{ cluster_cfg['model-proxy']['url'] }} +{% endif %} - name: K8S_DASHBOARD_URI value: {{ cluster_cfg['layout']['kubernetes']['dashboard-url'] }} {% if cluster_cfg["cluster"]["common"]["cluster-type"] == "yarn" %}