Skip to content

Commit

Permalink
Decouple persist and display events (#15529)
Browse files Browse the repository at this point in the history
# Description

This removes a scenario where events could not be persisted to the cloud
because they were waiting on the same event being displayed

~This uses the same buffer size for both the display and persist
channels~ [Removed to make PR a single change]

The primary change, however, is to stop rendering the tree every time a
row is updated, instead, theis renders when the display actually happens
in the the `frame` call. The renderer instead simply marks itself as
dirty in the `rowUpdated`, `tick`, `systemMessage` and `done` methods
and relies on the frame being redrawn on a 60Hz timer (the `done` method
calls `frame` explicitly). This makes the rowUpdated call exceedingly
cheap (it simply marks the treeRenderer as dirty) which allows the
ProgressDisplay instance to service the display events faster, which
prevents it from blocking the persist events.

This requires a minor refactor to ensure that the display object is
available in the frame method

Because the treeRenderer is calling back into the ProgressDisplay object
in a goroutine, the ProgressDisplay object needs to be thread safe, so a
read-write mutex is added to protect the `eventUrnToResourceRow` map.
The unused `urnToID` map was removed in passing.

## Impact

There are scenarios where the total time taken for an operation was
dominated by servicing the events.

This reduces the time for a complex (~2000 resources) `pulumi preview`
from 1m45s to 45s

For a `pulumi up` with `-v=11` on a the same stack, where all the
register resource spans were completing in 1h6m and the
postEngineEventBatch events were taking 3h45m, this PR removes the time
impact of reporting the events (greatly inflated by the high verbosity
setting) and the operation takes the anticipated 1h6m

<!--- 
Thanks so much for your contribution! If this is your first time
contributing, please ensure that you have read the
[CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md)
documentation.
-->

<!--- Please include a summary of the change and which issue is fixed.
Please also include relevant motivation and context. -->

Fixes # (issue)

## Checklist

- [X] I have run `make tidy` to update any new dependencies
- [X] I have run `make lint` to verify my code passes the lint check
  - [ ] I have formatted my code using `gofumpt`

<!--- Please provide details if the checkbox below is to be left
unchecked. -->
- [ ] I have added tests that prove my fix is effective or that my
feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the
`changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the
Pulumi Cloud,
then the service should honor older versions of the CLI where this
change would not exist.
You must then bump the API version in
/pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi
Cloud API version
<!-- @pulumi employees: If yes, you must submit corresponding changes in
the service repo. -->

---------

Co-authored-by: Paul Roberts <proberts@pulumi.com>
Co-authored-by: Thomas Gummerer <t.gummerer@gmail.com>
  • Loading branch information
3 people committed Mar 4, 2024
1 parent 8c7a9e5 commit ff7a7b4
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 65 deletions.
52 changes: 28 additions & 24 deletions pkg/backend/display/jsonmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type messageRenderer struct {
opts Options
isInteractive bool

display *ProgressDisplay
terminal terminal.Terminal
terminalWidth int
terminalHeight int
Expand Down Expand Up @@ -139,6 +140,10 @@ func (r *messageRenderer) Close() error {
return nil
}

func (r *messageRenderer) initializeDisplay(display *ProgressDisplay) {
r.display = display
}

// Converts the colorization tags in a progress message and then actually writes the progress
// message to the output stream. This should be the only place in this file where we actually
// process colorization tags.
Expand Down Expand Up @@ -174,21 +179,20 @@ func (r *messageRenderer) writeSimpleMessage(msg string) {
r.colorizeAndWriteProgress(makeMessageProgress(msg))
}

func (r *messageRenderer) println(display *ProgressDisplay, line string) {
func (r *messageRenderer) println(line string) {
r.writeSimpleMessage(line)
}

func (r *messageRenderer) tick(display *ProgressDisplay) {
func (r *messageRenderer) tick() {
if r.isInteractive {
r.render(display, false)
r.render(false)
} else {
// Update the spinner to let the user know that that work is still happening.
r.nonInteractiveSpinner.Tick()
}
}

func (r *messageRenderer) renderRow(display *ProgressDisplay,
id string, colorizedColumns []string, maxColumnLengths []int,
func (r *messageRenderer) renderRow(id string, colorizedColumns []string, maxColumnLengths []int,
) {
row := renderRow(colorizedColumns, maxColumnLengths)
if r.isInteractive {
Expand All @@ -212,50 +216,50 @@ func (r *messageRenderer) renderRow(display *ProgressDisplay,
}
}

func (r *messageRenderer) rowUpdated(display *ProgressDisplay, row Row) {
func (r *messageRenderer) rowUpdated(row Row) {
if r.isInteractive {
// if we're in a terminal, then refresh everything so that all our columns line up
r.render(display, false)
r.render(false)
} else {
// otherwise, just print out this single row.
colorizedColumns := row.ColorizedColumns()
colorizedColumns[display.suffixColumn] += row.ColorizedSuffix()
r.renderRow(display, "", colorizedColumns, nil)
colorizedColumns[r.display.suffixColumn] += row.ColorizedSuffix()
r.renderRow("", colorizedColumns, nil)
}
}

func (r *messageRenderer) systemMessage(display *ProgressDisplay, payload engine.StdoutEventPayload) {
func (r *messageRenderer) systemMessage(payload engine.StdoutEventPayload) {
if r.isInteractive {
// if we're in a terminal, then refresh everything. The system events will come after
// all the normal rows
r.render(display, false)
r.render(false)
} else {
// otherwise, in a non-terminal, just print out the actual event.
r.writeSimpleMessage(renderStdoutColorEvent(payload, display.opts))
r.writeSimpleMessage(renderStdoutColorEvent(payload, r.display.opts))
}
}

func (r *messageRenderer) done(display *ProgressDisplay) {
func (r *messageRenderer) done() {
if r.isInteractive {
r.render(display, false)
r.render(false)
}
}

func (r *messageRenderer) render(display *ProgressDisplay, done bool) {
if !r.isInteractive || display.headerRow == nil {
func (r *messageRenderer) render(done bool) {
if !r.isInteractive || r.display.headerRow == nil {
return
}

// make sure our stored dimension info is up to date
r.updateTerminalDimensions()

rootNodes := display.generateTreeNodes()
rootNodes = display.filterOutUnnecessaryNodesAndSetDisplayTimes(rootNodes)
rootNodes := r.display.generateTreeNodes()
rootNodes = r.display.filterOutUnnecessaryNodesAndSetDisplayTimes(rootNodes)
sortNodes(rootNodes)
display.addIndentations(rootNodes, true /*isRoot*/, "")
r.display.addIndentations(rootNodes, true /*isRoot*/, "")

maxSuffixLength := 0
for _, v := range display.suffixesArray {
for _, v := range r.display.suffixesArray {
runeCount := utf8.RuneCountInString(v)
if runeCount > maxSuffixLength {
maxSuffixLength = runeCount
Expand All @@ -264,18 +268,18 @@ func (r *messageRenderer) render(display *ProgressDisplay, done bool) {

var rows [][]string
var maxColumnLengths []int
display.convertNodesToRows(rootNodes, maxSuffixLength, &rows, &maxColumnLengths)
r.display.convertNodesToRows(rootNodes, maxSuffixLength, &rows, &maxColumnLengths)

removeInfoColumnIfUnneeded(rows)

for i, row := range rows {
r.renderRow(display, strconv.Itoa(i), row, maxColumnLengths)
r.renderRow(strconv.Itoa(i), row, maxColumnLengths)
}

systemID := len(rows)

printedHeader := false
for _, payload := range display.systemEventPayloads {
for _, payload := range r.display.systemEventPayloads {
msg := payload.Color.Colorize(payload.Message)
lines := splitIntoDisplayableLines(msg)

Expand Down Expand Up @@ -303,7 +307,7 @@ func (r *messageRenderer) render(display *ProgressDisplay, done bool) {
}

if done {
r.println(display, "")
r.println("")
}
}

Expand Down
52 changes: 37 additions & 15 deletions pkg/backend/display/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
"time"
"unicode"

Expand Down Expand Up @@ -65,15 +66,19 @@ type DiagInfo struct {
type progressRenderer interface {
io.Closer

tick(display *ProgressDisplay)
rowUpdated(display *ProgressDisplay, row Row)
systemMessage(display *ProgressDisplay, payload engine.StdoutEventPayload)
done(display *ProgressDisplay)
println(display *ProgressDisplay, line string)
initializeDisplay(display *ProgressDisplay)
tick()
rowUpdated(row Row)
systemMessage(payload engine.StdoutEventPayload)
done()
println(line string)
}

// ProgressDisplay organizes all the information needed for a dynamically updated "progress" view of an update.
type ProgressDisplay struct {
// mutex is used to synchronize access to eventUrnToResourceRow, which is accessed by the treeRenderer
m sync.RWMutex

opts Options

renderer progressRenderer
Expand Down Expand Up @@ -134,9 +139,6 @@ type ProgressDisplay struct {
// the list of suffixes to rotate through
suffixesArray []string

// Maps used so we can generate short IDs for resource urns.
urnToID map[resource.URN]string

// Structure that tracks the time taken to perform an action on a resource.
opStopwatch opStopwatch

Expand Down Expand Up @@ -235,10 +237,10 @@ func ShowProgressEvents(op string, action apitype.UpdateKind, stack tokens.Stack
eventUrnToResourceRow: make(map[resource.URN]ResourceRow),
suffixColumn: int(statusColumn),
suffixesArray: []string{"", ".", "..", "..."},
urnToID: make(map[resource.URN]string),
displayOrderCounter: 1,
opStopwatch: newOpStopwatch(),
}
renderer.initializeDisplay(display)

ticker := time.NewTicker(1 * time.Second)
if opts.deterministicOutput {
Expand All @@ -253,7 +255,7 @@ func ShowProgressEvents(op string, action apitype.UpdateKind, stack tokens.Stack
}

func (display *ProgressDisplay) println(line string) {
display.renderer.println(display, line)
display.renderer.println(line)
}

type treeNode struct {
Expand Down Expand Up @@ -310,6 +312,11 @@ func (display *ProgressDisplay) getOrCreateTreeNode(
}

func (display *ProgressDisplay) generateTreeNodes() []*treeNode {
// We take the reader lock here because this is called from the renderer and reads from
// the eventUrnToResourceRow map
display.m.RLock()
defer display.m.RUnlock()

result := []*treeNode{}

result = append(result, &treeNode{
Expand Down Expand Up @@ -440,6 +447,10 @@ func removeInfoColumnIfUnneeded(rows [][]string) {
// Specifically, this will update the status messages for any resources, and will also then
// print out all final diagnostics. and finally will print out the summary.
func (display *ProgressDisplay) processEndSteps() {
// Take the read lock here because we are reading from the eventUrnToResourceRow map
display.m.RLock()
defer display.m.RUnlock()

// Figure out the rows that are currently in progress.
var inProgressRows []ResourceRow
if !display.isTerminal {
Expand All @@ -458,15 +469,15 @@ func (display *ProgressDisplay) processEndSteps() {
// since the display was marked 'done'.
if !display.isTerminal {
for _, v := range inProgressRows {
display.renderer.rowUpdated(display, v)
display.renderer.rowUpdated(v)
}
}

// Now refresh everything. This ensures that we go back and remove things like the diagnostic
// messages from a status message (since we're going to print them all) below. Note, this will
// only do something in a terminal. This is what we want, because if we're not in a terminal we
// don't really want to reprint any finished items we've already printed.
display.renderer.done(display)
display.renderer.done()

// Render the policies section; this will print all policy packs that ran plus any specific
// policies that led to violations or remediations. This comes before diagnostics since policy
Expand Down Expand Up @@ -807,10 +818,14 @@ func (display *ProgressDisplay) processTick() {
// often timeout a process if output is not seen in a while.
display.currentTick++

display.renderer.tick(display)
display.renderer.tick()
}

func (display *ProgressDisplay) getRowForURN(urn resource.URN, metadata *engine.StepEventMetadata) ResourceRow {
// Take the write lock here because this can write the the eventUrnToResourceRow map
display.m.Lock()
defer display.m.Unlock()

// If there's already a row for this URN, return it.
row, has := display.eventUrnToResourceRow[urn]
if has {
Expand Down Expand Up @@ -993,19 +1008,26 @@ func (display *ProgressDisplay) processNormalEvent(event engine.Event) {
contract.Failf("Unhandled event type '%s'", event.Type)
}

display.renderer.rowUpdated(display, row)
display.renderer.rowUpdated(row)
}

func (display *ProgressDisplay) handleSystemEvent(payload engine.StdoutEventPayload) {
// We need too take the writer lock here because ensureHeaderAndStackRows expects to be
// called under the write lock
display.m.Lock()
defer display.m.Unlock()

// Make sure we have a header to display
display.ensureHeaderAndStackRows()

display.systemEventPayloads = append(display.systemEventPayloads, payload)

display.renderer.systemMessage(display, payload)
display.renderer.systemMessage(payload)
}

func (display *ProgressDisplay) ensureHeaderAndStackRows() {
contract.Assertf(!display.m.TryLock(), "ProgressDisplay.ensureHeaderAndStackRows MUST be called "+
"under the write lock")
if display.headerRow == nil {
// about to make our first status message. make sure we present the header line first.
display.headerRow = &headerRowData{display: display}
Expand Down

0 comments on commit ff7a7b4

Please sign in to comment.