Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions cmd/ci-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,16 +977,18 @@ func (o *options) Report(errs ...error) {
}
}

func (o *options) startHTTPServer(cancel func(), srv *http.Server) error {
// startHTTPServer start the HTTP server asynchronously and returns the address it
// is listening to.
func (o *options) startHTTPServer(cancel func(), srv *http.Server) (string, error) {
srvIP := os.Getenv(api.CIOperatorHTTPServerIPEnvVarName)
if srvIP == "" {
return nil
return "", nil
}

ipAndPort := srvIP + ":" + strconv.Itoa(api.CIOperatorHTTPServerPort)
ln, err := net.Listen("tcp", ipAndPort)
if err != nil {
return fmt.Errorf("listen tcp on %s: %w", ipAndPort, err)
return "", fmt.Errorf("listen tcp on %s: %w", ipAndPort, err)
}

go func() {
Expand All @@ -997,18 +999,19 @@ func (o *options) startHTTPServer(cancel func(), srv *http.Server) error {
}
}()

return nil
addr := "http://" + ipAndPort
return addr, nil
}

func (o *options) Run() (errs []error) {
start := time.Now()
var srv *http.Server
var httpSrv *http.Server

defer func() {
logrus.Infof("Ran for %s", time.Since(start).Truncate(time.Second))
o.metricsAgent.Stop()
if srv != nil {
if err := srv.Close(); err != nil {
if httpSrv != nil {
if err := httpSrv.Close(); err != nil {
errs = append(errs, fmt.Errorf("close http server: %w", err))
}
}
Expand All @@ -1020,13 +1023,14 @@ func (o *options) Run() (errs []error) {
cancel()
}

srvMux := http.NewServeMux()
srv = &http.Server{
Handler: srvMux,
httpSrvMux := http.NewServeMux()
httpSrv = &http.Server{
Handler: httpSrvMux,
BaseContext: func(net.Listener) context.Context { return ctx },
}

if err := o.startHTTPServer(cancel, srv); err != nil {
httpSrvAddr, err := o.startHTTPServer(cancel, httpSrv)
if err != nil {
errs = append(errs, fmt.Errorf("run http server: %w", err))
return
}
Expand Down Expand Up @@ -1089,7 +1093,8 @@ func (o *options) Run() (errs []error) {
cfg.IntegratedStreams = streams
cfg.InjectedTest = o.injectTest != ""
cfg.GSMConfig = gsmConfig
cfg.HTTPServerMux = srvMux
cfg.HTTPServerAddr = httpSrvAddr
cfg.HTTPServerMux = httpSrvMux
// load the graph from the configuration
buildSteps, promotionSteps, err := defaults.FromConfig(ctx, cfg)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (

CIOperatorHTTPServerIPEnvVarName = "HTTP_SERVER_IP"
CIOperatorHTTPServerPort = 8080
LeaseProxyServerURLEnvVarName = "LEASE_PROXY_SERVER_URL"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/defaults/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Config struct {
SkippedImages sets.Set[string]
params *api.DeferredParameters

HTTPServerMux *http.ServeMux
HTTPServerAddr string
HTTPServerMux *http.ServeMux
}

type Clients struct {
Expand Down
9 changes: 6 additions & 3 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func fromConfig(ctx context.Context, cfg *Config) ([]api.Step, []api.Step, error
rawSteps = append(cfg.GraphConf.Steps, rawSteps...)
rawSteps = append(rawSteps, stepsForImageOverrides(utils.GetOverriddenImages())...)

buildSteps = append(buildSteps, leaseProxyServerStep(cfg)...)
for _, leaseProxyServerStep := range leaseProxyServerStep(cfg) {
buildSteps = append(buildSteps, leaseProxyServerStep)
addProvidesForStep(leaseProxyServerStep, cfg.params)
}

for _, rawStep := range rawSteps {
if testStep := rawStep.TestStepConfiguration; testStep != nil {
Expand Down Expand Up @@ -1239,7 +1242,7 @@ func filterRequiredBinariesFromSkipped(images []api.ProjectDirectoryImageBuildSt
}

func isLeaseProxyServerAvailable(cfg *Config) bool {
return cfg.LeaseClientEnabled
return cfg.LeaseClientEnabled && cfg.HTTPServerAddr != "" && cfg.HTTPServerMux != nil
}

func leaseProxyServerStep(cfg *Config) []api.Step {
Expand All @@ -1249,5 +1252,5 @@ func leaseProxyServerStep(cfg *Config) []api.Step {
}

logger := logrus.NewEntry(logrus.StandardLogger()).WithField("step", "lease-proxy-server")
return append(ret, steps.LeaseProxyStep(logger))
return append(ret, steps.LeaseProxyStep(logger, cfg.HTTPServerAddr, cfg.HTTPServerMux, cfg.LeaseClient))
}
6 changes: 5 additions & 1 deletion pkg/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,9 @@ func TestFromConfig(t *testing.T) {
name: "enable lease proxy server",
enableLeaseClient: true,
expectedSteps: []string{"[output-images]", "[images]", "lease-proxy-server"},
expectedParams: map[string]string{
"LEASE_PROXY_SERVER_URL": "http://10.0.0.1:8080",
},
}} {
t.Run(tc.name, func(t *testing.T) {
jobSpec := api.JobSpec{
Expand Down Expand Up @@ -1909,9 +1912,10 @@ func TestFromConfig(t *testing.T) {
NodeArchitectures: nil,
IntegratedStreams: map[string]*configresolver.IntegratedStream{},
InjectedTest: tc.injectedTest,
GSMConfig: nil,
MetricsAgent: nil,
SkippedImages: tc.skippedImages,
HTTPServerAddr: "http://10.0.0.1:8080",
HTTPServerMux: &http.ServeMux{},
}
configSteps, post, err := fromConfig(context.Background(), cfg)
if diff := cmp.Diff(tc.expectedErr, err); diff != "" {
Expand Down
34 changes: 25 additions & 9 deletions pkg/lease/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ type Metrics struct {
Free, Leased int
}

type clientOptions struct {
randID func() string
}

type ClientOptions func(*clientOptions)

func WithRandID(randID func() string) ClientOptions {
return func(o *clientOptions) { o.randID = randID }
}

// Client manages resource leases, acquiring, releasing, and keeping them
// updated.
type Client interface {
Expand All @@ -65,23 +75,28 @@ type Client interface {
}

// NewClient creates a client that leases resources with the specified owner.
func NewClient(owner, url, username string, passwordGetter func() []byte, retries int, acquireTimeout time.Duration) (Client, error) {
randId = func() string {
return strconv.Itoa(rand.Int())
}
func NewClient(owner, url, username string, passwordGetter func() []byte, retries int, acquireTimeout time.Duration, opts ...ClientOptions) (Client, error) {
c, err := boskos.NewClientWithPasswordGetter(owner, url, username, passwordGetter)
if err != nil {
return nil, err
}
c.DistinguishNotFoundVsTypeNotFound = true
return newClient(c, retries, acquireTimeout), nil
return newClient(c, retries, acquireTimeout, opts...), nil
}

// for test mocking
var randId func() string
func newClient(boskos boskosClient, retries int, acquireTimeout time.Duration, opts ...ClientOptions) Client {
defOpts := &clientOptions{
randID: func() string {
return strconv.Itoa(rand.Int())
},
}

for _, f := range opts {
f(defOpts)
}

func newClient(boskos boskosClient, retries int, acquireTimeout time.Duration) Client {
return &client{
opts: defOpts,
boskos: boskos,
retries: retries,
acquireTimeout: acquireTimeout,
Expand All @@ -91,6 +106,7 @@ func newClient(boskos boskosClient, retries int, acquireTimeout time.Duration) C

type client struct {
sync.RWMutex
opts *clientOptions
boskos boskosClient
retries int
acquireTimeout time.Duration
Expand All @@ -113,7 +129,7 @@ func (c *client) Acquire(rtype string, n uint, ctx context.Context, cancel conte
var ret []string
// TODO `m` processes may fight for the last `m * n` remaining leases
for i := uint(0); i < n; i++ {
r, err := c.boskos.AcquireWaitWithPriority(ctx, rtype, freeState, leasedState, randId())
r, err := c.boskos.AcquireWaitWithPriority(ctx, rtype, freeState, leasedState, c.opts.randID())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/lease/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestAcquire(t *testing.T) {
ctx := context.Background()
var calls []string
client := NewFakeClient("owner", "url", 0, nil, &calls)
client := NewFakeClient("owner", "url", 0, nil, &calls, nil)
if _, err := client.Acquire("rtype", 1, ctx, nil); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -43,7 +43,7 @@ func TestAcquire(t *testing.T) {
func TestHeartbeatCancel(t *testing.T) {
ctx := context.Background()
var calls []string
client := NewFakeClient("owner", "url", 0, map[string]error{"updateone owner rtype_0 leased 0": errors.New("injected error")}, &calls)
client := NewFakeClient("owner", "url", 0, map[string]error{"updateone owner rtype_0 leased 0": errors.New("injected error")}, &calls, nil)
var called bool
if _, err := client.Acquire("rtype", 1, ctx, func() { called = true }); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestHeartbeatRetries(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
var calls []string
client := NewFakeClient("owner", "url", 2, tc.failures, &calls)
client := NewFakeClient("owner", "url", 2, tc.failures, &calls, nil)
var called bool
if _, err := client.Acquire("rtype", 1, ctx, func() { called = true }); err != nil {
t.Fatal(err)
Expand Down
28 changes: 18 additions & 10 deletions pkg/lease/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ import (
)

type fakeClient struct {
owner string
failures map[string]error
calls *[]string
owner string
failures map[string]error
calls *[]string
resources map[string]*common.Resource
}

func NewFakeClient(owner, url string, retries int, failures map[string]error, calls *[]string) Client {
func NewFakeClient(owner, url string, retries int, failures map[string]error, calls *[]string, resources map[string]*common.Resource) Client {
if calls == nil {
calls = &[]string{}
}
randId = func() string {
return "random"
if resources == nil {
resources = make(map[string]*common.Resource)
}
return newClient(&fakeClient{
owner: owner,
failures: failures,
calls: calls,
}, retries, time.Duration(0))
owner: owner,
failures: failures,
calls: calls,
resources: resources,
}, retries, time.Duration(0), WithRandID(func() string { return "random" }))
}

func (c *fakeClient) addCall(call string, args ...string) error {
Expand All @@ -44,11 +46,17 @@ func (c *fakeClient) addCall(call string, args ...string) error {

func (c *fakeClient) AcquireWaitWithPriority(ctx context.Context, rtype, state, dest, requestID string) (*common.Resource, error) {
err := c.addCall("acquireWaitWithPriority", rtype, state, dest, requestID)
if res, ok := c.resources[fmt.Sprintf("acquireWaitWithPriority_%s_%s_%s_%s", rtype, state, dest, requestID)]; ok {
return res, nil
}
return &common.Resource{Name: fmt.Sprintf("%s_%d", rtype, len(*c.calls)-1)}, err
}

func (c *fakeClient) Acquire(rtype, state, dest string) (*common.Resource, error) {
err := c.addCall("acquire", rtype, state, dest)
if res, ok := c.resources[fmt.Sprintf("acquire_%s_%s_%s", rtype, state, dest)]; ok {
return res, nil
}
return &common.Resource{Name: fmt.Sprintf("%s_%d", rtype, len(*c.calls)-1)}, err
Comment on lines 47 to 60
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Preserve injected failures even when a predefined resource exists.

addCall can return an error, but the current branch returns nil when a resource is found, masking failures in tests. Return the resource with err instead.

🛠️ Suggested fix
 	if res, ok := c.resources[fmt.Sprintf("acquireWaitWithPriority_%s_%s_%s_%s", rtype, state, dest, requestID)]; ok {
-		return res, nil
+		return res, err
 	}
@@
 	if res, ok := c.resources[fmt.Sprintf("acquire_%s_%s_%s", rtype, state, dest)]; ok {
-		return res, nil
+		return res, err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/lease/fake.go` around lines 47 - 60, The fake client currently discards
any error returned by addCall when a predefined resource is found; in
AcquireWaitWithPriority and Acquire return the found resource together with err
instead of returning (res, nil). Update the success branches in
fakeClient.AcquireWaitWithPriority (lookup key
"acquireWaitWithPriority_%s_%s_%s_%s") and fakeClient.Acquire (lookup key
"acquire_%s_%s_%s") to return (res, err) so injected failures from addCall are
preserved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to address this issue in this PR

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danilo-gemoli, understood! Would you like me to open a GitHub issue to track this for a future PR?

}

Expand Down
Loading