Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 56 additions & 17 deletions cli/commands/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

tea "github.com/charmbracelet/bubbletea"
Expand Down Expand Up @@ -501,13 +502,34 @@ func Deploy(ctx *Context, opts struct {
ctx.Log.Debug("manifest computation failed, falling back to full upload", "error", manifestErr)
}

// Compute uncompressed totals for progress estimation and cached bytes for the summary.
var totalUncompressed int64
var cachedBytes int64
if manifest != nil {
var totalManifestBytes int64
for _, m := range manifest {
totalManifestBytes += m.Size
}
if useOptimized && neededPaths != nil {
for _, m := range manifest {
if neededPaths[m.Path] {
totalUncompressed += m.Size
}
}
cachedBytes = totalManifestBytes - totalUncompressed
} else {
totalUncompressed = totalManifestBytes
}
}

var uncompressedWritten atomic.Int64
var r io.ReadCloser
if useOptimized && len(neededPaths) > 0 {
r, err = tarx.MakeFilteredTar(dir, includePatterns, neededPaths)
r, err = tarx.MakeFilteredTar(dir, includePatterns, neededPaths, &uncompressedWritten)
} else if useOptimized {
r = tarx.MakeEmptyTar()
} else {
r, err = tarx.MakeTar(dir, includePatterns)
r, err = tarx.MakeTar(dir, includePatterns, &uncompressedWritten)
}
if err != nil {
uploadSpan.RecordError(err)
Expand Down Expand Up @@ -560,13 +582,16 @@ func Deploy(ctx *Context, opts struct {
var lastPrintTime time.Time

progressReader := upload.NewProgressReader(r, func(progress upload.Progress) {
enrichUploadProgress(&progress, &uncompressedWritten, totalUncompressed)
uploadBytes = progress.BytesRead
// Print progress every 500ms to avoid spamming
if time.Since(lastPrintTime) >= 500*time.Millisecond {
if progress.Fraction > 0 && time.Since(lastPrintTime) >= 500*time.Millisecond {
lastPrintTime = time.Now()
fmt.Fprintf(os.Stderr, "\r\033[K") // Clear to end of line
fmt.Fprintf(os.Stderr, "Uploading artifacts: %s at %s",
fmt.Fprintf(os.Stderr, "Uploading artifacts: %d%% — %s / ~%s at %s",
int(progress.Fraction*100),
upload.FormatBytes(progress.BytesRead),
upload.FormatBytes(progress.EstimatedTotalBytes),
upload.FormatSpeed(progress.BytesPerSecond))
}
})
Expand All @@ -578,10 +603,20 @@ func Deploy(ctx *Context, opts struct {
if uploadBytes > 0 {
uploadDuration := time.Since(uploadStartTime)
avgSpeed := float64(uploadBytes) / uploadDuration.Seconds()
fmt.Fprintf(os.Stderr, "\rUpload complete: %s in %.1fs at %s\n",
summary := fmt.Sprintf("\rUpload complete: %s in %.1fs at %s",
upload.FormatBytes(uploadBytes),
uploadDuration.Seconds(),
upload.FormatSpeed(avgSpeed))
if useOptimized && cachedFiles > 0 {
summary += fmt.Sprintf(", reused %d/%d files", cachedFiles, totalFiles)
if cachedBytes > 0 && avgSpeed > 0 {
savedSec := float64(cachedBytes) / avgSpeed
summary += fmt.Sprintf(" (saved %s, ~%s)", upload.FormatBytes(cachedBytes), upload.FormatDuration(time.Duration(savedSec*float64(time.Second))))
} else if cachedBytes > 0 {
summary += fmt.Sprintf(" (saved %s)", upload.FormatBytes(cachedBytes))
}
}
fmt.Fprintf(os.Stderr, "%s\n", summary)
uploadBytes = 0 // Only print once
}

Expand Down Expand Up @@ -648,6 +683,7 @@ func Deploy(ctx *Context, opts struct {
defer wg.Wait()

progressReader := upload.NewProgressReader(r, func(progress upload.Progress) {
enrichUploadProgress(&progress, &uncompressedWritten, totalUncompressed)
select {
case uploadProgressCh <- progress:
default:
Expand All @@ -659,7 +695,7 @@ func Deploy(ctx *Context, opts struct {
deployCtx, cancelDeploy := context.WithCancel(buildCtx)
defer cancelDeploy()

model := initialModel(updateCh, buildCh, uploadProgressCh, cachedFiles, totalFiles)
model := initialModel(updateCh, buildCh, uploadProgressCh, cachedFiles, totalFiles, cachedBytes)
p := tea.NewProgram(model)

var finalModel tea.Model
Expand All @@ -670,11 +706,9 @@ func Deploy(ctx *Context, opts struct {
defer wg.Done()
finalModel, runErr = p.Run()
if runErr == nil {
// Check if we exited due to interrupt or actual timeout
if dm, ok := finalModel.(*deployInfo); ok && dm.interrupted {
cancelDeploy() // Cancel the deployment context
cancelDeploy()
}
// Note: we don't cancel on timeout phase anymore as that's handled by the UI
} else {
// UI died; ensure we don't keep uploading/building
cancelDeploy()
Expand Down Expand Up @@ -736,13 +770,6 @@ func Deploy(ctx *Context, opts struct {
return context.Canceled
}

// Check if this was a buildkit startup timeout (handled by UI)
if isDeploy && dm.currentPhase == "timeout" {
// The UI already printed the timeout message
updateDeploymentOnError("Buildkit startup timeout")
return fmt.Errorf("buildkit startup timeout")
}

// Check if this was a server panic
var panicErr cond.ErrPanic
if errors.As(err, &panicErr) {
Expand Down Expand Up @@ -820,6 +847,18 @@ func Deploy(ctx *Context, opts struct {
return nil
}

// enrichUploadProgress fills in Fraction and EstimatedTotalBytes on a Progress
// snapshot using the atomic uncompressed-byte counter and the known total.
func enrichUploadProgress(p *upload.Progress, written *atomic.Int64, totalUncompressed int64) {
if totalUncompressed <= 0 {
return
}
p.Fraction = float64(written.Load()) / float64(totalUncompressed)
if p.Fraction > 0 {
p.EstimatedTotalBytes = int64(float64(p.BytesRead) / p.Fraction)
}
}

// Helper function to print build errors and logs
func printBuildErrors(ctx *Context, buildErrors []string, buildLogs []string) {
if len(buildErrors) > 0 {
Expand Down Expand Up @@ -1139,7 +1178,7 @@ func analyzeApp(ctx *Context, bc *build_v1alpha.BuilderClient, dir string) error
includePatterns = ac.Include
}

r, err := tarx.MakeTar(dir, includePatterns)
r, err := tarx.MakeTar(dir, includePatterns, nil)
if err != nil {
return fmt.Errorf("failed to create tar: %w", err)
}
Expand Down
106 changes: 49 additions & 57 deletions cli/commands/deploy_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ type updateMsg struct {
msg string
}

type timeoutCheckMsg struct{}

// deployInfo is the TEA model for the deploy UI
type deployInfo struct {
spinner spinner.Model
Expand All @@ -113,21 +111,22 @@ type deployInfo struct {
uploadDuration time.Duration
finalUploadSpeed float64

// Upload progress estimation
uploadPct float64 // 0.0–1.0 fraction, -1 if unknown
uploadEstTotal int64 // estimated compressed total bytes

// Source cache info
cachedFiles int32
totalFiles int
cachedBytes int64

// Timeout and interrupt handling
lastActivity time.Time
buildkitTimeout time.Duration
buildkitStarted bool // Track if buildkit has shown any activity
interrupted bool
interrupted bool

showProgress bool
bp tea.Model
}

func initialModel(update chan string, buildCh chan buildProgress, uploadProgress chan upload.Progress, cachedFiles int32, totalFiles int) *deployInfo {
func initialModel(update chan string, buildCh chan buildProgress, uploadProgress chan upload.Progress, cachedFiles int32, totalFiles int, cachedBytes int64) *deployInfo {
s := spinner.New()
s.Spinner = Meter
s.Style = lipgloss.NewStyle()
Expand All @@ -142,36 +141,46 @@ func initialModel(update chan string, buildCh chan buildProgress, uploadProgress
uploadS.Style = lipgloss.NewStyle()

return &deployInfo{
spinner: s,
message: "Reading application data",
update: update,
buildCh: buildCh,
prog: p,
uploadProgress: uploadProgress,
uploadSpin: uploadS,
isUploading: true,
uploadSpeed: "calculating...",
phaseStart: time.Now(),
currentPhase: "upload",
cachedFiles: cachedFiles,
totalFiles: totalFiles,
lastActivity: time.Now(),
buildkitTimeout: 60 * time.Second, // 60 second timeout for buildkit to start
buildkitStarted: false,
bp: progressui.TeaModel(),
spinner: s,
message: "Reading application data",
update: update,
buildCh: buildCh,
prog: p,
uploadProgress: uploadProgress,
uploadSpin: uploadS,
isUploading: true,
uploadSpeed: "calculating...",
phaseStart: time.Now(),
currentPhase: "upload",
cachedFiles: cachedFiles,
totalFiles: totalFiles,
cachedBytes: cachedBytes,
bp: progressui.TeaModel(),
}
}

func (m *deployInfo) uploadDetails() string {
var parts []string
if m.cachedFiles > 0 {
parts = append(parts, fmt.Sprintf("reused %d/%d files", m.cachedFiles, m.totalFiles))
// All files cached — special case
if m.cachedFiles > 0 && m.cachedFiles == int32(m.totalFiles) {
return fmt.Sprintf("all %d files reused from previous deploy", m.totalFiles)
}

var parts []string
if m.uploadBytes > 0 {
parts = append(parts, fmt.Sprintf("%s at %s",
upload.FormatBytes(m.uploadBytes),
upload.FormatSpeed(m.finalUploadSpeed)))
}
if m.cachedFiles > 0 {
detail := fmt.Sprintf("reused %d/%d files", m.cachedFiles, m.totalFiles)
if m.cachedBytes > 0 && m.finalUploadSpeed > 0 {
savedSec := float64(m.cachedBytes) / m.finalUploadSpeed
detail += fmt.Sprintf(" (saved %s, ~%s)", upload.FormatBytes(m.cachedBytes), upload.FormatDuration(time.Duration(savedSec*float64(time.Second))))
} else if m.cachedBytes > 0 {
detail += fmt.Sprintf(" (saved %s)", upload.FormatBytes(m.cachedBytes))
}
parts = append(parts, detail)
}
return strings.Join(parts, ", ")
}

Expand All @@ -180,7 +189,6 @@ func (m *deployInfo) Init() tea.Cmd {
m.bp.Init(),
m.spinner.Tick,
m.uploadSpin.Tick,
m.checkTimeout(), // Start timeout monitoring
}

// Only wait for channels that are expected to have data
Expand All @@ -206,12 +214,6 @@ func (m *deployInfo) Init() tea.Cmd {
return tea.Batch(cmds...)
}

func (m *deployInfo) checkTimeout() tea.Cmd {
return tea.Tick(time.Second, func(t time.Time) tea.Msg {
return timeoutCheckMsg{}
})
}

func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
var (
cmd tea.Cmd
Expand Down Expand Up @@ -239,21 +241,9 @@ func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
case tea.KeyEnter:
m.showProgress = !m.showProgress
}
case timeoutCheckMsg:
// Only check for timeout if buildkit hasn't started yet
if m.currentPhase == "buildkit" && !m.buildkitStarted && time.Since(m.lastActivity) > m.buildkitTimeout {
m.currentPhase = "timeout"
return m, tea.Sequence(
tea.Println("\n\n❌ Buildkit failed to start after 60 seconds. This may indicate a server issue."),
tea.Quit,
)
}
// Continue checking
cmds = append(cmds, m.checkTimeout())
case updateMsg:
prevMessage := m.message
m.message = msg.msg
m.lastActivity = time.Now() // Reset activity timer

// Track phase transitions
if prevMessage != msg.msg {
Expand All @@ -272,7 +262,6 @@ func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
// Start tracking buildkit phase
m.phaseStart = time.Now()
m.currentPhase = "buildkit"
m.buildkitStarted = true
}

}
Expand All @@ -281,13 +270,13 @@ func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
return updateMsg{msg: <-m.update}
})
case upload.Progress:
m.lastActivity = time.Now() // Reset activity timer
if m.isUploading {
m.uploadSpeed = upload.FormatSpeed(msg.BytesPerSecond)
m.uploadBytes = msg.BytesRead
m.finalUploadSpeed = msg.BytesPerSecond
m.uploadPct = msg.Fraction
m.uploadEstTotal = msg.EstimatedTotalBytes

// Spinner tick is already handled in the Update method
// Continue reading upload progress
if m.uploadProgress != nil {
cmds = append(cmds, func() tea.Msg {
Expand All @@ -296,7 +285,6 @@ func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}
}
case buildProgress:
m.lastActivity = time.Now() // Reset activity timer

// Build progress means upload is complete (fallback if no "Launching builder" message)
if m.isUploading {
Expand Down Expand Up @@ -325,8 +313,6 @@ func (m *deployInfo) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.phaseStart = time.Now()
m.currentPhase = "buildkit"
}
m.buildkitStarted = true

m.buildSteps = msg.total

if msg.total > 0 {
Expand Down Expand Up @@ -364,10 +350,16 @@ func (m *deployInfo) View() string {
// Show current progress
var currentLine string
if m.isUploading {
bytesInfo := upload.FormatBytes(m.uploadBytes)
speedInfo := deployPrefixStyle.Render(fmt.Sprintf("%s uploaded at %s", bytesInfo, m.uploadSpeed))
currentLine = fmt.Sprintf(" %s %s...\n %s %s",
m.uploadSpin.View(), m.message, m.spinner.View(), speedInfo)
pct := m.uploadPct
if pct < 0 {
pct = 0
}
pctStr := deployPrefixStyle.Render(fmt.Sprintf("%d%% — %s at %s",
int(pct*100),
upload.FormatBytes(m.uploadBytes),
m.uploadSpeed))
currentLine = fmt.Sprintf(" %s Uploading artifacts...\n %s %s",
m.uploadSpin.View(), m.prog.ViewAs(pct), pctStr)
} else if m.currentPhase != "completed" {
if m.buildSteps > 0 {
steps := deployPrefixStyle.Render(fmt.Sprintf("Building %d steps:", m.buildSteps))
Expand Down
2 changes: 1 addition & 1 deletion pkg/buildkit/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestBuildKitLocal(t *testing.T) {

r := require.New(t)

dfr, err := tarx.MakeTar("testdata/df1", nil)
dfr, err := tarx.MakeTar("testdata/df1", nil, nil)
r.NoError(err)

datafs, err := tarx.TarFS(dfr, t.TempDir())
Expand Down
8 changes: 5 additions & 3 deletions pkg/progress/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ type ProgressReader struct {

// Progress represents a snapshot of the current upload stats.
type Progress struct {
BytesRead int64
BytesPerSecond float64
Duration time.Duration
BytesRead int64
BytesPerSecond float64
Duration time.Duration
Fraction float64 // 0.0–1.0 upload fraction complete
EstimatedTotalBytes int64 // projected total compressed bytes; 0 if unknown
}

// NewProgressReader creates a new progress tracking reader that wraps the given io.Reader.
Expand Down
Loading
Loading