From 8d5206100445481b9ba605e9fd6387d8b367eadd Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Tue, 26 Aug 2025 10:39:38 +0800 Subject: [PATCH 01/12] init model proxy add LTP scripts update update --- .../build/model-proxy.common.dockerfile | 7 + src/model-proxy/config/model-proxy.yaml | 5 + src/model-proxy/config/model_proxy.py | 23 ++ src/model-proxy/deploy/delete.sh | 26 ++ .../deploy/model-proxy.yaml.template | 62 ++++ src/model-proxy/deploy/refresh.sh | 26 ++ src/model-proxy/deploy/service.yaml | 36 +++ src/model-proxy/deploy/start.sh | 27 ++ src/model-proxy/deploy/stop.sh | 20 ++ src/model-proxy/src/.gitignore | 20 ++ src/model-proxy/src/README.md | 0 src/model-proxy/src/config.json.example | 61 ++++ src/model-proxy/src/go.mod | 3 + src/model-proxy/src/go.sum | 0 src/model-proxy/src/main.go | 55 ++++ src/model-proxy/src/proxy/authenticator.go | 63 +++++ src/model-proxy/src/proxy/load_balancer.go | 50 ++++ src/model-proxy/src/proxy/model_server.go | 264 ++++++++++++++++++ src/model-proxy/src/proxy/proxy.go | 246 ++++++++++++++++ src/model-proxy/src/proxy/utils.go | 63 +++++ src/model-proxy/src/scripts/build-docker.sh | 9 + src/model-proxy/src/scripts/build.sh | 1 + src/model-proxy/src/scripts/kill.sh | 1 + src/model-proxy/src/scripts/run-docker.sh | 49 ++++ src/model-proxy/src/scripts/runall.sh | 2 + src/model-proxy/src/trace/trace.go | 127 +++++++++ src/model-proxy/src/types/config_types.go | 94 +++++++ .../src/types/request_response_types.go | 179 ++++++++++++ 28 files changed, 1519 insertions(+) create mode 100644 src/model-proxy/build/model-proxy.common.dockerfile create mode 100644 src/model-proxy/config/model-proxy.yaml create mode 100644 src/model-proxy/config/model_proxy.py create mode 100644 src/model-proxy/deploy/delete.sh create mode 100644 src/model-proxy/deploy/model-proxy.yaml.template create mode 100644 src/model-proxy/deploy/refresh.sh create mode 100644 src/model-proxy/deploy/service.yaml create mode 100644 src/model-proxy/deploy/start.sh 
create mode 100644 src/model-proxy/deploy/stop.sh create mode 100644 src/model-proxy/src/.gitignore create mode 100644 src/model-proxy/src/README.md create mode 100644 src/model-proxy/src/config.json.example create mode 100644 src/model-proxy/src/go.mod create mode 100644 src/model-proxy/src/go.sum create mode 100644 src/model-proxy/src/main.go create mode 100644 src/model-proxy/src/proxy/authenticator.go create mode 100644 src/model-proxy/src/proxy/load_balancer.go create mode 100644 src/model-proxy/src/proxy/model_server.go create mode 100644 src/model-proxy/src/proxy/proxy.go create mode 100644 src/model-proxy/src/proxy/utils.go create mode 100644 src/model-proxy/src/scripts/build-docker.sh create mode 100755 src/model-proxy/src/scripts/build.sh create mode 100755 src/model-proxy/src/scripts/kill.sh create mode 100644 src/model-proxy/src/scripts/run-docker.sh create mode 100755 src/model-proxy/src/scripts/runall.sh create mode 100644 src/model-proxy/src/trace/trace.go create mode 100644 src/model-proxy/src/types/config_types.go create mode 100644 src/model-proxy/src/types/request_response_types.go diff --git a/src/model-proxy/build/model-proxy.common.dockerfile b/src/model-proxy/build/model-proxy.common.dockerfile new file mode 100644 index 00000000..f5c05c07 --- /dev/null +++ b/src/model-proxy/build/model-proxy.common.dockerfile @@ -0,0 +1,7 @@ +FROM golang:1.25.0 +WORKDIR /app + +COPY ./src /app/model-proxy + +RUN cd /app/model-proxy && go mod tidy && \ + go build -o /app/bin/modelproxy diff --git a/src/model-proxy/config/model-proxy.yaml b/src/model-proxy/config/model-proxy.yaml new file mode 100644 index 00000000..fa519038 --- /dev/null +++ b/src/model-proxy/config/model-proxy.yaml @@ -0,0 +1,5 @@ +service_type: "common" + +port: 9999 +retry: 5 +modelkey: "123" \ No newline at end of file diff --git a/src/model-proxy/config/model_proxy.py b/src/model-proxy/config/model_proxy.py new file mode 100644 index 00000000..e6c272e6 --- /dev/null +++ 
import copy


