-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log Streaming Bug Fix and Refactor #101
Conversation
/ptal @msarvar @nishkrishnan #orchestration |
|
||
pull := pullInfo.String() | ||
|
||
err = j.ProjectCommandOutputHandler.Receive(pull, func(msg string) error { | ||
// Buffer size = 10 allows for any dealys in ws connection message. | ||
wsChannel := make(chan string, 10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why do we need buffer size here? What delay in connection impacts the buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we get a new message in the ProjectCmdOutput channel, we iterate through all the channels in ReceiverBuffer and forward the message. However, if a unbuffered channel is not ready to receive msg, we delete the chan here:
atlantis/server/handlers/project_command_output_handler.go
Lines 120 to 127 in 22f56bb
for ch := range p.receiverBuffers[pull] { | |
select { | |
case ch <- line: | |
default: | |
// Buffered chan of size 10. Delete chan if still blocking | |
delete(p.receiverBuffers[pull], ch) | |
} | |
} |
This is what happens when the UI gets stuck. When one of the message takes longer to send (callback function) through the ws and the receiving channel is not ready to receive the message (shown below), we just delete the channel which causes the UI to get stuck since there's no channel that's forwarding the messages to the UI.
atlantis/server/handlers/project_command_output_handler.go
Lines 69 to 73 in 22f56bb
for msg := range ch { | |
if err := callback(msg); err != nil { | |
return err | |
} | |
} |
Making the channel buffered (size 10 for example) allows the writeLogLine() method to keep on pushing messages even if the channel is not ready to receive the messages yet. If it's still not ready after 10 enqued messaged, we could assume that something's wrong with the ch and delete the channel. We could increase the buffer size imo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the fix to the UI bug btw 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, ok. If that's the case we can revert this value back to 1000 as it was in the original code. And we can revisit the implementation later. Receiving part of log streaming is a bit too complicated for what it is doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True! I'll revisit the architecture in a later PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work!
for { | ||
_, _, err := c.ReadMessage() | ||
if err != nil { | ||
j.Logger.Warn("Failed to read WS message: %s", err) | ||
return | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand the point of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently, there needs to be a reader in order to listen for events from the ws client. Since we're using the close handler here:
atlantis/server/controllers/logstreaming_controller.go
Lines 140 to 145 in 22f56bb
c.SetCloseHandler(func(code int, text string) error { | |
// Close the channnel after websocket connection closed. | |
// Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. | |
close(wsChannel) | |
return nil | |
}) |
Removing the reader makes the closeHandler() non-functional.
if p.receiverBuffers[pull] == nil { | ||
p.receiverBuffers[pull] = map[chan string]bool{} | ||
} | ||
p.receiverBuffers[pull][ch] = true | ||
p.controllerBufferLock.Unlock() | ||
return ch | ||
p.receiverBuffersLock.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are so many locks being used everywhere it makes everything kind of difficult to understand/read. Not necessary to change as part of this PR, but as a thought exercise it would be good to think about basically having a single thread manage receiver buffers and projectBuffers and basically using channels to communicate between that thread and other threads thereby eliminating the need to have locks everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can implement something more event driven? We can register new webhooks to output_handler instances and emit any new requests to that webhook that should remove the need to using channels to sending messages to the webhook sessions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm that's true. I was trying to fix what was already working but I could definitely work on some refactoring and making the code-flow more understandable. I'll work on it after this PR.
|
||
// Avoid deadlock when projectOutputBuffer size is greater than the channel (currently set to 1000) | ||
// Running this as a goroutine allows for the channel to be read in callback | ||
go p.addChan(ch, projectInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: I think my suggestion would eliminate the need for this. Kinda weird to be creating a new thread for every ws conn just to register it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, will rethink about how we do this in a later PR.
defer c.Close() | ||
// Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to | ||
// receive messages before the channel is closed and resources cleaned up. | ||
receiverChan := make(chan string, 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit: don't need to suffix the variable with its type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
@@ -29,7 +31,7 @@ type ProjectCommandOutputHandler interface { | |||
Clear(ctx models.ProjectCommandContext) | |||
|
|||
// Receive will create a channel for projectPullInfo and run a callback function argument when the new channel receives a message. | |||
Receive(projectInfo string, callback func(msg string) error) error | |||
Receive(projectInfo string, ch chan string, callback func(msg string) error) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can probably remove argument names from the interface. Or update the ch
to receiver
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, couple nits
No description provided.