From e7a7cac8bfbd45b836c3d9f865aa7eff50ea9a99 Mon Sep 17 00:00:00 2001 From: justinsb Date: Mon, 8 Sep 2025 12:53:47 -0400 Subject: [PATCH] AI inference: demonstrate in-cluster storage of models This example demonstrates how we can serve models from inside the cluster, without needing to bake them into the container images. We may also in future want to support storing models in GCS or S3, but this example focuses on storing models without cloud dependencies. We may also want to investigate serving models from container images, particularly given the upcoming support for mounting container images as volumes, but this approach works today and allows for more dynamic model loading (e.g. loading new models without restarting pods). Moreover, a container image server is backed by a blob server, as introduced here. --- AI/modelcloud/README.md | 148 +++++++++++++ AI/modelcloud/cmd/blob-server/main.go | 196 ++++++++++++++++++ AI/modelcloud/cmd/model-hasher/main.go | 150 ++++++++++++++ AI/modelcloud/cmd/vllm-frontend/main.go | 151 ++++++++++++++ .../cmd/vllm-frontend/models/embed.go | 40 ++++ .../models/google/gemma-3-1b-it/model.yaml | 22 ++ AI/modelcloud/dev/tools/build-images | 51 +++++ AI/modelcloud/dev/tools/deploy-to-kube | 86 ++++++++ AI/modelcloud/dev/tools/push-images | 54 +++++ AI/modelcloud/dev/tools/shared/utils.py | 54 +++++ AI/modelcloud/go.mod | 21 ++ AI/modelcloud/go.sum | 41 ++++ AI/modelcloud/images/blob-server/Dockerfile | 32 +++ AI/modelcloud/images/vllm-frontend/Dockerfile | 39 ++++ AI/modelcloud/k8s/blob-server/manifest.yaml | 57 +++++ AI/modelcloud/k8s/gemma3/manifest.yaml | 57 +++++ AI/modelcloud/pkg/api/model.go | 28 +++ AI/modelcloud/pkg/blobs/blobserver.go | 153 ++++++++++++++ AI/modelcloud/pkg/blobs/interfaces.go | 39 ++++ AI/modelcloud/pkg/blobs/localblobstore.go | 136 ++++++++++++ 20 files changed, 1555 insertions(+) create mode 100644 AI/modelcloud/README.md create mode 100644 AI/modelcloud/cmd/blob-server/main.go create mode 100644 AI/modelcloud/cmd/model-hasher/main.go create mode 100644 AI/modelcloud/cmd/vllm-frontend/main.go create mode 100644 AI/modelcloud/cmd/vllm-frontend/models/embed.go create mode 100644 AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml create mode 100755 AI/modelcloud/dev/tools/build-images create mode 100755 AI/modelcloud/dev/tools/deploy-to-kube create mode 100755 AI/modelcloud/dev/tools/push-images create mode 100644 AI/modelcloud/dev/tools/shared/utils.py create mode 100644 AI/modelcloud/go.mod create mode 100644 AI/modelcloud/go.sum create mode 100644 AI/modelcloud/images/blob-server/Dockerfile create mode 100644 AI/modelcloud/images/vllm-frontend/Dockerfile create mode 100644 AI/modelcloud/k8s/blob-server/manifest.yaml create mode 100644 AI/modelcloud/k8s/gemma3/manifest.yaml create mode 100644 AI/modelcloud/pkg/api/model.go create mode 100644 AI/modelcloud/pkg/blobs/blobserver.go create mode 100644 AI/modelcloud/pkg/blobs/interfaces.go create mode 100644 AI/modelcloud/pkg/blobs/localblobstore.go diff --git a/AI/modelcloud/README.md b/AI/modelcloud/README.md new file mode 100644 index 000000000..e430dda86 --- /dev/null +++ b/AI/modelcloud/README.md @@ -0,0 +1,148 @@ +# modelcloud example + +## Goal: Production-grade inference on AI-conformant kubernetes clusters + +This goal of this example is to build up production-grade inference +on AI-conformant kubernetes clusters. + +We (aspirationally) aim to demonstrate the capabilities of the AI-conformance +profile. Where we cannot achieve production-grade inference, we hope to +motivate discussion of extensions to the AI-conformance profile to plug those gaps. + +## Walkthrough + +### Deploying to a kubernetes cluster + +Create a kubernetes cluster, we currently test with GKE and gcr.io but do not aim +to depend on non-conformant functionality; PRs to add support for deployment +to other conformant environments are very welcome. + +1. From the modelcloud directory, run `dev/tools/push-images` to push to `gcr.io/$(gcloud config get project)/...` + +1. Run `dev/tools/deploy-to-kube` to deploy. + +We deploy two workloads: + +1. `blob-server`, a statefulset with a persistent volume to hold the model blobs (files) + +1. `gemma3`, a deployment running vLLM, with a frontend go process that will download the model from `blob-server`. + +### Uploading a model + +For now, we will likely be dealing with models from huggingface. + +Begin by cloning the model locally (and note that currently only google/gemma-3-1b-it is supported): + +``` +git clone https://huggingface.co/google/gemma-3-1b-it +``` + +If you now run `go run ./cmd/model-hasher --src gemma-3-1b-it` you should see it print the hashes for each file: + +``` +spec: + files: + - hash: 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 + path: .gitattributes + - hash: 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a + path: README.md + - hash: 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 + path: added_tokens.json + - hash: 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e + path: config.json + - hash: fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 + path: generation_config.json + - hash: 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 + path: model.safetensors + - hash: 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 + path: special_tokens_map.json + - hash: 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 + path: tokenizer.json + - hash: 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c + path: tokenizer.model + - hash: bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 + path: tokenizer_config.json +``` + +Inside the vllm-frontend, this [list of files is currently embedded](cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml). +This is why we only support gemma-3-1b-it today, though we plan to relax this in future (e.g. a CRD?) + +The blob-server accepts uploads, and we can upload the blobs using a port-forward: + +``` +kubectl port-forward blob-server-0 8081:8080 & +go run ./cmd/model-hasher/ --src gemma-3-1b-it/ --upload http://127.0.0.1:8081 +``` + +This will then store the blobs on the persistent disk of blob-server, so they are now available in-cluster, +you can verify this with `kubectl debug`: + +``` +> kubectl debug blob-server-0 -it --image=debian:latest --profile=general --share-processes --target blob-server +root@blob-server-0:/# cat /proc/1/mounts | grep blob +/dev/sdb /blobs ext4 rw,relatime 0 0 +root@blob-server-0:/# ls -l /proc/1/root/blobs/ +total 1991324 +-rw------- 1 root root 4689074 Sep 8 21:40 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c +-rw------- 1 root root 1676 Sep 8 21:39 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 +-rw------- 1 root root 899 Sep 8 21:39 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e +-rw------- 1 root root 662 Sep 8 21:40 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 +-rw------- 1 root root 1999811208 Sep 8 21:40 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 +-rw------- 1 root root 33384568 Sep 8 21:40 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 +-rw------- 1 root root 35 Sep 8 21:39 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 +-rw------- 1 root root 24265 Sep 8 21:39 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a +-rw------- 1 root root 1156999 Sep 8 21:40 bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 +-rw------- 1 root root 215 Sep 8 21:39 fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 +drwx------ 2 root root 16384 Sep 8 21:38 lost+found +``` + +## Using the inference server + +At this point the vLLM process should (hopefully) have downloaded the model and started. + +```bash +kubectl wait --for=condition=Available --timeout=10s deployment/gemma3 +kubectl get pods -l app=gemma3 +``` + +To check logs (particularly if this is not already ready) +```bash +kubectl logs -f -l app=gemma3 +``` + + +## Verification / Seeing it Work + +Forward local requests to vLLM service: + +```bash +# Forward a local port (e.g., 8080) to the service port (e.g., 8080) +kubectl port-forward service/gemma3 8080:80 & +``` + +2. Send request to local forwarding port: + +```bash +curl -X POST http://localhost:8080/v1/chat/completions \ +-H "Content-Type: application/json" \ +-d '{ + "model": "google/gemma-3-1b-it", + "messages": [{"role": "user", "content": "Explain Quantum Computing in simple terms."}], + "max_tokens": 100 +}' +``` + +Expected output (or similar): + +```json +{"id":"chatcmpl-462b3e153fd34e5ca7f5f02f3bcb6b0c","object":"chat.completion","created":1753164476,"model":"google/gemma-3-1b-it","choices":[{"index":0,"message":{"role":"assistant","reasoning_content":null,"content":"Okay, let’s break down quantum computing in a way that’s hopefully understandable without getting lost in too much jargon. Here's the gist:\n\n**1. Classical Computers vs. Quantum Computers:**\n\n* **Classical Computers:** These are the computers you use every day – laptops, phones, servers. They store information as *bits*. A bit is like a light switch: it's either on (1) or off (0). Everything a classical computer does – from playing games","tool_calls":[]},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":16,"total_tokens":116,"completion_tokens":100,"prompt_tokens_details":null},"prompt_logprobs":null} +``` + +--- + +## Cleanup + +```bash +kubectl delete deployment gemma3 +kubectl delete statefulset blob-server +``` diff --git a/AI/modelcloud/cmd/blob-server/main.go b/AI/modelcloud/cmd/blob-server/main.go new file mode 100644 index 000000000..8efe9ad88 --- /dev/null +++ b/AI/modelcloud/cmd/blob-server/main.go @@ -0,0 +1,196 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/examples/AI/modelcloud/pkg/blobs" + "k8s.io/klog/v2" +) + +func main() { + if err := run(context.Background()); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + listen := ":8080" + cacheDir := os.Getenv("CACHE_DIR") + if cacheDir == "" { + // We expect CACHE_DIR to be set when running on kubernetes, but default sensibly for local dev + cacheDir = "~/.cache/blob-server/blobs" + } + flag.StringVar(&listen, "listen", listen, "listen address") + flag.StringVar(&cacheDir, "cache-dir", cacheDir, "cache directory") + flag.Parse() + + if strings.HasPrefix(cacheDir, "~/") { + homeDir, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("getting home directory: %w", err) + } + cacheDir = filepath.Join(homeDir, strings.TrimPrefix(cacheDir, "~/")) + } + + if err := os.MkdirAll(cacheDir, 0755); err != nil { + return fmt.Errorf("creating cache directory %q: %w", cacheDir, err) + } + + blobStore := &blobs.LocalBlobStore{ + LocalDir: cacheDir, + } + + blobCache := &blobCache{ + CacheDir: cacheDir, + blobStore: blobStore, + } + + s := &httpServer{ + blobCache: blobCache, + tmpDir: filepath.Join(cacheDir, "tmp"), + } + + log.Info("serving http", "endpoint", listen) + if err := http.ListenAndServe(listen, s); err != nil { + return fmt.Errorf("serving on %q: %w", listen, err) + } + + return nil +} + +type httpServer struct { + blobCache *blobCache + tmpDir string +} + +func (s *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + tokens := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") + if len(tokens) == 1 { + if r.Method == "GET" { + hash := tokens[0] + s.serveGETBlob(w, r, hash) + return + } + if r.Method == "PUT" { + hash := tokens[0] + s.servePUTBlob(w, r, hash) + return + } + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + http.Error(w, "not found", http.StatusNotFound) +} + +func (s *httpServer) serveGETBlob(w http.ResponseWriter, r *http.Request, hash string) { + ctx := r.Context() + + log := klog.FromContext(ctx) + + // TODO: Validate hash is hex, right length etc + + f, err := s.blobCache.GetBlob(ctx, hash) + if err != nil { + if status.Code(err) == codes.NotFound { + log.Info("blob not found", "hash", hash) + http.Error(w, "not found", http.StatusNotFound) + return + } + log.Error(err, "error getting blob") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + defer f.Close() + p := f.Name() + + log.Info("serving blob", "hash", hash, "path", p) + http.ServeFile(w, r, p) +} + +func (s *httpServer) servePUTBlob(w http.ResponseWriter, r *http.Request, hash string) { + ctx := r.Context() + + log := klog.FromContext(ctx) + + // TODO: Download to temp file first? + + if err := s.blobCache.PutBlob(ctx, hash, r.Body); err != nil { + log.Error(err, "error stoing blob") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + log.Info("uploaded blob", "hash", hash) + + w.WriteHeader(http.StatusCreated) +} + +type blobCache struct { + CacheDir string + blobStore blobs.BlobStore +} + +func (c *blobCache) GetBlob(ctx context.Context, hash string) (*os.File, error) { + log := klog.FromContext(ctx) + + localPath := filepath.Join(c.CacheDir, hash) + f, err := os.Open(localPath) + if err == nil { + return f, nil + } else if !os.IsNotExist(err) { + return nil, fmt.Errorf("opening blob %q: %w", hash, err) + } + + log.Info("blob not found in cache, downloading", "hash", hash) + + err = c.blobStore.Download(ctx, blobs.BlobInfo{Hash: hash}, localPath) + if err == nil { + f, err := os.Open(localPath) + if err != nil { + return nil, fmt.Errorf("opening blob %q after download: %w", hash, err) + } + return f, nil + } + + return nil, err +} + +func (c *blobCache) PutBlob(ctx context.Context, hash string, r io.Reader) error { + log := klog.FromContext(ctx) + + if err := c.blobStore.Upload(ctx, r, blobs.BlobInfo{Hash: hash}); err != nil { + log.Error(err, "error uploading blob") + return fmt.Errorf("uploading blob %q: %w", hash, err) + } + + // TODO: Side-load into local cache too? + + return nil +} diff --git a/AI/modelcloud/cmd/model-hasher/main.go b/AI/modelcloud/cmd/model-hasher/main.go new file mode 100644 index 000000000..c545af7fb --- /dev/null +++ b/AI/modelcloud/cmd/model-hasher/main.go @@ -0,0 +1,150 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "crypto/sha256" + "flag" + "fmt" + "io" + "io/fs" + "net/url" + "os" + "path/filepath" + "strings" + + "k8s.io/examples/AI/modelcloud/pkg/api" + "k8s.io/examples/AI/modelcloud/pkg/blobs" + + "sigs.k8s.io/yaml" + + "k8s.io/klog/v2" +) + +func main() { + if err := run(context.Background()); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + srcDir := "" + flag.StringVar(&srcDir, "src", srcDir, "Directory to scan for model files") + + upload := "" + flag.StringVar(&upload, "upload", upload, "If set, the URL to upload to") + + flag.Parse() + + if srcDir == "" { + return fmt.Errorf("--src is required") + } + + model := &api.Model{} + + // Walk all the files in the directory, and print their SHA256 hashes + err := filepath.WalkDir(srcDir, func(path string, dent fs.DirEntry, err error) error { + if err != nil { + return err + } + relativePath, err := filepath.Rel(srcDir, path) + if err != nil { + return fmt.Errorf("getting relative path for %q: %w", path, err) + } + if dent.IsDir() { + if relativePath == ".git" { + return filepath.SkipDir + } + return nil + } + + hash, err := fileHash(path) + if err != nil { + log.Error(err, "error hashing file", "path", path) + return nil + } + + model.Spec.Files = append(model.Spec.Files, api.ModelFile{ + Path: relativePath, + Hash: hash, + }) + return nil + }) + if err != nil { + return fmt.Errorf("walking directory %q: %w", srcDir, err) + } + + y, err := yaml.Marshal(model) + if err != nil { + return fmt.Errorf("marshalling model to YAML: %w", err) + } + fmt.Println(string(y)) + + if upload != "" { + var blobstore blobs.BlobStore + + if strings.HasPrefix(upload, "http://") || strings.HasPrefix(upload, "https://") { + u, err := url.Parse(upload) + if err != nil { + return fmt.Errorf("parsing upload URL %q: %w", upload, err) + } + log.Info("using blob server", "url", u.String()) + blobstore = &blobs.BlobServer{ + URL: u, + } + } else { + return fmt.Errorf("upload must be a URL (https:///), got %q", upload) + } + + for _, file := range model.Spec.Files { + info := blobs.BlobInfo{ + Hash: file.Hash, + } + srcPath := filepath.Join(srcDir, file.Path) + r, err := os.Open(srcPath) + if err != nil { + return fmt.Errorf("opening file %q: %w", srcPath, err) + } + defer r.Close() + + if err := blobstore.Upload(ctx, r, info); err != nil { + return fmt.Errorf("uploading file %q: %w", file.Path, err) + } + log.Info("uploaded file", "path", file.Path, "hash", file.Hash) + } + } + + return nil +} + +func fileHash(p string) (string, error) { + f, err := os.Open(p) + if err != nil { + return "", fmt.Errorf("opening file %q: %w", p, err) + } + defer f.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, f); err != nil { + return "", fmt.Errorf("hashing file %q: %w", p, err) + } + + hash := fmt.Sprintf("%x", hasher.Sum(nil)) + return hash, nil +} diff --git a/AI/modelcloud/cmd/vllm-frontend/main.go b/AI/modelcloud/cmd/vllm-frontend/main.go new file mode 100644 index 000000000..9780f57f6 --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/main.go @@ -0,0 +1,151 @@ +// Copyright 2025 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law of agetd in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "net/url" + "os" + "os/exec" + "path/filepath" + "time" + + models "k8s.io/examples/AI/modelcloud/cmd/vllm-frontend/models" + + "k8s.io/examples/AI/modelcloud/pkg/blobs" + "k8s.io/klog/v2" +) + +func main() { + ctx := context.Background() + if err := run(ctx); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + llmModelID := os.Getenv("MODEL_NAME") + flag.StringVar(&llmModelID, "model-name", llmModelID, "identifier of model.") + + blobserverFlag := os.Getenv("BLOBSERVER") + if blobserverFlag == "" { + blobserverFlag = "http://blob-server" + } + flag.StringVar(&blobserverFlag, "blobserver", blobserverFlag, "base url to blobserver") + + klog.InitFlags(nil) + + flag.Parse() + + llmModel, err := models.LoadModel(llmModelID) + if err != nil { + return fmt.Errorf("loading model %q: %w", llmModelID, err) + } + + blobserverURL, err := url.Parse(blobserverFlag) + if err != nil { + return fmt.Errorf("parsing blobserver url %q: %w", blobserverFlag, err) + } + blobStore := &blobs.BlobServer{ + URL: blobserverURL, + } + + loader := &ModelLoader{ + reader: blobStore, + maxDownloadAttempts: 5, + } + modelDir := filepath.Join(os.TempDir(), "model", llmModelID) + if err := os.MkdirAll(modelDir, 0755); err != nil { + return fmt.Errorf("creating temp directory %q: %w", modelDir, err) + } + log.Info("downloading model to", "dir", modelDir) + + for _, file := range llmModel.Spec.Files { + localPath := filepath.Join(modelDir, file.Path) + + info := blobs.BlobInfo{ + Hash: file.Hash, + } + if err := loader.downloadToFile(ctx, info, localPath); err != nil { + return fmt.Errorf("downloading model: %w", err) + } + log.Info("downloaded model file", "path", file.Path) + } + + var vllmArgs []string + + baseArgs := []string{ + "python3", "-m", "vllm.entrypoints.openai.api_server", + "--host=0.0.0.0", + "--port=8080", + "--model=" + modelDir, + "--served_model_name=" + llmModelID, + } + + vllmArgs = append(vllmArgs, baseArgs...) + vllmArgs = append(vllmArgs, flag.Args()...) + + log.Info("starting vllm", "args", vllmArgs) + + cmd := exec.CommandContext(ctx, vllmArgs[0], vllmArgs[1:]...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + env := os.Environ() + cmd.Env = env + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting vllm: %w", err) + } + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("vllm exited with error: %w", err) + } + return nil +} + +type ModelLoader struct { + // reader is the interface to fetch blobs + reader blobs.BlobReader + + // maxDownloadAttempts is the number of times to attempt a download before failing + maxDownloadAttempts int +} + +func (l *ModelLoader) downloadToFile(ctx context.Context, info blobs.BlobInfo, destPath string) error { + log := klog.FromContext(ctx) + + attempt := 0 + for { + attempt++ + + err := l.reader.Download(ctx, info, destPath) + if err == nil { + return nil + } + + if attempt >= l.maxDownloadAttempts { + return err + } + + log.Error(err, "downloading blob, will retry", "info", info, "attempt", attempt) + time.Sleep(5 * time.Second) + } +} diff --git a/AI/modelcloud/cmd/vllm-frontend/models/embed.go b/AI/modelcloud/cmd/vllm-frontend/models/embed.go new file mode 100644 index 000000000..4c99f8e5c --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/models/embed.go @@ -0,0 +1,40 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +import ( + "embed" + "fmt" + + api "k8s.io/examples/AI/modelcloud/pkg/api" + + "sigs.k8s.io/yaml" +) + +//go:embed */*/*.yaml +var embedModels embed.FS + +func LoadModel(modelID string) (*api.Model, error) { + b, err := embedModels.ReadFile(modelID + "/model.yaml") + if err != nil { + return nil, fmt.Errorf("reading embedded model %q: %w", modelID, err) + } + + model := &api.Model{} + if err := yaml.Unmarshal(b, model); err != nil { + return nil, fmt.Errorf("parsing embedded model %q: %w", modelID, err) + } + return model, nil +} diff --git a/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml b/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml new file mode 100644 index 000000000..a8b0a7d9b --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml @@ -0,0 +1,22 @@ +spec: + files: + - hash: 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 + path: .gitattributes + - hash: 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a + path: README.md + - hash: 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 + path: added_tokens.json + - hash: 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e + path: config.json + - hash: fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 + path: generation_config.json + - hash: 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 + path: model.safetensors + - hash: 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 + path: special_tokens_map.json + - hash: 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 + path: tokenizer.json + - hash: 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c + path: tokenizer.model + - hash: bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 + path: tokenizer_config.json diff --git a/AI/modelcloud/dev/tools/build-images b/AI/modelcloud/dev/tools/build-images new file mode 100755 index 000000000..4ea7dff3f --- /dev/null +++ b/AI/modelcloud/dev/tools/build-images @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def main(): + """Builds docker images.""" + srcdir = utils.find_srcdir() + images_dir = os.path.join(srcdir, "images") + + if not os.path.isdir(images_dir): + print("images directory not found") + return + + for root, dirs, files in os.walk(images_dir): + for filename in files: + if filename != "Dockerfile": + continue + + dockerfile_path = os.path.join(root, filename) + dockerfile_rel_path = os.path.relpath(dockerfile_path, srcdir) + service_name = os.path.basename(root) + + image_name = utils.get_full_image_name(service_name) + + print(f"Building image for {service_name} with tag {image_name}") + + cmd = ["docker", "buildx", "build", "-f", dockerfile_rel_path, "-t", image_name, "."] + subprocess.run(cmd, cwd=srcdir, check=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/deploy-to-kube b/AI/modelcloud/dev/tools/deploy-to-kube new file mode 100755 index 000000000..0cd13e056 --- /dev/null +++ b/AI/modelcloud/dev/tools/deploy-to-kube @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys +import io + +import yaml + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def find_and_replace_images(doc, service_name, image_prefix, image_tag): + """Recursively finds and replaces container images in a Kubernetes manifest.""" + if isinstance(doc, dict): + for k, v in doc.items(): + if k == "image": + # We only replace images that look like :latest + image_and_tag = v.split(":") + if len(image_and_tag) == 2 and image_and_tag[1] == "latest": + doc[k] = f"{image_prefix}{image_and_tag[0]}:{image_tag}" + else: + find_and_replace_images(v, service_name, image_prefix, image_tag) + elif isinstance(doc, list): + for item in doc: + find_and_replace_images(item, service_name, image_prefix, image_tag) + +def main(): + srcdir = utils.find_srcdir() + + manifests_path = os.path.join(srcdir, "k8s") + + if not os.path.isdir(manifests_path): + print("k8s directory not found") + return + + for root, dirs, files in os.walk(manifests_path): + for filename in files: + if not (filename.endswith(".yaml") or filename.endswith(".yml")): + continue + + path = os.path.join(root, filename) + + with open(path, "r") as f: + # Use safe_load_all for multi-document YAML files + docs = list(yaml.safe_load_all(f)) + + service_name = os.path.basename(root) + image_prefix = utils.get_image_prefix() + image_tag = utils.get_image_tag() + + # Process each document in the file + for doc in docs: + if not doc: # Skip empty documents + continue + + find_and_replace_images(doc, service_name, image_prefix, image_tag) + + # Dump the modified documents back to a string + string_stream = io.StringIO() + yaml.dump_all(docs, string_stream) + modified_content = string_stream.getvalue() + + rel_path = os.path.relpath(path, srcdir) + print(f"applying manifest {rel_path}") + + cmd = ["kubectl", "apply", "-f", "-"] + subprocess.run(cmd, cwd=srcdir, check=True, input=modified_content, text=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/push-images b/AI/modelcloud/dev/tools/push-images new file mode 100755 index 000000000..9d0e4af31 --- /dev/null +++ b/AI/modelcloud/dev/tools/push-images @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def main(): + """Builds and pushes docker images.""" + srcdir = utils.find_srcdir() + images_dir = os.path.join(srcdir, "images") + + if not os.path.isdir(images_dir): + print("images directory not found") + return + + for root, dirs, files in os.walk(images_dir): + for filename in files: + if filename != "Dockerfile": + continue + + dockerfile_path = os.path.join(root, filename) + service_name = os.path.basename(root) + + image_name = utils.get_full_image_name(service_name) + + print(f"building image for {service_name} with tag {image_name}") + + build_cmd = ["docker", "buildx", "build", "--push", "-t", image_name, "-f", dockerfile_path, "."] + subprocess.run(build_cmd, cwd=srcdir, check=True) + + print(f"pushing image {image_name}") + push_cmd = ["docker", "push", image_name] + subprocess.run(push_cmd, check=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/shared/utils.py b/AI/modelcloud/dev/tools/shared/utils.py new file mode 100644 index 000000000..6df2fae10 --- /dev/null +++ b/AI/modelcloud/dev/tools/shared/utils.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess + +def find_srcdir(): + """Finds the project root directory by looking for go.mod.""" + p = os.path.dirname(os.path.abspath(__file__)) + while True: + # We go up two levels to get out of dev/tools/shared + p = os.path.dirname(os.path.dirname(os.path.dirname(p))) + if os.path.exists(os.path.join(p, "go.mod")): + return p + parent = os.path.dirname(p) + if parent == p: + raise Exception("could not find go.mod in any parent directory") + p = parent + +def get_gcp_project(): + """Gets the GCP project ID from gcloud.""" + return subprocess.check_output(["gcloud", "config", "get-value", "project"], text=True).strip() + +def get_git_commit_short(): + """Gets the short git commit hash for HEAD.""" + return subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], text=True).strip() + +def get_image_tag(): + """Gets the image tag based on the git commit.""" + return f"git-{get_git_commit_short()}" + +def get_image_prefix(): + """Constructs the image prefix for a container image.""" + project_id = get_gcp_project() + return f"gcr.io/{project_id}/" + +def get_full_image_name(service_name): + """Constructs the full GCR image name for a service.""" + image_prefix = get_image_prefix() + tag = get_image_tag() + return f"{image_prefix}{service_name}:{tag}" diff --git a/AI/modelcloud/go.mod b/AI/modelcloud/go.mod new file mode 100644 index 000000000..9022df046 --- /dev/null +++ b/AI/modelcloud/go.mod @@ -0,0 +1,21 @@ +module k8s.io/examples/AI/modelcloud + +go 1.24.5 + +require ( + google.golang.org/grpc v1.74.2 + k8s.io/klog/v2 v2.130.1 + sigs.k8s.io/yaml v1.6.0 +) + +require ( + github.com/go-logr/logr v1.4.3 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/protobuf v1.36.7 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect +) diff --git a/AI/modelcloud/go.sum b/AI/modelcloud/go.sum new file mode 100644 index 000000000..a88b001a6 --- /dev/null +++ b/AI/modelcloud/go.sum @@ -0,0 +1,41 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE= +go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c h1:qXWI/sQtv5UKboZ/zUk7h+mrf/lXORyI+n9DKDAusdg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/AI/modelcloud/images/blob-server/Dockerfile b/AI/modelcloud/images/blob-server/Dockerfile new file mode 100644 index 000000000..f8ece4970 --- /dev/null +++ b/AI/modelcloud/images/blob-server/Dockerfile @@ -0,0 +1,32 @@ +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.25.1 AS builder + +ADD go.mod /workspace/ +ADD go.sum /workspace/ +ADD cmd/blob-server /workspace/cmd/blob-server/ +ADD pkg /workspace/pkg/ + +WORKDIR /workspace + +RUN CGO_ENABLED=0 go build -o /blob-server ./cmd/blob-server + +# TODO: Don't have permission to write to volumes if nonroot +#FROM gcr.io/distroless/static-debian12:nonroot +FROM gcr.io/distroless/static-debian12:latest + +COPY --from=builder /blob-server /blob-server + +ENTRYPOINT ["/blob-server"] \ No newline at end of file diff --git a/AI/modelcloud/images/vllm-frontend/Dockerfile b/AI/modelcloud/images/vllm-frontend/Dockerfile new file mode 100644 index 000000000..0b19a3739 --- /dev/null +++ b/AI/modelcloud/images/vllm-frontend/Dockerfile @@ -0,0 +1,39 @@ +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.25.1 AS builder + +ADD go.mod /workspace/ +ADD go.sum /workspace/ +ADD cmd/vllm-frontend /workspace/cmd/vllm-frontend/ +ADD pkg /workspace/pkg/ + +WORKDIR /workspace + +RUN CGO_ENABLED=0 go build -o /vllm-frontend ./cmd/vllm-frontend + + +FROM vllm/vllm-openai:v0.10.1.1 +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:58] Checking if CUDA platform is available. +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:82] Exception happens when checking CUDA platform: NVML Shared Library Not Found +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:99] CUDA platform is not available because: NVML Shared Library Not Found + + +# FROM vllm/vllm-openai:v0.10.0 + +COPY --from=builder /vllm-frontend /vllm-frontend + +ENV LD_LIBRARY_PATH /usr/local/nvidia/lib64:/usr/local/lib/ + +ENTRYPOINT ["/vllm-frontend"] \ No newline at end of file diff --git a/AI/modelcloud/k8s/blob-server/manifest.yaml b/AI/modelcloud/k8s/blob-server/manifest.yaml new file mode 100644 index 000000000..6b983498e --- /dev/null +++ b/AI/modelcloud/k8s/blob-server/manifest.yaml @@ -0,0 +1,57 @@ +kind: ServiceAccount +apiVersion: v1 +metadata: + name: blob-server + +--- + +kind: Service +apiVersion: v1 +metadata: + name: blob-server + labels: + app: blob-server +spec: + selector: + app: blob-server + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + +--- + +kind: StatefulSet +apiVersion: apps/v1 +metadata: + name: blob-server +spec: + podManagementPolicy: "Parallel" + replicas: 1 + selector: + matchLabels: + app: blob-server + #serviceName: blob-server + template: + metadata: + labels: + app: blob-server + spec: + serviceAccountName: blob-server + containers: + - name: blob-server + image: blob-server:latest # placeholder value, replaced by deployment scripts + env: + - name: CACHE_DIR + value: /blobs + volumeMounts: + - mountPath: /blobs + name: blobs + volumeClaimTemplates: + - metadata: + name: blobs + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 100Gi \ No newline at end of file diff --git a/AI/modelcloud/k8s/gemma3/manifest.yaml b/AI/modelcloud/k8s/gemma3/manifest.yaml new file mode 100644 index 000000000..fc0010151 --- /dev/null +++ b/AI/modelcloud/k8s/gemma3/manifest.yaml @@ -0,0 +1,57 @@ +kind: ServiceAccount +apiVersion: v1 +metadata: + name: gemma3 + +--- + +kind: Service +apiVersion: v1 +metadata: + name: gemma3 + labels: + app: gemma3 +spec: + selector: + app: gemma3 + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + +--- + +kind: Deployment +apiVersion: apps/v1 +metadata: + name: gemma3 +spec: + replicas: 1 + selector: + matchLabels: + app: gemma3 + #serviceName: gemma3 + template: + metadata: + labels: + app: gemma3 + spec: + serviceAccountName: gemma3 + containers: + - name: gemma3 + image: vllm-frontend:latest # placeholder value, replaced by deployment scripts + resources: + requests: + ephemeral-storage: "10Gi" + nvidia.com/gpu: "1" + limits: + ephemeral-storage: "10Gi" + nvidia.com/gpu: "1" + args: + - -- + - --tensor-parallel-size=1 + env: + - name: MODEL_NAME + value: "google/gemma-3-1b-it" + - name: VLLM_LOGGING_LEVEL + value: "DEBUG" diff --git a/AI/modelcloud/pkg/api/model.go b/AI/modelcloud/pkg/api/model.go new file mode 100644 index 000000000..cc8cc4a4f --- /dev/null +++ b/AI/modelcloud/pkg/api/model.go @@ -0,0 +1,28 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +type Model struct { + Spec ModelSpec `json:"spec,omitempty"` +} + +type ModelSpec struct { + Files []ModelFile `json:"files,omitempty"` +} + +type ModelFile struct { + Path string `json:"path,omitempty"` + Hash string `json:"hash,omitempty"` +} diff --git a/AI/modelcloud/pkg/blobs/blobserver.go b/AI/modelcloud/pkg/blobs/blobserver.go new file mode 100644 index 000000000..bbbfeaa68 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/blobserver.go @@ -0,0 +1,153 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "time" + + "k8s.io/klog/v2" +) + +type BlobServer struct { + // URL is the base URL to the blob-server, typically http://blob-store + URL *url.URL +} + +var _ BlobStore = &BlobServer{} + +func (l *BlobServer) Upload(ctx context.Context, r io.Reader, info BlobInfo) error { + url := l.URL.JoinPath(info.Hash) + return l.uploadFile(ctx, url.String(), r) +} + +func (l *BlobServer) Download(ctx context.Context, info BlobInfo, destPath string) error { + url := l.URL.JoinPath(info.Hash) + return l.downloadToFile(ctx, url.String(), destPath) +} + +func (l *BlobServer) downloadToFile(ctx context.Context, url string, destPath string) error { + log := klog.FromContext(ctx) + + dir := filepath.Dir(destPath) + tempFile, err := os.CreateTemp(dir, "model") + if err != nil { + return fmt.Errorf("creating temp file: %w", err) + } + + shouldDeleteTempFile := true + defer func() { + if shouldDeleteTempFile { + if err := os.Remove(tempFile.Name()); err != nil { + log.Error(err, "removing temp file", "path", tempFile.Name) + } + } + }() + + shouldCloseTempFile := true + defer func() { + if shouldCloseTempFile { + if err := tempFile.Close(); err != nil { + log.Error(err, "closing temp file", "path", tempFile.Name) + } + } + }() + + if err := l.downloadToWriter(ctx, url, tempFile); err != nil { + return fmt.Errorf("downloading from %q: %w", url, err) + } + + if err := tempFile.Close(); err != nil { + return fmt.Errorf("closing temp file: %w", err) + } + shouldCloseTempFile = false + + if err := os.Rename(tempFile.Name(), destPath); err != nil { + return fmt.Errorf("renaming temp file: %w", err) + } + shouldDeleteTempFile = false + + return nil +} + +func (l *BlobServer) downloadToWriter(ctx context.Context, url string, w io.Writer) error { + log := klog.FromContext(ctx) + + log.Info("downloading from url", "url", url) + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + + startedAt := time.Now() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("doing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + if resp.StatusCode == 404 { + return fmt.Errorf("blob not found: %w", os.ErrNotExist) + } + return fmt.Errorf("unexpected status downloading from upstream source: %v", resp.Status) + } + + n, err := io.Copy(w, resp.Body) + if err != nil { + return fmt.Errorf("downloading from upstream source: %w", err) + } + + log.Info("downloaded blob", "url", url, "bytes", n, "duration", time.Since(startedAt)) + + return nil +} + +func (l *BlobServer) uploadFile(ctx context.Context, url string, r io.Reader) error { + log := klog.FromContext(ctx) + + log.Info("uploading to url", "url", url) + + req, err := http.NewRequestWithContext(ctx, "PUT", url, r) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + + startedAt := time.Now() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("doing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 201 { + return fmt.Errorf("unexpected status uploading to %q: %v", url, resp.Status) + } + + log.Info("uploaded blob", "url", url, "duration", time.Since(startedAt)) + + return nil +} diff --git a/AI/modelcloud/pkg/blobs/interfaces.go b/AI/modelcloud/pkg/blobs/interfaces.go new file mode 100644 index 000000000..bd0992d45 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/interfaces.go @@ -0,0 +1,39 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "io" +) + +type BlobReader interface { + // If no such object exists, Download should return an error for which errors.Is(err, os.ErrNotExist) is true. + Download(ctx context.Context, info BlobInfo, destPath string) error +} + +type BlobStore interface { + BlobReader + + // Upload uploads the blob to the blobstore, verifying the hash. + // If an object with the same hash already exists, Upload should not modify the existing object. + // On success, Upload returns (true, nil). + // On failure, Upload returns (false, err). + Upload(ctx context.Context, r io.Reader, info BlobInfo) error +} + +type BlobInfo struct { + Hash string +} diff --git a/AI/modelcloud/pkg/blobs/localblobstore.go b/AI/modelcloud/pkg/blobs/localblobstore.go new file mode 100644 index 000000000..cc352e2a4 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/localblobstore.go @@ -0,0 +1,136 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "crypto/sha256" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "k8s.io/klog/v2" +) + +type LocalBlobStore struct { + LocalDir string // Directory to store blobs +} + +var _ BlobStore = (*LocalBlobStore)(nil) + +func (j *LocalBlobStore) Upload(ctx context.Context, r io.Reader, info BlobInfo) error { + log := klog.FromContext(ctx) + + localPath := filepath.Join(j.LocalDir, info.Hash) + if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil { + return fmt.Errorf("creating parent directories for %q: %w", localPath, err) + } + + stat, err := os.Stat(localPath) + if err == nil { + log.Info("file already exists, skipping upload", "path", localPath, "size", stat.Size(), "modTime", stat.ModTime()) + return nil + } + + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("checking for destination file %q: %w", localPath, err) + } + + // TODO: Try to optimize case where already exists? + if _, err := writeToFile(ctx, r, localPath, info); err != nil { + return fmt.Errorf("writing file %q: %w", localPath, err) + } + + log.Info("added blob to local store", "path", localPath) + + return nil +} + +func (j *LocalBlobStore) Download(ctx context.Context, info BlobInfo, destinationPath string) error { + log := klog.FromContext(ctx) + + localPath := filepath.Join(j.LocalDir, info.Hash) + + startedAt := time.Now() + f, err := os.Open(localPath) + if err != nil { + return fmt.Errorf("opening local blob %q: %w", localPath, err) + } + defer f.Close() + + n, err := writeToFile(ctx, f, destinationPath, info) + if err != nil { + return fmt.Errorf("copying file %q to %q: %w", localPath, destinationPath, err) + } + + log.Info("downloaded blob from local store", "source", localPath, "destination", destinationPath, "bytes", n, "duration", time.Since(startedAt)) + + return nil +} + +func writeToFile(ctx context.Context, src io.Reader, destinationPath string, info BlobInfo) (int64, error) { + log := klog.FromContext(ctx) + + dir := filepath.Dir(destinationPath) + tempFile, err := os.CreateTemp(dir, "download") + if err != nil { + return 0, fmt.Errorf("creating temp file: %w", err) + } + + shouldDeleteTempFile := true + defer func() { + if shouldDeleteTempFile { + if err := os.Remove(tempFile.Name()); err != nil { + log.Error(err, "removing temp file", "path", tempFile.Name) + } + } + }() + + shouldCloseTempFile := true + defer func() { + if shouldCloseTempFile { + if err := tempFile.Close(); err != nil { + log.Error(err, "closing temp file", "path", tempFile.Name) + } + } + }() + + hasher := sha256.New() + mw := io.MultiWriter(tempFile, hasher) + + n, err := io.Copy(mw, src) + if err != nil { + return n, fmt.Errorf("downloading from upstream source: %w", err) + } + + calculatedHash := fmt.Sprintf("%x", hasher.Sum(nil)) + if info.Hash != "" && calculatedHash != info.Hash { + return n, fmt.Errorf("hash mismatch: expected %q, got %q", info.Hash, calculatedHash) + } + + if err := tempFile.Close(); err != nil { + return n, fmt.Errorf("closing temp file: %w", err) + } + shouldCloseTempFile = false + + if err := os.Rename(tempFile.Name(), destinationPath); err != nil { + return n, fmt.Errorf("renaming temp file: %w", err) + } + shouldDeleteTempFile = false + + return n, nil +}