class ModelProxy(object):
    """Service-configuration plugin for the model-proxy service.

    The object is built from three configuration mappings and exposes the
    three hooks visible in this class: ``validation_pre``, ``run`` and
    ``validation_post``.
    """

    def __init__(self, cluster_conf, service_conf, default_service_conf):
        """Store the three configuration sources without modifying them.

        Args:
            cluster_conf: cluster-wide configuration (stored, not read here).
            service_conf: user overrides for this service; may be ``None``.
            default_service_conf: default values for this service.
        """
        self.cluster_conf = cluster_conf
        self.service_conf = service_conf
        self.default_service_conf = default_service_conf

    def validation_pre(self):
        """Return ``(True, None)``; no pre-merge validation is performed."""
        return True, None

    def run(self):
        """Return the defaults overlaid with the user-provided overrides.

        The defaults are deep-copied first so the merge never mutates
        ``default_service_conf``. The overlay is a shallow ``dict.update``.
        """
        result = copy.deepcopy(self.default_service_conf)
        # An absent override section is treated like an empty one.
        result.update(self.service_conf or {})
        return result

    def validation_post(self, conf):
        """Check that ``conf["model-proxy"]["port"]`` is an integer.

        Args:
            conf: mapping of service name to its merged configuration.

        Returns:
            ``(True, None)`` when valid, otherwise ``(False, message)``.
        """
        section = conf.get("model-proxy")
        if not isinstance(section, dict):
            # Report a validation failure instead of raising KeyError /
            # AttributeError on a missing or malformed section.
            msg = "expect model-proxy section to be a dict but get %s with type %s" % \
                (section, type(section))
            return False, msg
        port = section.get("port")
        # bool is a subclass of int, so it has to be rejected explicitly to
        # keep refusing True/False as a port number.
        if isinstance(port, bool) or not isinstance(port, int):
            msg = "expect port in model-proxy to be int but get %s with type %s" % \
                (port, type(port))
            return False, msg
        return True, None
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +pushd $(dirname "$0") > /dev/null + +echo "Call stop script to stop all service first" +/bin/bash stop.sh || exit $? + + +popd > /dev/null \ No newline at end of file diff --git a/src/model-proxy/deploy/model-proxy.yaml.template b/src/model-proxy/deploy/model-proxy.yaml.template new file mode 100644 index 00000000..52cdfa4c --- /dev/null +++ b/src/model-proxy/deploy/model-proxy.yaml.template @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: model-proxy-ds +spec: + selector: + matchLabels: + app: model-proxy + template: + metadata: + labels: + app: model-proxy + spec: + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + imagePullSecrets: + - name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }} + containers: + - name: model-proxy + image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}model-proxy:{{ cluster_cfg['cluster']['docker-registry']['tag'] }} + imagePullPolicy: Always + command: ["/app/bin/modelproxy"] + args: + - "--port={{ cluster_cfg['model-proxy']['port'] }}" + - "--retry={{ cluster_cfg['model-proxy']['retry'] }}" + - "--modelkey={{ cluster_cfg['model-proxy']['modelkey'] }}" + - "--logdir=/usr/local/ltp/model-proxy/logs" + volumeMounts: + {%- if cluster_cfg['model-proxy']['log_pvc'] %} + - name: model-proxy-log-storage + mountPath: /usr/local/ltp/model-proxy + {%- else %} + - name: model-proxy-log + mountPath: /usr/local/ltp/model-proxy + {%- endif %} + volumes: + - name: model-proxy-log + hostPath: + path: /var/log/model-proxy + {%- if cluster_cfg['model-proxy']['log_pvc'] %} + - name: model-proxy-log-storage + persistentVolumeClaim: + claimName: {{ cluster_cfg['model-proxy']['log_pvc'] }} + {%- endif %} diff --git a/src/model-proxy/deploy/refresh.sh b/src/model-proxy/deploy/refresh.sh new file mode 100644 index 00000000..3b6d2ca1 --- /dev/null +++ b/src/model-proxy/deploy/refresh.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +pushd $(dirname "$0") > /dev/null + +bash stop.sh +bash start.sh + +popd > /dev/null diff --git a/src/model-proxy/deploy/service.yaml b/src/model-proxy/deploy/service.yaml new file mode 100644 index 00000000..60888da4 --- /dev/null +++ b/src/model-proxy/deploy/service.yaml @@ -0,0 +1,36 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +cluster-type: + - yarn + - k8s + +prerequisite: + - rest-server + +template-list: + - model-proxy.yaml + +start-script: start.sh +stop-script: stop.sh +delete-script: delete.sh +refresh-script: refresh.sh +upgraded-script: upgraded.sh + + +deploy-rules: + - in: pai-master diff --git a/src/model-proxy/deploy/start.sh b/src/model-proxy/deploy/start.sh new file mode 100644 index 00000000..8747ed46 --- /dev/null +++ b/src/model-proxy/deploy/start.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +pushd $(dirname "$0") > /dev/null + +kubectl apply --overwrite=true -f model-proxy.yaml || exit $? + +# Wait until the service is ready. +PYTHONPATH="../../../deployment" python -m k8sPaiLibrary.monitorTool.check_pod_ready_status -w -k app -v model-proxy || exit $? + +popd > /dev/null diff --git a/src/model-proxy/deploy/stop.sh b/src/model-proxy/deploy/stop.sh new file mode 100644 index 00000000..3abbaec1 --- /dev/null +++ b/src/model-proxy/deploy/stop.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +kubectl delete --ignore-not-found --now "daemonset/model-proxy-ds" diff --git a/src/model-proxy/src/.gitignore b/src/model-proxy/src/.gitignore new file mode 100644 index 00000000..23b58a97 --- /dev/null +++ b/src/model-proxy/src/.gitignore @@ -0,0 +1,20 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +bin +Traces + +venv \ No newline at end of file diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md new file mode 100644 index 00000000..e69de29b diff --git a/src/model-proxy/src/config.json.example b/src/model-proxy/src/config.json.example new file mode 100644 index 00000000..b020ce50 --- /dev/null +++ b/src/model-proxy/src/config.json.example @@ -0,0 +1,61 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 20001, + "retry": 5, + "access_keys": [ + "key1", + "key2" + ] + }, + "log": { + "storage": { + "local_folder": "./Traces", + "azure_storage": { + "url": "https://aimiciusdata.blob.core.windows.net/", + "container": "datastore", + "path": "raw/traces/model_traces" + } + }, + "trace_related_keys": ["source", "category_info", "other_metadata"] + }, + "endpoints": { + "azure_spec": [ + { + "url": "https://aimicius-dev-canada-east.openai.azure.com", + "key": "<>", + "version": "2023-07-01-preview", + "chat": [ + "gpt-4-32k", + "gpt-4", + "gpt-35-turbo" + ], + "embeddings": [ + "text-embedding-ada-002" + ] + }, + { + "url": "https://gcrgpt4aoai5c.openai.azure.com", + "key": "<>", + "version": "2023-06-01-preview", + "chat": [ + "gpt-4-32k", + "gpt-4", + "gpt-35-turbo" + ] + } + ], + "openai_spec": [ + { + "url": "http://localhost:8081/v1", + "key": "<>", + "chat": [ + "llama-2-7b-chat", + "llama-2-13b-chat", + "llama-2-70b-chat", + "llama-2-7b-chat-ark" + ] + } + ] + } +} \ No newline at end of file diff 
--git a/src/model-proxy/src/go.mod b/src/model-proxy/src/go.mod new file mode 100644 index 00000000..41b67d82 --- /dev/null +++ b/src/model-proxy/src/go.mod @@ -0,0 +1,3 @@ +module modelproxy + +go 1.25 diff --git a/src/model-proxy/src/go.sum b/src/model-proxy/src/go.sum new file mode 100644 index 00000000..e69de29b diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go new file mode 100644 index 00000000..acd111cd --- /dev/null +++ b/src/model-proxy/src/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "modelproxy/proxy" + "modelproxy/trace" + "modelproxy/types" +) + +var ( + port int + maxRetries int = 5 // default value + logFileDir string + modelKey string +) + +func init() { + flag.IntVar(&port, "port", 9999, "port for the proxy server") + flag.IntVar(&maxRetries, "retry", 5, "max retries for the request to the model server") + flag.StringVar(&logFileDir, "logdir", "./logs", "path to the log file directory") + flag.StringVar(&modelKey, "modelkey", "", "comma-separated model keys for the proxy server") + + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of modelproxy:\n") + flag.PrintDefaults() + } +} + +func main() { + flag.Parse() + + config := types.Config{ + Server: &types.Server{ + Host: "0.0.0.0", + Port: port, + MaxRetries: maxRetries, + ModelKey: modelKey, + }, + Log: &types.Log{ + LogStorage: &types.LogStorage{ + LocalFolder: logFileDir, + AzureStorage: nil, + }, + TraceRelatedKeys: []string{}, + }, + } + ph := proxy.NewProxyHandler(&config) + traceLogger := trace.NewJsonFileLogger(logFileDir) + + ph.StartProxy(traceLogger) + +} diff --git a/src/model-proxy/src/proxy/authenticator.go b/src/model-proxy/src/proxy/authenticator.go new file mode 100644 index 00000000..a3fa4f02 --- /dev/null +++ b/src/model-proxy/src/proxy/authenticator.go @@ -0,0 +1,63 @@ +package proxy + +import ( + "log" + "net/http" + "strings" +) + +type RestServerAuthenticator struct { + // rest-server token => model names => model 
urls + tokenToModels map[string]map[string][]string + modelKey string +} + +func NewRestServerAuthenticator(tokenToModels map[string]map[string][]string, modelKey string) *RestServerAuthenticator { + if tokenToModels == nil { + tokenToModels = make(map[string]map[string][]string) + } + return &RestServerAuthenticator{ + tokenToModels: tokenToModels, + modelKey: modelKey, + } +} + +func (ra *RestServerAuthenticator) UpdateTokenModels(token string, model2Url map[string][]string) { + if ra.tokenToModels == nil { + ra.tokenToModels = make(map[string]map[string][]string) + } + ra.tokenToModels[token] = model2Url +} + +// Check if the request is authenticated and return the available model urls +func (ra *RestServerAuthenticator) AuthenticateReq(req *http.Request, reqBody map[string]interface{}) (bool, []string) { + token := req.Header.Get("Authorization") + token = strings.Replace(token, "Bearer ", "", 1) + // read request body + model := reqBody["model"].(string) + availableModels, ok := ra.tokenToModels[token] + if !ok { + // request to RestServer to get the models + log.Printf("[-] Error: token %s not found in the authenticator\n", token) + availableModels, err := GetJobModelsMapping(req, ra.modelKey) + if err != nil { + log.Printf("[-] Error: failed to get models for token %s: %v\n", token, err) + return false, []string{} + } + ra.tokenToModels[token] = availableModels + } + if len(availableModels) == 0 { + log.Printf("[-] Error: no models found") + return false, []string{} + } + if model == "" { + log.Printf("[-] Error: model is empty") + return false, []string{} + } + for m, v := range availableModels { + if m == model { + return true, v + } + } + return false, []string{} +} diff --git a/src/model-proxy/src/proxy/load_balancer.go b/src/model-proxy/src/proxy/load_balancer.go new file mode 100644 index 00000000..7707a477 --- /dev/null +++ b/src/model-proxy/src/proxy/load_balancer.go @@ -0,0 +1,50 @@ +package proxy + +import ( + "math/rand" + "net/url" + + 
"modelproxy/types" +) + +// UrlPoller can generate the destination url by polling the provided base url list +type UrlPoller struct { + OriUrl string + BSL types.BaseSpecList + Seed int +} + +func NewUrlPoller(url string, bsl types.BaseSpecList) *UrlPoller { + if len(bsl) == 0 { + return nil + } + return &UrlPoller{ + OriUrl: url, + BSL: bsl, + Seed: rand.Intn(len(bsl)), + } +} + +func NewUrlPollerWithKey(url string, modelUrls []string, modelKey string) *UrlPoller { + if len(modelUrls) == 0 { + return nil + } + bsl := make(types.BaseSpecList, 0, len(modelUrls)) + for _, v := range modelUrls { + bsl = append(bsl, &types.BaseSpecStatistic{ + BaseSpec: &types.BaseSpec{ + URL: v, + Key: modelKey, + }, + }) + } + return NewUrlPoller(url, bsl) +} + +// GetUrlAndKey will return the new url and the key of the base spec +func (ug *UrlPoller) GetUrlAndKey() (*url.URL, string) { + baseSpec := ug.BSL[ug.Seed%len(ug.BSL)] + newUrl := ReplaceBaseURL(ug.OriUrl, baseSpec.BaseSpec) + ug.Seed += 1 + return newUrl, baseSpec.Key +} diff --git a/src/model-proxy/src/proxy/model_server.go b/src/model-proxy/src/proxy/model_server.go new file mode 100644 index 00000000..ed278f8e --- /dev/null +++ b/src/model-proxy/src/proxy/model_server.go @@ -0,0 +1,264 @@ +package proxy + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" +) + +const TARGET_JOB_TAG = "model-serving" +const REST_SERVER_PATH = "rest-server" +const JOB_SERVER_PATH = "job-server" + +var httpClient = &http.Client{Timeout: 10 * time.Second} + +// ListModelServingJobs returns a list of model serving jobs with the given request +func ListModelServingJobs(restServerUrl string, restServerToken string) ([]string, error) { + url := fmt.Sprintf("%s/api/v2/jobs?state=RUNNING", restServerUrl) + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + restServerToken = strings.TrimPrefix(restServerToken, "Bearer ") + 
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", restServerToken)) + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to GET jobs from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read jobs response: %w", err) + } + + // Expected: array of jobs with fields { debugId, name, username, state } + var jobs []struct { + DebugId string `json:"debugId"` + Name string `json:"name"` + Username string `json:"username"` + State string `json:"state"` + } + if err := json.Unmarshal(body, &jobs); err != nil { + // Try to provide a helpful error if the shape is unexpected + return nil, fmt.Errorf("failed to parse jobs JSON: %w", err) + } + + result := make([]string, 0, len(jobs)) + for _, j := range jobs { + if j.Name == "" { + continue + } + if strings.Contains(j.Name, TARGET_JOB_TAG) { + // Use the same identifier as TS: username~name + jobId := fmt.Sprintf("%s~%s", j.Username, j.Name) + result = append(result, jobId) + } + } + + return result, nil +} + +// return the job server url +func GetJobServerUrl(restServerUrl string, restServerToken string, jobId string) (string, error) { + if restServerUrl == "" { + return "", fmt.Errorf("empty restServerUrl") + } + if jobId == "" { + return "", fmt.Errorf("empty jobId") + } + + // Ensure restServerUrl doesn't end with slash + restServerUrl = strings.TrimRight(restServerUrl, "/") + url := fmt.Sprintf("%s/api/v2/jobs/%s", restServerUrl, jobId) + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + restServerToken = strings.TrimPrefix(restServerToken, "Bearer ") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", 
restServerToken)) + + resp, err := httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed to GET job details from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read job details response: %w", err) + } + + // Parse expected job details: + // { taskRoles: { roleName: { taskStatuses: [ { containerIp: "...", containerPorts: { "http": "port" } } ] } } } + var details struct { + TaskRoles map[string]struct { + TaskStatuses []struct { + ContainerIp string `json:"containerIp"` + ContainerPorts map[string]string `json:"containerPorts"` + } `json:"taskStatuses"` + } `json:"taskRoles"` + } + if err := json.Unmarshal(body, &details); err != nil { + return "", fmt.Errorf("failed to parse job details JSON: %w", err) + } + + if len(details.TaskRoles) == 0 { + return "", fmt.Errorf("no taskRoles found for job %s", jobId) + } + + jobServerPath := strings.Replace(restServerUrl, REST_SERVER_PATH, JOB_SERVER_PATH, 1) + // Pick first role, first taskStatus + for _, role := range details.TaskRoles { + if len(role.TaskStatuses) == 0 { + continue + } + ts := role.TaskStatuses[0] + if ts.ContainerIp == "" { + return "", fmt.Errorf("no containerIp found for job %s", jobId) + } + // prefer http port + port, ok := ts.ContainerPorts["http"] + if !ok || port == "" { + return "", fmt.Errorf("no http port found for job %s", jobId) + } + jobServerUrl := fmt.Sprintf("%s/%s:%s", jobServerPath, ts.ContainerIp, port) + return jobServerUrl, nil + } + + return "", fmt.Errorf("no taskStatuses found for job %s", jobId) +} + +// return model names list +func listModels(jobServerUrl string, token string) ([]string, error) { + if jobServerUrl == "" { + return nil, fmt.Errorf("empty jobServerUrl") + } + // ensure 
no trailing slash + jobServerUrl = strings.TrimRight(jobServerUrl, "/") + url := fmt.Sprintf("%s/v1/models", jobServerUrl) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request for %s: %w", url, err) + } + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to GET models from %s: %w", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("non-2xx response from %s: %d - %s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read models response: %w", err) + } + + // Try the expected shape: { data: [{ id: "model1" }, ...] } + var wrapper struct { + Data []struct { + ID string `json:"id"` + } `json:"data"` + } + if err := json.Unmarshal(body, &wrapper); err == nil && len(wrapper.Data) > 0 { + result := make([]string, 0, len(wrapper.Data)) + for _, m := range wrapper.Data { + if m.ID != "" { + result = append(result, m.ID) + } + } + return result, nil + } + + // Fallback: maybe the endpoint returns an array of objects or strings + var arr []map[string]interface{} + if err := json.Unmarshal(body, &arr); err == nil && len(arr) > 0 { + result := []string{} + for _, item := range arr { + if idv, ok := item["id"].(string); ok && idv != "" { + result = append(result, idv) + continue + } + if namev, ok := item["name"].(string); ok && namev != "" { + result = append(result, namev) + continue + } + } + if len(result) > 0 { + return result, nil + } + } + + // Fallback: maybe it's an array of strings + var strArr []string + if err := json.Unmarshal(body, &strArr); err == nil && len(strArr) > 0 { + return strArr, nil + } + + // No models found + return nil, fmt.Errorf("no models found at %s", url) +} + +// return JobURL => models +func 
GetJobModelsMapping(req *http.Request, modelToken string) (map[string][]string, error) { + mapping := make(map[string][]string) + + if req == nil || req.Host == "" { + return mapping, fmt.Errorf("invalid request or empty host") + } + restBase := fmt.Sprintf("https://%s/rest-server", req.Host) + + restServerToken := req.Header.Get("Authorization") + jobIDs, err := ListModelServingJobs(restBase, restServerToken) + if err != nil { + return mapping, fmt.Errorf("failed to list model serving jobs: %w", err) + } + + for _, jobId := range jobIDs { + jobServerUrl, err := GetJobServerUrl(restBase, restServerToken, jobId) + if err != nil { + // skip this job but continue with others + log.Printf("[-] Error: failed to get job server URL for job %s: %s\n", jobId, err) + continue + } + // + models, err := listModels(jobServerUrl, modelToken) + if err != nil { + // skip if cannot list models + log.Printf("[-] Error: failed to list models for job %s: %s\n", jobId, err) + continue + } + for _, model := range models { + // map model name -> [jobServerUrl] + if _, ok := mapping[model]; !ok { + mapping[model] = []string{} + } + mapping[model] = append(mapping[model], jobServerUrl) + } + } + + return mapping, nil +} diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go new file mode 100644 index 00000000..2343006b --- /dev/null +++ b/src/model-proxy/src/proxy/proxy.go @@ -0,0 +1,246 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "log" + "net/http" + "net/http/httputil" + "sort" + "strconv" + "strings" + "time" + + "modelproxy/trace" + "modelproxy/types" +) + +// hookWriter is a wrapper of http.ResponseWriter, which can get the response body and status code, and does not dis +type hookWriter struct { + http.ResponseWriter + status int + body []string +} + +// Implement the Write method of http.ResponseWriter. 
It can record the response body +func (w *hookWriter) Write(data []byte) (int, error) { + w.body = append(w.body, string(data)) + // Disable nginx buffering (do this before first write) + if len(w.body) == 1 { // First write + w.Header().Set("X-Accel-Buffering", "no") + } + + n, err := w.ResponseWriter.Write(data) + // Flush the response writer immediately if it implements http.Flusher + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } + return n, err +} + +// Implement the WriteHeader method of http.ResponseWriter. It can record the status code +func (w *hookWriter) WriteHeader(statusCode int) { + w.status = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +// ProxyHandler is the key struct for proxy server +type ProxyHandler struct { + authenticator *RestServerAuthenticator + port int + maxRetries int + // traceRelatedKeys is the keys that will be logged in trace, but will be filtered in the api request + traceRelatedKeys map[string]struct{} +} + +// NewProxyHandler create a new ProxyHandler according to the config +func NewProxyHandler(config *types.Config) *ProxyHandler { + traceRelatedKeys := make(map[string]struct{}) + for _, key := range config.Log.TraceRelatedKeys { + traceRelatedKeys[key] = struct{}{} + } + + return &ProxyHandler{ + authenticator: NewRestServerAuthenticator(nil, config.Server.ModelKey), + port: config.Server.Port, + maxRetries: config.Server.MaxRetries, + traceRelatedKeys: traceRelatedKeys, + } +} + +// ReverseProxyHandler act as a reverse proxy, it will redirect the request to the destination website and return the response +func (ph *ProxyHandler) ReverseProxyHandler(w http.ResponseWriter, r *http.Request) (string, []string, bool) { + // handle /v1/models + if r.URL.Path == "/v1/models" { + model2Url, err := GetJobModelsMapping(r, ph.authenticator.modelKey) + if err != nil { + log.Printf("[-] Error: failed to get models mapping: %v\n", err) + http.Error(w, "Internal Server Error", 
http.StatusInternalServerError) + return "", nil, false + } + // Update the ph.authenticator + token := r.Header.Get("Authorization") + token = strings.Replace(token, "Bearer ", "", 1) + ph.authenticator.UpdateTokenModels(token, model2Url) + + // convert models list to OpenAI style list and write it to w + ids := make([]string, 0, len(model2Url)) + for id := range model2Url { + ids = append(ids, id) + } + sort.Strings(ids) + + list := map[string]interface{}{ + "object": "list", + "data": make([]map[string]interface{}, 0, len(ids)), + } + for _, id := range ids { + item := map[string]interface{}{ + "id": id, + "object": "model", + // intentionally not including created or owned_by + } + list["data"] = append(list["data"].([]map[string]interface{}), item) + } + + out, err := json.Marshal(list) + if err != nil { + log.Printf("[-] Error: failed to marshal models list: %v\n", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return "", nil, false + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if _, err := w.Write(out); err != nil { + log.Printf("[-] Error: failed to write response: %v\n", err) + } + // We've handled the response, do not continue proxying this request + return "", nil, false + } + log.Printf("[*] receive a request from %s\n", r.RemoteAddr) + + // filter the request body and check whether the request should be traced + rawReqBody, newReqBody, data, shouldTraced := ph.requestBodyFilter(r) + + // check the key of the request + ok, modelUrls := ph.authenticator.AuthenticateReq(r, data) + + if !ok { + log.Printf("[-] Error: unauthorized request from %s\n", r.RemoteAddr) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return "", nil, false + } + + // get the url poller to generate and poll the destination url + urlPoller := NewUrlPollerWithKey(r.URL.String(), modelUrls, ph.authenticator.modelKey) + if urlPoller == nil { + log.Printf("[-] Error: cannot get the url poller: 
\n\trawReqBody: %s\n\tURL: %s\n", rawReqBody, r.URL.String()) + proxy := &httputil.ReverseProxy{Director: func(req *http.Request) {}} + proxy.ServeHTTP(w, r) + return rawReqBody, nil, false + } + + responseWriter := &hookWriter{ResponseWriter: w, body: make([]string, 0, 1)} + director := func(req *http.Request) { + // get the new destination url and the related key + newUrl, curkey := urlPoller.GetUrlAndKey() + log.Printf("[*] redirect to %s\n", newUrl.String()) + + if newUrl == nil { + log.Println("[-] Error: cannot get the destinaton url") + return + } + req.Header.Set("Content-Length", strconv.Itoa(len(newReqBody))) + // ket setting for openai spec endpoints + req.Header.Set("Authorization", "Bearer "+curkey) + // ket setting for azure spec endpoints + req.Header.Set("Api-key", curkey) + + req.Host = newUrl.Host + req.URL = newUrl + + // the request body has been read out, so we need to reset the request body + req.Body = io.NopCloser(bytes.NewBufferString(newReqBody)) + req.ContentLength = int64(len(newReqBody)) + } + + // create the reverse proxy + proxy := &httputil.ReverseProxy{Director: director} + retries := 0 + modifyResponse := func(resp *http.Response) error { + // Handle error response here + // If the response is an error response, we will retry the request to another destination url + if resp.StatusCode >= 400 && retries < ph.maxRetries { + log.Printf("[-] receive error response %s\n", resp.Status) + retries++ + time.Sleep(100 * time.Millisecond) + // clear the response body and status code in the responseWriter + responseWriter.body = make([]string, 0, 1) + proxy.ServeHTTP(responseWriter, r) + return errors.New("retry") + } else { + return nil + } + + } + proxy.ModifyResponse = modifyResponse + proxy.ErrorHandler = func(http.ResponseWriter, *http.Request, error) {} + proxy.ServeHTTP(responseWriter, r) + + log.Printf("[*] receive the destination website response\n") + + return rawReqBody, responseWriter.body, shouldTraced +} + +// requestBodyFilter 
filter the request body, return the original request body, the filtered request body and whether the request should be traced (only trace chat request) +func (ph *ProxyHandler) requestBodyFilter(r *http.Request) (string, string, map[string]interface{}, bool) { + reqBody, _ := io.ReadAll(r.Body) + // check whether the request is a completion request + if !strings.Contains(r.URL.Path, "completions") { + log.Printf("[-] Request from %s is not a completion request\n", r.URL.Path) + return string(reqBody), string(reqBody), nil, false + } + + var data map[string]interface{} + + // parse request body + if err := json.Unmarshal(reqBody, &data); err != nil { + log.Printf("[-] Filter Error: %s\n", err) + return string(reqBody), string(reqBody), nil, false + } + for k := range data { + if _, ok := ph.traceRelatedKeys[k]; ok { + delete(data, k) + } + } + filtedBody, _ := json.Marshal(data) + + // check whether the request is a chat request + _, ok := data["messages"] + return string(reqBody), string(filtedBody), data, ok +} + +// ProxyHandlerWithLogger return a http.HandlerFunc, which can be used to start the proxy server and call the logger.Record method +func (ph *ProxyHandler) ProxyHandlerWithLogger(logger trace.TraceLogger) http.HandlerFunc { + res := func(w http.ResponseWriter, r *http.Request) { + log.Println("-----------------------------------------------") + req, resp, shouldTraced := ph.ReverseProxyHandler(w, r) + if shouldTraced { + log.Printf("[*] Debug: response: %s\n", resp) + logger.Record(req, resp) + } + } + return res +} + +// StartProxy start the proxy server +func (ph *ProxyHandler) StartProxy(logger trace.TraceLogger) { + log.Printf("[*] Start proxy at port \n") + handler := ph.ProxyHandlerWithLogger(logger) + if err := http.ListenAndServe(":"+strconv.Itoa(ph.port), handler); err != nil { + log.Fatal("[*] Server error: " + err.Error()) + } +} diff --git a/src/model-proxy/src/proxy/utils.go b/src/model-proxy/src/proxy/utils.go new file mode 100644 index 
00000000..8b83706c --- /dev/null +++ b/src/model-proxy/src/proxy/utils.go @@ -0,0 +1,63 @@ +package proxy + +import ( + "log" + "net/http" + "net/url" + "path" + "strings" + + "modelproxy/types" +) + +// parse the RESTFul-style url and get the value of the given key +// e.g. .../deployments/abc/predictions -> GetArgsFromRestfulUrl(url, "deployments") -> abc +func GetArgsFromRestfulUrl(url, key string) string { + splitURL := strings.Split(url, "/") + for i, part := range splitURL { + if part == key && i+1 < len(splitURL) { + return splitURL[i+1] + } + } + return "" +} + +// ReplaceBaseURL will replace the base url of the original url with the new base url +// e.g. original url: http://localhost:8999/openai/deployments/gpt-4-32k/chat/completions?api-version=placeholder +// new base url: https://XXX.openai.azure.com/openai/deployments/gpt-4-32k/chat/completions?api-version=2023-08-01-preview +func ReplaceBaseURL(originalURL string, baseSpec *types.BaseSpec) *url.URL { + parsedURL, err := url.Parse(originalURL) + if err != nil { + log.Printf("Failed to parse original URL: %v", err) + return nil + } + + newURL, err := url.Parse(baseSpec.URL) + if err != nil { + log.Printf("Failed to parse new base URL: %v", err) + return nil + } + log.Printf("****************************************** %s\n", parsedURL.Path) + + newURL.Path = path.Join(newURL.Path, parsedURL.Path) + log.Printf("****************************************** %s\n", newURL.Path) + //replace the api-version + queryParams := newURL.Query() + if baseSpec.Version != "" { + queryParams.Set("api-version", baseSpec.Version) + // update into newURL + newURL.RawQuery = queryParams.Encode() + } + return newURL +} + +func GetKeyFromRequest(request *http.Request) string { + azureKey := request.Header.Get("Authorization") + azureKey = strings.Replace(azureKey, "Bearer ", "", 1) + if azureKey != "" { + return azureKey + } + openaiKey := request.Header.Get("Api-key") + openaiKey = strings.Replace(openaiKey, "Bearer ", "", 1) 
+ return openaiKey +} diff --git a/src/model-proxy/src/scripts/build-docker.sh b/src/model-proxy/src/scripts/build-docker.sh new file mode 100644 index 00000000..03db6db4 --- /dev/null +++ b/src/model-proxy/src/scripts/build-docker.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# This script is used to build the project + +# Go to the parent directory of the root directory of the project +SCRIPT_PATH=$(cd $(dirname $0) && pwd -P) +cd $SCRIPT_PATH/../.. + +# Build the docker image +docker -t modelproxy:latest -f modelproxy/dockerfile/deploy.dockerfile . \ No newline at end of file diff --git a/src/model-proxy/src/scripts/build.sh b/src/model-proxy/src/scripts/build.sh new file mode 100755 index 00000000..2db248d8 --- /dev/null +++ b/src/model-proxy/src/scripts/build.sh @@ -0,0 +1 @@ +go build -o ./bin/modelproxy \ No newline at end of file diff --git a/src/model-proxy/src/scripts/kill.sh b/src/model-proxy/src/scripts/kill.sh new file mode 100755 index 00000000..de330373 --- /dev/null +++ b/src/model-proxy/src/scripts/kill.sh @@ -0,0 +1 @@ +ps -ef | grep modelproxy | grep -v grep | awk '{print $2}' | xargs kill -9 \ No newline at end of file diff --git a/src/model-proxy/src/scripts/run-docker.sh b/src/model-proxy/src/scripts/run-docker.sh new file mode 100644 index 00000000..49323c0b --- /dev/null +++ b/src/model-proxy/src/scripts/run-docker.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# This script is used to run the docker after building + +usage (){ + echo "Usage: $0 [-p ] [-n ] [-c ] [-h]" + echo " -p : the port of the server : the port of the docker" + echo " -c : Path to the proxy config file" + echo " -h : Show usage" + exit 1 +} + +# Set the default values +PORT=9999:8999 +CONFIG=./bin/config.json + +# Parse the command line arguments +while getopts "p:c:h" opt; do + case $opt in + p) + PORT=$OPTARG + ;; + c) + CONFIG=$OPTARG + ;; + h) + usage + ;; + \?) + echo "Invalid option: -$OPTARG" >&2 + usage + ;; + esac +done + +# Check if the config file exists +if [ ! 
-f $CONFIG ]; then + echo "Config file not found: $CONFIG" + usage +fi + +# Get the absolute path of the config file +CONFIG=$(cd $(dirname $CONFIG) && pwd -P)/$(basename $CONFIG) + +# Run the docker +docker run -d \ + -p $PORT \ + --mount type=bind,source=$CONFIG,target=/config.json \ + modelproxy:latest \ + ./bin/modelproxy --config /config.json \ No newline at end of file diff --git a/src/model-proxy/src/scripts/runall.sh b/src/model-proxy/src/scripts/runall.sh new file mode 100755 index 00000000..5f0f315b --- /dev/null +++ b/src/model-proxy/src/scripts/runall.sh @@ -0,0 +1,2 @@ +mkdir -p ./Traces +nohup ./bin/modelproxy --config ./bin/config.json >> ./Traces/all.log 2>&1 & \ No newline at end of file diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go new file mode 100644 index 00000000..fa2bef74 --- /dev/null +++ b/src/model-proxy/src/trace/trace.go @@ -0,0 +1,127 @@ +package trace + +import ( + "bytes" + "log" + "os" + "path" + "strings" + "sync" + "time" + + "modelproxy/types" +) + +// TraceLogger is the interface for trace logger +type TraceLogger interface { + // Record the trace + Record(req string, resp []string) +} + +// JsonFileLogger is a logger that log the trace into json files, it implements TraceLogger interface +// The log file is named by date, and the log file will be uploaded to Azure Blob Storage everyday +type JsonFileLogger struct { + localFolderPath string + currentDay string + lock sync.Mutex +} + +// NewJsonFileLogger create a new JsonFileLogger +func NewJsonFileLogger(folderPath string) *JsonFileLogger { + if _, err := os.Stat(folderPath); os.IsNotExist(err) { + os.Mkdir(folderPath, os.ModePerm) + } + date := time.Now().Format("2006-01-02") + return &JsonFileLogger{localFolderPath: folderPath, currentDay: date} +} + +func (j *JsonFileLogger) Record(req string, resp []string) { + go j.record(req, resp) +} + +// record the trace +func (j *JsonFileLogger) record(req string, resp []string) { + if req == "" 
|| len(resp) == 0 { + return + } + + reqSturct := &types.Request{} + if err := reqSturct.Unmarshal([]byte(req)); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + + numChoice := reqSturct.Choices + if numChoice == 0 { + numChoice = 1 + } + + respStr := make([]string, numChoice) + modelName := "" + if len(resp) > 1 { + // If len(resp) > 1, the resp is a 'stream' + respConcat := make([]bytes.Buffer, numChoice) + for _, r := range resp { + lines := strings.Split(r, "\n") + for _, line := range lines { + if strings.ReplaceAll(line, " ", "") == "" { + continue + } + line = strings.TrimPrefix(line, "data: ") + + if line == "[DONE]" { + continue + } + responseChunc := types.ResponseChunk{} + if err := responseChunc.Unmarshal([]byte(line)); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + if modelName == "" { + modelName = responseChunc.Model + } + for choice := range responseChunc.Choices { + respConcat[choice].WriteString(responseChunc.Choices[choice].Delta.Content) + } + } + } + for i := range respConcat { + respStr[i] = respConcat[i].String() + } + } else { + // If len(resp) == 1, the resp is not a 'stream' + response := types.Response{} + if err := response.Unmarshal([]byte(resp[0])); err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + modelName = response.Model + for choice := range response.Choices { + respStr[choice] = response.Choices[choice].Message.Content + } + } + if modelName != "" { + reqSturct.Model = modelName + } + trace := types.ConvertReqResp2Trace(reqSturct, respStr) + traceStr, err := trace.Marshal() + if err != nil { + log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) + return + } + + j.lock.Lock() + defer j.lock.Unlock() + + date := time.Now().Format("2006-01-02") + j.currentDay = date + + filePath := path.Join(j.localFolderPath, date+".jsonl") + f, err := 
os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + panic(err) + } + defer f.Close() + // append the trace to the file + f.WriteString(string(traceStr) + "\n") +} diff --git a/src/model-proxy/src/types/config_types.go b/src/model-proxy/src/types/config_types.go new file mode 100644 index 00000000..3ca2733c --- /dev/null +++ b/src/model-proxy/src/types/config_types.go @@ -0,0 +1,94 @@ +package types + +import ( + "encoding/json" + "os" +) + +// Config is the struct for config.json +type Config struct { + Server *Server `json:"server"` + Log *Log `json:"log"` +} + +// Server is the struct for proxy server config +type Server struct { + // Host is the host of the server + Host string `json:"host"` + // Port is the port of the server + Port int `json:"port"` + // MaxRetries is the max retries for the request + MaxRetries int `json:"retry"` + // ModelKey is the model key for the proxy server to access the model server + ModelKey string `json:"model_key"` +} + +// Log is the config for log +type Log struct { + // LogStorage is the log storage config + LogStorage *LogStorage `json:"storage"` + // TraceRelatedKeys is the keys that will be logged in trace, + // which will be used to identify the trace and filtered in the api request + TraceRelatedKeys []string `json:"trace_related_keys"` +} + +// LogStorage is the config for log storage +type LogStorage struct { + LocalFolder string `json:"local_folder,omitempty"` + AzureStorage *AzureStorage `json:"azure_storage,omitempty"` +} + +type AzureStorage struct { + // URL is the url of the azure blob storage, including the sas token + URL string `json:"url"` + Container string `json:"container"` + Path string `json:"path"` +} + +// BaseSpec is the base spec for azure and openai +type BaseSpec struct { + URL string `json:"url"` + Key string `json:"key"` + Version string `json:"version,omitempty"` +} + +// ParseConfig parse the config file into Config struct +func ParseConfig(path string) (*Config, 
error) { + file, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var config Config + err = json.Unmarshal(file, &config) + if err != nil { + return nil, err + } + + return &config, nil +} + +// BaseSpecStatistic is the struct for statistic +type BaseSpecStatistic struct { + *BaseSpec + ValidRequestCount int + SuccessCount int + SuccessRate float64 +} + +type BaseSpecList []*BaseSpecStatistic + +// deploy_name -> BaseSpecList +type ModelToBase map[string]BaseSpecList + +// Two reverse maps for chat and embedding +type ReverseMap struct { + Chat ModelToBase + Embedding ModelToBase +} + +// two ReverseMap for Azure and OpenAI +type AllReverseMap struct { + Azure *ReverseMap + OpenAI *ReverseMap +} diff --git a/src/model-proxy/src/types/request_response_types.go b/src/model-proxy/src/types/request_response_types.go new file mode 100644 index 00000000..0200cc2f --- /dev/null +++ b/src/model-proxy/src/types/request_response_types.go @@ -0,0 +1,179 @@ +package types + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" +) + +type Message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type Source struct { + SourceName string `json:"source_name"` + Location string `json:"location,omitempty"` +} + +// Request is the struct for request of openai chat. 
It is used to parse the request body +type Request struct { + Messages []Message `json:"messages"` + Model string `json:"model"` + Temperature float64 `json:"temperature,omitempty"` + Choices int `json:"n,omitempty"` + PresencePenalty float64 `json:"presence_penalty,omitempty"` + TopP float64 `json:"top_p,omitempty"` + FrequencyPenalty float64 `json:"frequency_penalty,omitempty"` + Stream bool `json:"stream,omitempty"` + Stop string `json:"stop,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + LogitBias map[string]float64 `json:"logit_bias,omitempty"` + User string `json:"user,omitempty"` + + Source *Source `json:"source,omitempty"` + CategoryInfo *map[string]string `json:"category_info,omitempty"` + Label *[]string `json:"labels,omitempty"` + OtherMetadata string `json:"other_metadata,omitempty"` +} + +func (r *Request) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} + +// Response is the struct for response of openai chat +type Response struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + FinishReason string `json:"finish_reason"` + Message Message `json:"message"` + } `json:"choices"` + Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` + } `json:"usage"` +} + +func (r *Response) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} + +// ResponseChunk is the struct for response of openai chat stream +type ResponseChunk struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + FinishReason string `json:"finish_reason"` + Delta struct { + Content string `json:"content"` + } `json:"delta"` + } `json:"choices"` +} + +func (rc *ResponseChunk) Unmarshal(data []byte) error { + return 
json.Unmarshal(data, rc) +} + +// TraceMessage is the struct for each message in a trace +type TraceMessage struct { + Role string `json:"from"` + Content string `json:"value"` + Score float64 `json:"score"` + ParentID int `json:"parent_id"` + ID int `json:"id"` +} + +type ModelInfo struct { + ModelName string `json:"model_name"` + Temperature float64 `json:"temperature,omitempty"` + Choices int `json:"n,omitempty"` + PresencePenalty float64 `json:"presence_penalty,omitempty"` + TopP float64 `json:"top_p,omitempty"` + FrequencyPenalty float64 `json:"frequency_penalty,omitempty"` + Stream bool `json:"stream,omitempty"` + Stop string `json:"stop,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + LogitBias map[string]float64 `json:"logit_bias,omitempty"` + User string `json:"user,omitempty"` +} + +// Trace is the struct for one trace. It is recorded in the json file +type Trace struct { + ID string `json:"id"` + Source *Source `json:"source,omitempty"` + CategoryInfo *map[string]string `json:"category_info,omitempty"` + Other_metadata string `json:"other_metadata,omitempty"` + ModelInfo *ModelInfo `json:"model_info"` + Conversations []*TraceMessage `json:"conversation"` +} + +func (t *Trace) Marshal() ([]byte, error) { + return json.Marshal(t) +} + +// convert a request and reqponse to a trace +func ConvertReqResp2Trace(req *Request, response []string) *Trace { + conversactions := make([]*TraceMessage, 0, len(req.Messages)+len(response)) + for i, msg := range req.Messages { + conversactions = append(conversactions, &TraceMessage{ + Role: msg.Role, + Content: msg.Content, + Score: -1, + ParentID: i - 1, + ID: i, + }) + } + for _, resp := range response { + conversactions = append(conversactions, &TraceMessage{ + Role: "assistant", + Content: resp, + Score: -1, + ParentID: len(req.Messages) - 1, + ID: len(conversactions), + }) + } + + id, _ := generateID() + + trace := &Trace{ + ID: id, + Source: req.Source, + CategoryInfo: req.CategoryInfo, + Other_metadata: 
req.OtherMetadata, + ModelInfo: &ModelInfo{ + ModelName: req.Model, + Temperature: req.Temperature, + Choices: req.Choices, + PresencePenalty: req.PresencePenalty, + TopP: req.TopP, + FrequencyPenalty: req.FrequencyPenalty, + Stream: req.Stream, + Stop: req.Stop, + MaxTokens: req.MaxTokens, + LogitBias: req.LogitBias, + User: req.User, + }, + Conversations: conversactions, + } + + return trace +} + +// generate a 16bytes id +func generateID() (string, error) { + buf := make([]byte, 16) + _, err := rand.Read(buf) + if err != nil { + return "", err + } + res := hex.EncodeToString(buf) + return res, nil +} From 3178cc419a4dcf7ee061d55b3260ef966da47840 Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Sun, 7 Sep 2025 19:21:49 -0700 Subject: [PATCH 02/12] fix copilot's suggestion --- src/model-proxy/src/proxy/authenticator.go | 12 ++++++++++-- src/model-proxy/src/trace/trace.go | 7 ++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/model-proxy/src/proxy/authenticator.go b/src/model-proxy/src/proxy/authenticator.go index a3fa4f02..d75064c7 100644 --- a/src/model-proxy/src/proxy/authenticator.go +++ b/src/model-proxy/src/proxy/authenticator.go @@ -6,6 +6,14 @@ import ( "strings" ) +// obfuscateToken returns a truncated identifier for safely logging tokens. 
+func obfuscateToken(token string) string { + if len(token) <= 6 { + return "" + } + return token[:3] + "***" + token[len(token)-3:] +} + type RestServerAuthenticator struct { // rest-server token => model names => model urls tokenToModels map[string]map[string][]string @@ -38,10 +46,10 @@ func (ra *RestServerAuthenticator) AuthenticateReq(req *http.Request, reqBody ma availableModels, ok := ra.tokenToModels[token] if !ok { // request to RestServer to get the models - log.Printf("[-] Error: token %s not found in the authenticator\n", token) + log.Printf("[-] Error: token %s not found in the authenticator\n", obfuscateToken(token)) availableModels, err := GetJobModelsMapping(req, ra.modelKey) if err != nil { - log.Printf("[-] Error: failed to get models for token %s: %v\n", token, err) + log.Printf("[-] Error: failed to get models for token %s: %v\n", obfuscateToken(token), err) return false, []string{} } ra.tokenToModels[token] = availableModels diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go index fa2bef74..f501c794 100644 --- a/src/model-proxy/src/trace/trace.go +++ b/src/model-proxy/src/trace/trace.go @@ -12,6 +12,8 @@ import ( "modelproxy/types" ) +const maxNumChoice = 10 // Reasonable upper limit for number of choices per request. Adjust as needed. 
+ // TraceLogger is the interface for trace logger type TraceLogger interface { // Record the trace @@ -52,9 +54,12 @@ func (j *JsonFileLogger) record(req string, resp []string) { } numChoice := reqSturct.Choices - if numChoice == 0 { + if numChoice <= 0 { numChoice = 1 } + if numChoice > maxNumChoice { + numChoice = maxNumChoice + } respStr := make([]string, numChoice) modelName := "" From fa64cbc11e70852d260315d113dcc07cab097c26 Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Sun, 7 Sep 2025 20:01:38 -0700 Subject: [PATCH 03/12] fix copilot's suggestion --- src/model-proxy/src/main.go | 7 --- src/model-proxy/src/proxy/proxy.go | 2 +- src/model-proxy/src/scripts/build-docker.sh | 9 ---- src/model-proxy/src/scripts/build.sh | 1 - src/model-proxy/src/scripts/kill.sh | 1 - src/model-proxy/src/scripts/run-docker.sh | 49 ------------------- src/model-proxy/src/scripts/runall.sh | 2 - src/model-proxy/src/trace/trace.go | 20 ++++---- .../src/types/request_response_types.go | 2 +- 9 files changed, 12 insertions(+), 81 deletions(-) delete mode 100644 src/model-proxy/src/scripts/build-docker.sh delete mode 100755 src/model-proxy/src/scripts/build.sh delete mode 100755 src/model-proxy/src/scripts/kill.sh delete mode 100644 src/model-proxy/src/scripts/run-docker.sh delete mode 100755 src/model-proxy/src/scripts/runall.sh diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go index acd111cd..7a94095c 100644 --- a/src/model-proxy/src/main.go +++ b/src/model-proxy/src/main.go @@ -2,8 +2,6 @@ package main import ( "flag" - "fmt" - "os" "modelproxy/proxy" "modelproxy/trace" @@ -22,11 +20,6 @@ func init() { flag.IntVar(&maxRetries, "retry", 5, "max retries for the request to the model server") flag.StringVar(&logFileDir, "logdir", "./logs", "path to the log file directory") flag.StringVar(&modelKey, "modelkey", "", "comma-separated model keys for the proxy server") - - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage of modelproxy:\n") - flag.PrintDefaults() 
- } } func main() { diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go index 2343006b..74b0fb89 100644 --- a/src/model-proxy/src/proxy/proxy.go +++ b/src/model-proxy/src/proxy/proxy.go @@ -238,7 +238,7 @@ func (ph *ProxyHandler) ProxyHandlerWithLogger(logger trace.TraceLogger) http.Ha // StartProxy start the proxy server func (ph *ProxyHandler) StartProxy(logger trace.TraceLogger) { - log.Printf("[*] Start proxy at port \n") + log.Printf("[*] Start proxy at port %d\n", ph.port) handler := ph.ProxyHandlerWithLogger(logger) if err := http.ListenAndServe(":"+strconv.Itoa(ph.port), handler); err != nil { log.Fatal("[*] Server error: " + err.Error()) diff --git a/src/model-proxy/src/scripts/build-docker.sh b/src/model-proxy/src/scripts/build-docker.sh deleted file mode 100644 index 03db6db4..00000000 --- a/src/model-proxy/src/scripts/build-docker.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash -# This script is used to build the project - -# Go to the parent directory of the root directory of the project -SCRIPT_PATH=$(cd $(dirname $0) && pwd -P) -cd $SCRIPT_PATH/../.. - -# Build the docker image -docker -t modelproxy:latest -f modelproxy/dockerfile/deploy.dockerfile . 
\ No newline at end of file diff --git a/src/model-proxy/src/scripts/build.sh b/src/model-proxy/src/scripts/build.sh deleted file mode 100755 index 2db248d8..00000000 --- a/src/model-proxy/src/scripts/build.sh +++ /dev/null @@ -1 +0,0 @@ -go build -o ./bin/modelproxy \ No newline at end of file diff --git a/src/model-proxy/src/scripts/kill.sh b/src/model-proxy/src/scripts/kill.sh deleted file mode 100755 index de330373..00000000 --- a/src/model-proxy/src/scripts/kill.sh +++ /dev/null @@ -1 +0,0 @@ -ps -ef | grep modelproxy | grep -v grep | awk '{print $2}' | xargs kill -9 \ No newline at end of file diff --git a/src/model-proxy/src/scripts/run-docker.sh b/src/model-proxy/src/scripts/run-docker.sh deleted file mode 100644 index 49323c0b..00000000 --- a/src/model-proxy/src/scripts/run-docker.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash -# This script is used to run the docker after building - -usage (){ - echo "Usage: $0 [-p ] [-n ] [-c ] [-h]" - echo " -p : the port of the server : the port of the docker" - echo " -c : Path to the proxy config file" - echo " -h : Show usage" - exit 1 -} - -# Set the default values -PORT=9999:8999 -CONFIG=./bin/config.json - -# Parse the command line arguments -while getopts "p:c:h" opt; do - case $opt in - p) - PORT=$OPTARG - ;; - c) - CONFIG=$OPTARG - ;; - h) - usage - ;; - \?) - echo "Invalid option: -$OPTARG" >&2 - usage - ;; - esac -done - -# Check if the config file exists -if [ ! 
-f $CONFIG ]; then - echo "Config file not found: $CONFIG" - usage -fi - -# Get the absolute path of the config file -CONFIG=$(cd $(dirname $CONFIG) && pwd -P)/$(basename $CONFIG) - -# Run the docker -docker run -d \ - -p $PORT \ - --mount type=bind,source=$CONFIG,target=/config.json \ - modelproxy:latest \ - ./bin/modelproxy --config /config.json \ No newline at end of file diff --git a/src/model-proxy/src/scripts/runall.sh b/src/model-proxy/src/scripts/runall.sh deleted file mode 100755 index 5f0f315b..00000000 --- a/src/model-proxy/src/scripts/runall.sh +++ /dev/null @@ -1,2 +0,0 @@ -mkdir -p ./Traces -nohup ./bin/modelproxy --config ./bin/config.json >> ./Traces/all.log 2>&1 & \ No newline at end of file diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go index f501c794..f08113ce 100644 --- a/src/model-proxy/src/trace/trace.go +++ b/src/model-proxy/src/trace/trace.go @@ -47,13 +47,13 @@ func (j *JsonFileLogger) record(req string, resp []string) { return } - reqSturct := &types.Request{} - if err := reqSturct.Unmarshal([]byte(req)); err != nil { + reqStruct := &types.Request{} + if err := reqStruct.Unmarshal([]byte(req)); err != nil { log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) return } - numChoice := reqSturct.Choices + numChoice := reqStruct.Choices if numChoice <= 0 { numChoice = 1 } @@ -77,16 +77,16 @@ func (j *JsonFileLogger) record(req string, resp []string) { if line == "[DONE]" { continue } - responseChunc := types.ResponseChunk{} - if err := responseChunc.Unmarshal([]byte(line)); err != nil { + responseChunk := types.ResponseChunk{} + if err := responseChunk.Unmarshal([]byte(line)); err != nil { log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) return } if modelName == "" { - modelName = responseChunc.Model + modelName = responseChunk.Model } - for choice := range responseChunc.Choices { - respConcat[choice].WriteString(responseChunc.Choices[choice].Delta.Content) + for 
choice := range responseChunk.Choices { + respConcat[choice].WriteString(responseChunk.Choices[choice].Delta.Content) } } } @@ -106,9 +106,9 @@ func (j *JsonFileLogger) record(req string, resp []string) { } } if modelName != "" { - reqSturct.Model = modelName + reqStruct.Model = modelName } - trace := types.ConvertReqResp2Trace(reqSturct, respStr) + trace := types.ConvertReqResp2Trace(reqStruct, respStr) traceStr, err := trace.Marshal() if err != nil { log.Printf("[-] Error: %s\nrequest: %s\nresponse: %s\n", err, req, resp) diff --git a/src/model-proxy/src/types/request_response_types.go b/src/model-proxy/src/types/request_response_types.go index 0200cc2f..7a3958e9 100644 --- a/src/model-proxy/src/types/request_response_types.go +++ b/src/model-proxy/src/types/request_response_types.go @@ -119,7 +119,7 @@ func (t *Trace) Marshal() ([]byte, error) { return json.Marshal(t) } -// convert a request and reqponse to a trace +// convert a request and response to a trace func ConvertReqResp2Trace(req *Request, response []string) *Trace { conversactions := make([]*TraceMessage, 0, len(req.Messages)+len(response)) for i, msg := range req.Messages { From 86d6e9f53a8b1889e0316e0132099a08d427036c Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 02:04:30 -0700 Subject: [PATCH 04/12] add readme --- src/model-proxy/src/README.md | 39 +++++++++++++++ src/model-proxy/src/config.json.example | 61 ----------------------- src/model-proxy/src/go.sum | 0 src/model-proxy/src/main.go | 2 +- src/model-proxy/src/proxy/model_server.go | 5 +- 5 files changed, 44 insertions(+), 63 deletions(-) delete mode 100644 src/model-proxy/src/config.json.example delete mode 100644 src/model-proxy/src/go.sum diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md index e69de29b..bf970706 100644 --- a/src/model-proxy/src/README.md +++ b/src/model-proxy/src/README.md @@ -0,0 +1,39 @@ +# Instruction of model-proxy + +## Overview + +Model-proxy is a proxy service to forward 
requests from clients to different model jobs in LTP cluster. With one base url, client can access different models by specifying different model name in the request path, `model-proxy` service will forward the request to corresponding model job. If there is multiple jobs which are serving the same model, `model-proxy` will do load balancing among these jobs. + +Workflow: + +1. Client sends request to `model-proxy` service to list all models by `/v1/models` endpoint. + - During the list request, `model-proxy` will query LTP REST server to get all model serving jobs which the user can access, and then list all models which are being served by these jobs. + +2. Client sends request to `model-proxy` service to access a specific model by openai sepc api request format, e.g. `POST /v1/chat/completions` with request body containing model name. + - During the access request, `model-proxy` will query LTP REST server to get all model serving jobs which are serving the requested model, and then forward the request to one of these jobs. If there is multiple jobs, `model-proxy` will do load balancing among these jobs. + +## Configuration + +### Requirements + +- LTP model serving jobs should be deployed in the LTP cluster, and these jobs should have the job tag `model-serving`. + +- LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. And the api key which is configured in model-proxy service should be supported by these endpoints. + +### Binary configuration + +Model-proxy binary can be configured by flags: + +- `--port`: the port that model-proxy service listens on, default is 9999 +- `--retry`: the retry times when forwarding request to model job, default is 5 +- `--logdir`: the directory to store log files, default is `./logs` +- `--modelkey`: the key which is used to request model serving jobs in the LTP cluster. 
+ +### Service configuration + +```yaml +model-proxy: + port: 9999 + retry: 5 + modelkey: "ABCD1234" # the api key to access model serving jobs +``` diff --git a/src/model-proxy/src/config.json.example b/src/model-proxy/src/config.json.example deleted file mode 100644 index b020ce50..00000000 --- a/src/model-proxy/src/config.json.example +++ /dev/null @@ -1,61 +0,0 @@ -{ - "server": { - "host": "0.0.0.0", - "port": 20001, - "retry": 5, - "access_keys": [ - "key1", - "key2" - ] - }, - "log": { - "storage": { - "local_folder": "./Traces", - "azure_storage": { - "url": "https://aimiciusdata.blob.core.windows.net/", - "container": "datastore", - "path": "raw/traces/model_traces" - } - }, - "trace_related_keys": ["source", "category_info", "other_metadata"] - }, - "endpoints": { - "azure_spec": [ - { - "url": "https://aimicius-dev-canada-east.openai.azure.com", - "key": "<>", - "version": "2023-07-01-preview", - "chat": [ - "gpt-4-32k", - "gpt-4", - "gpt-35-turbo" - ], - "embeddings": [ - "text-embedding-ada-002" - ] - }, - { - "url": "https://gcrgpt4aoai5c.openai.azure.com", - "key": "<>", - "version": "2023-06-01-preview", - "chat": [ - "gpt-4-32k", - "gpt-4", - "gpt-35-turbo" - ] - } - ], - "openai_spec": [ - { - "url": "http://localhost:8081/v1", - "key": "<>", - "chat": [ - "llama-2-7b-chat", - "llama-2-13b-chat", - "llama-2-70b-chat", - "llama-2-7b-chat-ark" - ] - } - ] - } -} \ No newline at end of file diff --git a/src/model-proxy/src/go.sum b/src/model-proxy/src/go.sum deleted file mode 100644 index e69de29b..00000000 diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go index 7a94095c..aea600d6 100644 --- a/src/model-proxy/src/main.go +++ b/src/model-proxy/src/main.go @@ -19,7 +19,7 @@ func init() { flag.IntVar(&port, "port", 9999, "port for the proxy server") flag.IntVar(&maxRetries, "retry", 5, "max retries for the request to the model server") flag.StringVar(&logFileDir, "logdir", "./logs", "path to the log file directory") - 
flag.StringVar(&modelKey, "modelkey", "", "comma-separated model keys for the proxy server") + flag.StringVar(&modelKey, "modelkey", "", "model key for requsting model serving jobs") } func main() { diff --git a/src/model-proxy/src/proxy/model_server.go b/src/model-proxy/src/proxy/model_server.go index ed278f8e..8d529fe6 100644 --- a/src/model-proxy/src/proxy/model_server.go +++ b/src/model-proxy/src/proxy/model_server.go @@ -10,11 +10,14 @@ import ( "time" ) +// target job tag to identify model serving jobs const TARGET_JOB_TAG = "model-serving" + +// REST server and Job server path segments in the URL const REST_SERVER_PATH = "rest-server" const JOB_SERVER_PATH = "job-server" -var httpClient = &http.Client{Timeout: 10 * time.Second} +var httpClient = &http.Client{Timeout: 120 * time.Second} // ListModelServingJobs returns a list of model serving jobs with the given request func ListModelServingJobs(restServerUrl string, restServerToken string) ([]string, error) { From 66075aa18b3676454949fec13fe8bcee45243c4e Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 02:14:48 -0700 Subject: [PATCH 05/12] add pylon support --- src/model-proxy/config/model_proxy.py | 7 +++++++ src/model-proxy/deploy/service.yaml | 1 - src/pylon/deploy/pylon-config/location.conf.template | 8 ++++++++ src/pylon/deploy/pylon.yaml.template | 6 +++++- 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/model-proxy/config/model_proxy.py b/src/model-proxy/config/model_proxy.py index e6c272e6..2eb988d9 100644 --- a/src/model-proxy/config/model_proxy.py +++ b/src/model-proxy/config/model_proxy.py @@ -6,12 +6,19 @@ def __init__(self, cluster_conf, service_conf, default_service_conf): self.service_conf = service_conf self.default_service_conf = default_service_conf + def get_master_ip(self): + for host_conf in self.cluster_conf["machine-list"]: + if "pai-master" in host_conf and host_conf["pai-master"] == "true": + return host_conf["hostip"] + def validation_pre(self): 
return True, None def run(self): result = copy.deepcopy(self.default_service_conf) result.update(self.service_conf) + result["host"] = self.get_master_ip() + result["url"] = "http://{0}:{1}".format(self.get_master_ip(), result["port"]) return result def validation_post(self, conf): diff --git a/src/model-proxy/deploy/service.yaml b/src/model-proxy/deploy/service.yaml index 60888da4..a06e63d3 100644 --- a/src/model-proxy/deploy/service.yaml +++ b/src/model-proxy/deploy/service.yaml @@ -29,7 +29,6 @@ start-script: start.sh stop-script: stop.sh delete-script: delete.sh refresh-script: refresh.sh -upgraded-script: upgraded.sh deploy-rules: diff --git a/src/pylon/deploy/pylon-config/location.conf.template b/src/pylon/deploy/pylon-config/location.conf.template index ad9cb86b..5d70dff2 100644 --- a/src/pylon/deploy/pylon-config/location.conf.template +++ b/src/pylon/deploy/pylon-config/location.conf.template @@ -63,6 +63,14 @@ location ~ ^/copilot/api/operation(.*)$ { proxy_send_timeout 2m; } +# Model proxy backend +location ~ ^/model-proxy/(.*)$ { + proxy_pass {{MODEL_PROXY_URI}}/$1$is_args$args; + proxy_connect_timeout 2m; + proxy_read_timeout 2m; + proxy_send_timeout 2m; +} + # # Other web portals # diff --git a/src/pylon/deploy/pylon.yaml.template b/src/pylon/deploy/pylon.yaml.template index bccf392d..75b561cb 100644 --- a/src/pylon/deploy/pylon.yaml.template +++ b/src/pylon/deploy/pylon.yaml.template @@ -66,7 +66,11 @@ spec: {% if "copilot-chat" in cluster_cfg %} - name: COPILOT_CHAT_URI value: {{ cluster_cfg['copilot-chat']['url'] }} -{% endif %} +{% endif %} +{% if "model-proxy" in cluster_cfg %} + - name: MODEL_PROXY_URI + value: {{ cluster_cfg['model-proxy']['url'] }} +{% endif %} - name: K8S_DASHBOARD_URI value: {{ cluster_cfg['layout']['kubernetes']['dashboard-url'] }} {% if cluster_cfg["cluster"]["common"]["cluster-type"] == "yarn" %} From cb8d36dafa9951787b2cab47fe02f599619ddadc Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 02:24:06 -0700 
Subject: [PATCH 06/12] update docker build --- .../build/model-proxy.common.dockerfile | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/model-proxy/build/model-proxy.common.dockerfile b/src/model-proxy/build/model-proxy.common.dockerfile index f5c05c07..4dea6ec5 100644 --- a/src/model-proxy/build/model-proxy.common.dockerfile +++ b/src/model-proxy/build/model-proxy.common.dockerfile @@ -1,7 +1,18 @@ -FROM golang:1.25.0 +# Build stage +FROM golang:1.25.0 AS builder WORKDIR /app COPY ./src /app/model-proxy RUN cd /app/model-proxy && go mod tidy && \ - go build -o /app/bin/modelproxy + CGO_ENABLED=0 GOOS=linux go build -o /app/bin/modelproxy + +# Final stage +FROM ubuntu:latest +WORKDIR /app + +RUN apt-get update + +RUN apt-get upgrade -y + +COPY --from=builder /app/bin/modelproxy /app/bin/modelproxy From a8740bb2a53396d04fdb361e1e7b753156a7e4ce Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 02:31:58 -0700 Subject: [PATCH 07/12] fix copilot's comment --- src/model-proxy/src/main.go | 2 +- src/model-proxy/src/proxy/authenticator.go | 6 +++++- src/model-proxy/src/proxy/proxy.go | 4 ++-- src/model-proxy/src/trace/trace.go | 4 +++- src/model-proxy/src/types/request_response_types.go | 10 +++++----- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go index aea600d6..65cc1c05 100644 --- a/src/model-proxy/src/main.go +++ b/src/model-proxy/src/main.go @@ -19,7 +19,7 @@ func init() { flag.IntVar(&port, "port", 9999, "port for the proxy server") flag.IntVar(&maxRetries, "retry", 5, "max retries for the request to the model server") flag.StringVar(&logFileDir, "logdir", "./logs", "path to the log file directory") - flag.StringVar(&modelKey, "modelkey", "", "model key for requsting model serving jobs") + flag.StringVar(&modelKey, "modelkey", "", "model key for requesting model serving jobs") } func main() { diff --git 
a/src/model-proxy/src/proxy/authenticator.go b/src/model-proxy/src/proxy/authenticator.go index d75064c7..eeaac95d 100644 --- a/src/model-proxy/src/proxy/authenticator.go +++ b/src/model-proxy/src/proxy/authenticator.go @@ -42,7 +42,11 @@ func (ra *RestServerAuthenticator) AuthenticateReq(req *http.Request, reqBody ma token := req.Header.Get("Authorization") token = strings.Replace(token, "Bearer ", "", 1) // read request body - model := reqBody["model"].(string) + model, ok := reqBody["model"].(string) + if !ok { + log.Printf("[-] Error: 'model' field missing or not a string in request body") + return false, []string{} + } availableModels, ok := ra.tokenToModels[token] if !ok { // request to RestServer to get the models diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go index 74b0fb89..82f28294 100644 --- a/src/model-proxy/src/proxy/proxy.go +++ b/src/model-proxy/src/proxy/proxy.go @@ -154,9 +154,9 @@ func (ph *ProxyHandler) ReverseProxyHandler(w http.ResponseWriter, r *http.Reque return } req.Header.Set("Content-Length", strconv.Itoa(len(newReqBody))) - // ket setting for openai spec endpoints + // key setting for openai spec endpoints req.Header.Set("Authorization", "Bearer "+curkey) - // ket setting for azure spec endpoints + // key setting for azure spec endpoints req.Header.Set("Api-key", curkey) req.Host = newUrl.Host diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go index f08113ce..4a605e9d 100644 --- a/src/model-proxy/src/trace/trace.go +++ b/src/model-proxy/src/trace/trace.go @@ -31,7 +31,9 @@ type JsonFileLogger struct { // NewJsonFileLogger create a new JsonFileLogger func NewJsonFileLogger(folderPath string) *JsonFileLogger { if _, err := os.Stat(folderPath); os.IsNotExist(err) { - os.Mkdir(folderPath, os.ModePerm) + if err := os.Mkdir(folderPath, os.ModePerm); err != nil { + log.Printf("Failed to create directory %s: %v", folderPath, err) + } } date := time.Now().Format("2006-01-02") 
return &JsonFileLogger{localFolderPath: folderPath, currentDay: date} diff --git a/src/model-proxy/src/types/request_response_types.go b/src/model-proxy/src/types/request_response_types.go index 7a3958e9..1a560566 100644 --- a/src/model-proxy/src/types/request_response_types.go +++ b/src/model-proxy/src/types/request_response_types.go @@ -121,9 +121,9 @@ func (t *Trace) Marshal() ([]byte, error) { // convert a request and response to a trace func ConvertReqResp2Trace(req *Request, response []string) *Trace { - conversactions := make([]*TraceMessage, 0, len(req.Messages)+len(response)) + conversations := make([]*TraceMessage, 0, len(req.Messages)+len(response)) for i, msg := range req.Messages { - conversactions = append(conversactions, &TraceMessage{ + conversations = append(conversations, &TraceMessage{ Role: msg.Role, Content: msg.Content, Score: -1, @@ -132,12 +132,12 @@ func ConvertReqResp2Trace(req *Request, response []string) *Trace { }) } for _, resp := range response { - conversactions = append(conversactions, &TraceMessage{ + conversations = append(conversations, &TraceMessage{ Role: "assistant", Content: resp, Score: -1, ParentID: len(req.Messages) - 1, - ID: len(conversactions), + ID: len(conversations), }) } @@ -161,7 +161,7 @@ func ConvertReqResp2Trace(req *Request, response []string) *Trace { LogitBias: req.LogitBias, User: req.User, }, - Conversations: conversactions, + Conversations: conversations, } return trace From 4ac4e0cabbec8729a6fb4cf0be3b35dbeeb7f6f2 Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 21:07:30 -0700 Subject: [PATCH 08/12] fix comments --- src/model-proxy/config/model-proxy.yaml | 3 +++ src/model-proxy/config/model_proxy.py | 3 +++ src/model-proxy/deploy/delete.sh | 18 ++---------------- .../deploy/model-proxy.yaml.template | 18 ++---------------- src/model-proxy/deploy/refresh.sh | 19 ++----------------- src/model-proxy/deploy/service.yaml | 18 ++---------------- src/model-proxy/deploy/start.sh | 18 
++---------------- src/model-proxy/deploy/stop.sh | 18 ++---------------- src/model-proxy/src/README.md | 6 +++--- src/model-proxy/src/main.go | 3 +++ src/model-proxy/src/proxy/authenticator.go | 3 +++ src/model-proxy/src/proxy/load_balancer.go | 3 +++ src/model-proxy/src/proxy/model_server.go | 6 ++++-- src/model-proxy/src/proxy/proxy.go | 3 +++ src/model-proxy/src/proxy/utils.go | 3 +++ src/model-proxy/src/trace/trace.go | 3 +++ src/model-proxy/src/types/config_types.go | 3 +++ .../src/types/request_response_types.go | 3 +++ .../pylon-config/location.conf.template | 6 +++--- 19 files changed, 52 insertions(+), 105 deletions(-) diff --git a/src/model-proxy/config/model-proxy.yaml b/src/model-proxy/config/model-proxy.yaml index fa519038..443e0967 100644 --- a/src/model-proxy/config/model-proxy.yaml +++ b/src/model-proxy/config/model-proxy.yaml @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + service_type: "common" port: 9999 diff --git a/src/model-proxy/config/model_proxy.py b/src/model-proxy/config/model_proxy.py index 2eb988d9..3549c705 100644 --- a/src/model-proxy/config/model_proxy.py +++ b/src/model-proxy/config/model_proxy.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + import copy class ModelProxy(object): diff --git a/src/model-proxy/deploy/delete.sh b/src/model-proxy/deploy/delete.sh index 1b952d11..fd6a77ef 100644 --- a/src/model-proxy/deploy/delete.sh +++ b/src/model-proxy/deploy/delete.sh @@ -1,21 +1,7 @@ #!/bin/bash -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. pushd $(dirname "$0") > /dev/null diff --git a/src/model-proxy/deploy/model-proxy.yaml.template b/src/model-proxy/deploy/model-proxy.yaml.template index 52cdfa4c..1561f126 100644 --- a/src/model-proxy/deploy/model-proxy.yaml.template +++ b/src/model-proxy/deploy/model-proxy.yaml.template @@ -1,19 +1,5 @@ -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. kind: DaemonSet apiVersion: apps/v1 diff --git a/src/model-proxy/deploy/refresh.sh b/src/model-proxy/deploy/refresh.sh index 3b6d2ca1..bd50ad3d 100644 --- a/src/model-proxy/deploy/refresh.sh +++ b/src/model-proxy/deploy/refresh.sh @@ -1,22 +1,7 @@ #!/bin/bash -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. pushd $(dirname "$0") > /dev/null diff --git a/src/model-proxy/deploy/service.yaml b/src/model-proxy/deploy/service.yaml index a06e63d3..a2fe7ea9 100644 --- a/src/model-proxy/deploy/service.yaml +++ b/src/model-proxy/deploy/service.yaml @@ -1,19 +1,5 @@ -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. cluster-type: - yarn diff --git a/src/model-proxy/deploy/start.sh b/src/model-proxy/deploy/start.sh index 8747ed46..27720c02 100644 --- a/src/model-proxy/deploy/start.sh +++ b/src/model-proxy/deploy/start.sh @@ -1,21 +1,7 @@ #!/bin/bash -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. pushd $(dirname "$0") > /dev/null diff --git a/src/model-proxy/deploy/stop.sh b/src/model-proxy/deploy/stop.sh index 3abbaec1..9a19435e 100644 --- a/src/model-proxy/deploy/stop.sh +++ b/src/model-proxy/deploy/stop.sh @@ -1,20 +1,6 @@ #!/bin/bash -# Copyright (c) Microsoft Corporation -# All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. kubectl delete --ignore-not-found --now "daemonset/model-proxy-ds" diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md index bf970706..00865d2b 100644 --- a/src/model-proxy/src/README.md +++ b/src/model-proxy/src/README.md @@ -2,7 +2,7 @@ ## Overview -Model-proxy is a proxy service to forward requests from clients to different model jobs in LTP cluster. With one base url, client can access different models by specifying different model name in the request path, `model-proxy` service will forward the request to corresponding model job. If there is multiple jobs which are serving the same model, `model-proxy` will do load balancing among these jobs. +Model-proxy is a proxy service to forward requests from clients to different model jobs in LTP cluster. 
With one base url, client can access different models by specifying different model name in the request path, `model-proxy` service will forward the request to corresponding model job. If there are multiple jobs which are serving the same model, `model-proxy` will do load balancing among these jobs. Workflow: @@ -10,13 +10,13 @@ Workflow: - During the list request, `model-proxy` will query LTP REST server to get all model serving jobs which the user can access, and then list all models which are being served by these jobs. 2. Client sends request to `model-proxy` service to access a specific model by openai sepc api request format, e.g. `POST /v1/chat/completions` with request body containing model name. - - During the access request, `model-proxy` will query LTP REST server to get all model serving jobs which are serving the requested model, and then forward the request to one of these jobs. If there is multiple jobs, `model-proxy` will do load balancing among these jobs. + - During the access request, `model-proxy` will query LTP REST server to get all model serving jobs which are serving the requested model, and then forward the request to one of these jobs. If there are multiple jobs, `model-proxy` will do load balancing among these jobs. ## Configuration ### Requirements -- LTP model serving jobs should be deployed in the LTP cluster, and these jobs should have the job tag `model-serving`. +- LTP model serving jobs should be deployed in the LTP cluster, and names of these jobs must include `model-serving`. - LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. And the api key which is configured in model-proxy service should be supported by these endpoints. diff --git a/src/model-proxy/src/main.go b/src/model-proxy/src/main.go index 65cc1c05..9a921296 100644 --- a/src/model-proxy/src/main.go +++ b/src/model-proxy/src/main.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ package main import ( diff --git a/src/model-proxy/src/proxy/authenticator.go b/src/model-proxy/src/proxy/authenticator.go index eeaac95d..b57b1006 100644 --- a/src/model-proxy/src/proxy/authenticator.go +++ b/src/model-proxy/src/proxy/authenticator.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package proxy import ( diff --git a/src/model-proxy/src/proxy/load_balancer.go b/src/model-proxy/src/proxy/load_balancer.go index 7707a477..789686d9 100644 --- a/src/model-proxy/src/proxy/load_balancer.go +++ b/src/model-proxy/src/proxy/load_balancer.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package proxy import ( diff --git a/src/model-proxy/src/proxy/model_server.go b/src/model-proxy/src/proxy/model_server.go index 8d529fe6..f52a30ef 100644 --- a/src/model-proxy/src/proxy/model_server.go +++ b/src/model-proxy/src/proxy/model_server.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package proxy import ( @@ -46,9 +49,8 @@ func ListModelServingJobs(restServerUrl string, restServerToken string) ([]strin return nil, fmt.Errorf("failed to read jobs response: %w", err) } - // Expected: array of jobs with fields { debugId, name, username, state } + // Expected: array of jobs with fields { name, username, state } var jobs []struct { - DebugId string `json:"debugId"` Name string `json:"name"` Username string `json:"username"` State string `json:"state"` diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go index 82f28294..ce22dbca 100644 --- a/src/model-proxy/src/proxy/proxy.go +++ b/src/model-proxy/src/proxy/proxy.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ package proxy import ( diff --git a/src/model-proxy/src/proxy/utils.go b/src/model-proxy/src/proxy/utils.go index 8b83706c..a22d8f33 100644 --- a/src/model-proxy/src/proxy/utils.go +++ b/src/model-proxy/src/proxy/utils.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package proxy import ( diff --git a/src/model-proxy/src/trace/trace.go b/src/model-proxy/src/trace/trace.go index 4a605e9d..458dff9e 100644 --- a/src/model-proxy/src/trace/trace.go +++ b/src/model-proxy/src/trace/trace.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package trace import ( diff --git a/src/model-proxy/src/types/config_types.go b/src/model-proxy/src/types/config_types.go index 3ca2733c..cb05f5e5 100644 --- a/src/model-proxy/src/types/config_types.go +++ b/src/model-proxy/src/types/config_types.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + package types import ( diff --git a/src/model-proxy/src/types/request_response_types.go b/src/model-proxy/src/types/request_response_types.go index 1a560566..3ad970a8 100644 --- a/src/model-proxy/src/types/request_response_types.go +++ b/src/model-proxy/src/types/request_response_types.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ package types import ( diff --git a/src/pylon/deploy/pylon-config/location.conf.template b/src/pylon/deploy/pylon-config/location.conf.template index 5d70dff2..02b9e940 100644 --- a/src/pylon/deploy/pylon-config/location.conf.template +++ b/src/pylon/deploy/pylon-config/location.conf.template @@ -66,9 +66,9 @@ location ~ ^/copilot/api/operation(.*)$ { # Model proxy backend location ~ ^/model-proxy/(.*)$ { proxy_pass {{MODEL_PROXY_URI}}/$1$is_args$args; - proxy_connect_timeout 2m; - proxy_read_timeout 2m; - proxy_send_timeout 2m; + proxy_connect_timeout 60m; + proxy_read_timeout 60m; + proxy_send_timeout 60m; } # From 21a61d251ce3b58296ca134aa48a2cbfcf98b747 Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 21:12:06 -0700 Subject: [PATCH 09/12] add healthz --- src/model-proxy/deploy/model-proxy.yaml.template | 10 ++++++++++ src/model-proxy/src/proxy/proxy.go | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/src/model-proxy/deploy/model-proxy.yaml.template b/src/model-proxy/deploy/model-proxy.yaml.template index 1561f126..f54e67ab 100644 --- a/src/model-proxy/deploy/model-proxy.yaml.template +++ b/src/model-proxy/deploy/model-proxy.yaml.template @@ -37,6 +37,16 @@ spec: - name: model-proxy-log mountPath: /usr/local/ltp/model-proxy {%- endif %} + ports: + - containerPort: {{ cluster_cfg["model-proxy"]["port"] }} + hostPort: {{ cluster_cfg["model-proxy"]["port"] }} + name: model-proxy + livenessProbe: + httpGet: + path: '/healthz' + port: model-proxy + initialDelaySeconds: 10 + periodSeconds: 60 volumes: - name: model-proxy-log hostPath: diff --git a/src/model-proxy/src/proxy/proxy.go b/src/model-proxy/src/proxy/proxy.go index ce22dbca..b1f50164 100644 --- a/src/model-proxy/src/proxy/proxy.go +++ b/src/model-proxy/src/proxy/proxy.go @@ -75,6 +75,16 @@ func NewProxyHandler(config *types.Config) *ProxyHandler { // ReverseProxyHandler act as a reverse proxy, it will redirect the request to the destination website and return the response 
func (ph *ProxyHandler) ReverseProxyHandler(w http.ResponseWriter, r *http.Request) (string, []string, bool) { + + // handle /healthz + if r.URL.Path == "/healthz" { + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("ok")); err != nil { + log.Printf("[-] Error: failed to write healthz response: %v\n", err) + } + return "", nil, false + } + // handle /v1/models if r.URL.Path == "/v1/models" { model2Url, err := GetJobModelsMapping(r, ph.authenticator.modelKey) From 30b98b4f3eebcbd8f784308e46914408716b032e Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Wed, 10 Sep 2025 21:16:16 -0700 Subject: [PATCH 10/12] remove unused field --- src/model-proxy/src/proxy/model_server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/model-proxy/src/proxy/model_server.go b/src/model-proxy/src/proxy/model_server.go index f52a30ef..a2ff5027 100644 --- a/src/model-proxy/src/proxy/model_server.go +++ b/src/model-proxy/src/proxy/model_server.go @@ -49,11 +49,10 @@ func ListModelServingJobs(restServerUrl string, restServerToken string) ([]strin return nil, fmt.Errorf("failed to read jobs response: %w", err) } - // Expected: array of jobs with fields { name, username, state } + // Expected: array of jobs with fields { name, username } var jobs []struct { Name string `json:"name"` Username string `json:"username"` - State string `json:"state"` } if err := json.Unmarshal(body, &jobs); err != nil { // Try to provide a helpful error if the shape is unexpected From f152d8bf7752fe5be1567e55a9ab74836e0eaf19 Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Thu, 11 Sep 2025 01:43:42 -0700 Subject: [PATCH 11/12] update readme --- src/model-proxy/src/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md index 00865d2b..cc4974bf 100644 --- a/src/model-proxy/src/README.md +++ b/src/model-proxy/src/README.md @@ -18,7 +18,9 @@ Workflow: - LTP model serving jobs should be deployed 
in the LTP cluster, and names of these jobs must include `model-serving`. -- LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. And the api key which is configured in model-proxy service should be supported by these endpoints. +- LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. + * The api key which is configured in model-proxy service should be supported by these endpoints. + * The endpoints should use the first taskrole's ip and port. ### Binary configuration From f18bcb4fe6375fb0a9d2146059e8f4d56aac1ebb Mon Sep 17 00:00:00 2001 From: Zhongxin Guo Date: Sat, 13 Sep 2025 05:08:28 -0700 Subject: [PATCH 12/12] update readme --- src/model-proxy/src/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/model-proxy/src/README.md b/src/model-proxy/src/README.md index cc4974bf..36ca0387 100644 --- a/src/model-proxy/src/README.md +++ b/src/model-proxy/src/README.md @@ -19,8 +19,8 @@ Workflow: - LTP model serving jobs should be deployed in the LTP cluster, and names of these jobs must include `model-serving`. - LTP model serving jobs should support openai spec api, e.g. `/v1/chat/completions` endpoint. - * The api key which is configured in model-proxy service should be supported by these endpoints. * The endpoints should use the first taskrole's ip and port. + * The api key which is configured in model-proxy service should be supported by these endpoints. So if users want to make their model serving jobs accessible through model-proxy, they need to configure the same api key (provided by the cluster admin) in their jobs. ### Binary configuration