-
Notifications
You must be signed in to change notification settings - Fork 428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sdk, hatchery, worker): get queue from server side events #3376
Conversation
sdk/cdsclient/http_sse.go
Outdated
} | ||
|
||
//Client is the default client used for requests. | ||
var sseClient = &http.Client{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sseClient
is unused
This comment has been minimized.
This comment has been minimized.
sdk/cdsclient/http_sse.go
Outdated
} | ||
|
||
// RequestSSEGet takes the uri of an SSE stream and channel, and will send an Event | ||
// down the channel when recieved, until the stream is closed. It will then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recieved => received
sdk/cdsclient/client_queue.go
Outdated
chanSSEvt := make(chan SSEvent) | ||
sdk.GoRoutine("RequestSSEGet", func() { | ||
for ctx.Err() == nil { | ||
time.Sleep(5 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleep should be at the end of the loop, if ctx.Err() != nil in that delay a useless SSE request will be prepared.
} | ||
|
||
if len(bs) < 2 { | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case where err == io.EOF, we should break.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
currEvent.Data = bytes.NewBuffer(bytes.TrimSpace(spl[1])) | ||
evCh <- *currEvent | ||
} | ||
if err == io.EOF { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be just after if err != nil && err != io.EOF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
sdk/cdsclient/client_queue.go
Outdated
if errParse != nil { | ||
errs <- sdk.WrapError(errParse, "Unable to load jobs, failed to parse API Time") | ||
continue | ||
if _, err := c.GetJSON("/queue/workflows", &queue); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing query params modelType & ratio:
url, _ := url.Parse("/queue/workflows")
q := url.Query()
if ratioService != nil {
q.Add("ratioService", fmt.Sprintf("%d", *ratioService))
}
if modelType != "" {
q.Add("modelType", modelType)
}
url.RawQuery = q.Encode()
_, header, _, errReq := c.RequestJSON(http.MethodGet, url.String(), nil, &queue, SetHeader(RequestedIfModifiedSinceHeader, t0.Format(time.RFC1123)))
On master branch, the oldJobsTicker does not do that, but it's a mistake. I don't fix it on master to avoid conflict with this branch. ie: hatchery swarm with ratioService 100% receives 'old' jobs without service, but it's not expected. The ticket jobsTicker is ok (no worker without service send to hatchery swarm)
// wait for the grace time before pushing the job in the channel | ||
go func() { | ||
time.Sleep(time.Duration(graceTime) * time.Second) | ||
job, err := c.QueueJobInfo(int64(jobRunID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Job received by SSE should contains modelType and ContainsService attributes.
So, an hatchery openstack avoid call QueueJobInfo for each job type docker,
a hatchery marathon avoid cal QueueJobInfo for each job containsService, etc...
This will be the same behaviour with existing call /queue/workflow, with queryParams modelType and ratioService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ovh/cds