fix: refactor import script, dynamically calculate num of workers, add progress tracking #27
Walkthrough
Adds an adaptive, memory- and CPU-aware worker configuration and parallel CSV import with adaptive batching, introduces a multi-bar ProgressTracker for per-file and per-collection progress and final summaries, changes collection-creation error handling, and adds terminal/progress dependencies.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor U as User
participant CLI as Import CLI
participant PT as ProgressTracker
participant WP as Worker Pool
participant Q as Qdrant Client
U->>CLI: Start import (CSV files)
CLI->>CLI: runtime.NumCPU() & GetAvailableMemoryMB()
CLI->>CLI: CalculateOptimalWorkers(fileCount, avgFileSizeMB)
CLI->>PT: NewProgressTracker(totalFiles)
CLI->>WP: Start N workers (adaptive)
loop enqueue files
CLI->>WP: Enqueue file
end
par workers processing
WP->>WP: Parse CSV, form batches
WP->>Q: Upsert batch (per-collection)
Q-->>WP: Ack / Error
WP->>PT: AddRecords(count, collection)
alt success
WP->>PT: FileCompleted(count, true)
else failure
WP->>PT: FileCompleted(0, false)
WP->>PT: MarkFileFailed()
end
end
CLI->>PT: Wait()
PT-->>CLI: Final summary
CLI-->>U: Exit
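The fan-out shown in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: processFile and runPool are hypothetical names, the record count is a placeholder, and atomic counters stand in for the ProgressTracker calls.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processFile is a hypothetical stand-in for parsing one CSV file and
// upserting its batches. It returns the number of records imported.
func processFile(name string) (int64, error) {
	if name == "" {
		return 0, fmt.Errorf("empty file name")
	}
	return int64(len(name)), nil // placeholder record count
}

// runPool fans files out to numWorkers goroutines and returns the total
// records imported and the number of failed files.
// Assumes numWorkers >= 1; with zero workers the enqueue loop would block.
func runPool(files []string, numWorkers int) (records, failed int64) {
	filesChan := make(chan string)
	var wg sync.WaitGroup

	for workerID := 0; workerID < numWorkers; workerID++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for file := range filesChan {
				n, err := processFile(file)
				if err != nil {
					atomic.AddInt64(&failed, 1) // analogous to marking a file failed
					continue
				}
				atomic.AddInt64(&records, n) // analogous to adding records
			}
		}()
	}

	// Enqueue every file, then close the channel so workers exit their loops.
	for _, f := range files {
		filesChan <- f
	}
	close(filesChan)
	wg.Wait()
	return records, failed
}

func main() {
	records, failed := runPool([]string{"a.csv", "bb.csv", ""}, 2)
	fmt.Printf("records=%d failed=%d\n", records, failed)
}
```

Closing the channel after the last enqueue is what lets each worker's range loop terminate, so wg.Wait() returns once all in-flight files are done.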
sequenceDiagram
autonumber
participant CLI as Import CLI
participant SYS as /proc/meminfo
participant RT as Go runtime
CLI->>RT: runtime.NumCPU()
CLI->>SYS: read /proc/meminfo (MemAvailable)
SYS-->>CLI: MemAvailable (MB)
CLI->>CLI: CalculateOptimalWorkers() → WorkerConfig{NumWorkers, RecommendedBatchSize}
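To make the second diagram concrete, here is one way a worker calculation of this shape could look. The heuristic is an assumption made for illustration only: the 4x memory multiplier, the 512 MB threshold, the batch sizes, and the names calculateWorkers and bytesToMB are all hypothetical and are not taken from the PR. Only the WorkerConfig field names come from the diagram. CPU count and available memory are passed in as parameters so the logic stays testable.

```go
package main

import "fmt"

// WorkerConfig mirrors the two fields named in the diagram.
type WorkerConfig struct {
	NumWorkers           int
	RecommendedBatchSize int
}

// bytesToMB converts a byte count to whole megabytes, with a floor of 1
// so that small files do not collapse to zero.
func bytesToMB(b int64) int {
	mb := int(b / (1024 * 1024))
	if mb == 0 {
		return 1
	}
	return mb
}

// calculateWorkers is a hypothetical heuristic: start from the CPU count,
// cap by available memory and by file count, and never go below one worker.
func calculateWorkers(numCPU, availableMB, fileCount, avgFileSizeMB int) WorkerConfig {
	workers := numCPU

	// Assumption: each worker needs roughly 4x its file size in memory.
	perWorkerMB := avgFileSizeMB * 4
	if perWorkerMB < 1 {
		perWorkerMB = 1
	}
	if byMem := availableMB / perWorkerMB; byMem < workers {
		workers = byMem
	}
	if fileCount < workers {
		workers = fileCount
	}
	if workers < 1 {
		workers = 1
	}

	// Assumption: use smaller batches when memory per worker is tight.
	batch := 1000
	if availableMB/workers < 512 {
		batch = 250
	}
	return WorkerConfig{NumWorkers: workers, RecommendedBatchSize: batch}
}

func main() {
	avgMB := bytesToMB(64 << 20) // average file size given in bytes
	cfg := calculateWorkers(8, 2048, 100, avgMB)
	fmt.Printf("%+v\n", cfg)
}
```

Converting the average file size to MB before the call keeps the units consistent with the memory figure, which is the same concern raised in the review about passing bytes.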
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
🧰 Additional context used
🧬 Code graph analysis (1)
cmd/import/main.go (2)
🔇 Additional comments (9)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cmd/import/main.go (1)
185-205: Worker loop may not compile.
Ranging over an int requires Go 1.22 or later. On older toolchains, use a classic for loop.
-    log.Printf("Starting %d worker(s) to process CSV files...", optimalWorkers.NumWorkers)
-    for workerID := range optimalWorkers.NumWorkers {
+    log.Printf("Starting %d worker(s) to process CSV files...", optimalWorkers.NumWorkers)
+    for workerID := 0; workerID < optimalWorkers.NumWorkers; workerID++ {
         wg.Add(1)
         go func(workerID int) {
             defer wg.Done()
             for file := range filesChan {
                 fileName := filepath.Base(file)
                 // Progress bars will show file processing status
🧹 Nitpick comments (6)
internal/utils/progress_tracker.go (3)
14-14: Satisfy godot: end comments with periods.
Tiny punctuation fixes to appease lint.
-// ProgressTracker tracks import progress across multiple workers
+// ProgressTracker tracks import progress across multiple workers.
@@
-// NewProgressTracker creates a new progress tracker with mpb progress bars
+// NewProgressTracker creates a new progress tracker with mpb progress bars.
@@
-// AddRecords increments the record count and updates progress bars
+// AddRecords increments the record count and updates progress bars.
@@
-// FileCompleted marks a file as completed and updates the file progress bar
+// FileCompleted marks a file as completed and updates the file progress bar.
@@
-// MarkFileFailed increments the failed file counter and can be called separately
+// MarkFileFailed increments the failed file counter and can be called separately.
@@
-// GetFailedFiles returns the current number of failed files
+// GetFailedFiles returns the current number of failed files.
Also applies to: 34-34, 82-82, 114-114, 150-150, 157-157
212-218: Prefer sort.Slice over manual bubble sort.
Cleaner and faster; requires importing sort.
-    // Simple bubble sort
-    for i := 0; i < len(stats); i++ {
-        for j := i + 1; j < len(stats); j++ {
-            if stats[j].count > stats[i].count {
-                stats[i], stats[j] = stats[j], stats[i]
-            }
-        }
-    }
+    // Sort by count desc.
+    sort.Slice(stats, func(i, j int) bool { return stats[i].count > stats[j].count })
Add import:
 import (
     "fmt"
     "log"
+    "sort"
     "sync"
     "time"
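For reference, a self-contained version of the suggested sort might look like the sketch below. The collectionStat type and the sample data are assumptions for illustration; the PR's actual stats type is not shown in this review, only its count field.

```go
package main

import (
	"fmt"
	"sort"
)

// collectionStat is a hypothetical stand-in for the tracker's
// per-collection summary entries.
type collectionStat struct {
	name  string
	count int64
}

// sortByCountDesc orders stats in place from largest to smallest count.
func sortByCountDesc(stats []collectionStat) {
	sort.Slice(stats, func(i, j int) bool { return stats[i].count > stats[j].count })
}

func main() {
	stats := []collectionStat{
		{name: "npm", count: 120},
		{name: "maven", count: 4500},
		{name: "pypi", count: 900},
	}
	sortByCountDesc(stats)
	for _, s := range stats {
		fmt.Printf("%s: %d\n", s.name, s.count)
	}
}
```

Note that sort.Slice is not stable, so entries with equal counts may appear in any order; sort.SliceStable is available if a deterministic tie order matters for the summary.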
230-239: Replace if/else chain with switch (gocritic).
Matches linter hint and improves readability.
-func formatNumber(n int64) string {
-    if n < 1000 {
-        return fmt.Sprintf("%d", n)
-    } else if n < 1000000 {
-        return fmt.Sprintf("%.1fK", float64(n)/1000)
-    } else if n < 1000000000 {
-        return fmt.Sprintf("%.2fM", float64(n)/1000000)
-    }
-    return fmt.Sprintf("%.2fB", float64(n)/1000000000)
-}
+func formatNumber(n int64) string {
+    switch {
+    case n < 1_000:
+        return fmt.Sprintf("%d", n)
+    case n < 1_000_000:
+        return fmt.Sprintf("%.1fK", float64(n)/1_000)
+    case n < 1_000_000_000:
+        return fmt.Sprintf("%.2fM", float64(n)/1_000_000)
+    default:
+        return fmt.Sprintf("%.2fB", float64(n)/1_000_000_000)
+    }
+}
cmd/import/main.go (3)
674-674: Small polish: comment period + combine param types (gocritic).
- End comment with a period.
- Use combined param types for brevity.
-// GetAvailableMemoryMB returns available memory in MB
+// GetAvailableMemoryMB returns available memory in MB.
@@
-func CalculateOptimalWorkers(fileCount int, avgFileSizeMB int) WorkerConfig {
+func CalculateOptimalWorkers(fileCount, avgFileSizeMB int) WorkerConfig {
Also applies to: 702-702
695-697: Return error if MemAvailable not found to trigger fallback.
Avoid returning (0, nil), which produces confusing logs and math artifacts.
-    return 0, scanner.Err()
+    if err := scanner.Err(); err != nil {
+        return 0, err
+    }
+    return 0, fmt.Errorf("MemAvailable not found in /proc/meminfo")
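A sketch of how the full lookup could behave with this change is shown below. It is an illustration under assumptions, not the PR's GetAvailableMemoryMB: the function names parseMemAvailableMB and memAvailableFromText are hypothetical, and the parser reads from an io.Reader rather than opening /proc/meminfo so that it can be exercised with sample text.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// parseMemAvailableMB scans /proc/meminfo-style text and returns the
// MemAvailable value converted from kB to MB. It returns an error when the
// field is missing so callers can fall back to a default.
func parseMemAvailableMB(r io.Reader) (int, error) {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		// Expected line shape: "MemAvailable:    8192000 kB".
		if len(fields) >= 2 && fields[0] == "MemAvailable:" {
			kb, err := strconv.Atoi(fields[1])
			if err != nil {
				return 0, fmt.Errorf("parse MemAvailable: %w", err)
			}
			return kb / 1024, nil
		}
	}
	if err := scanner.Err(); err != nil {
		return 0, err
	}
	return 0, fmt.Errorf("MemAvailable not found")
}

// memAvailableFromText is a small convenience wrapper for string input.
func memAvailableFromText(text string) (int, error) {
	return parseMemAvailableMB(strings.NewReader(text))
}

func main() {
	sample := "MemTotal:       16384000 kB\nMemAvailable:    8192000 kB\n"
	mb, err := memAvailableFromText(sample)
	fmt.Println(mb, err)

	_, err = memAvailableFromText("MemTotal: 16384000 kB\n")
	fmt.Println(err)
}
```

With the explicit not-found error, a caller can distinguish "zero MB available" from "could not determine memory" and pick a conservative default in the second case.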
645-657: Simplify initPurlMap: avoid redundant os.Open; use ReadFile(absPath).
Removes an unused open/close and reduces I/O.
-    absPath, err := filepath.Abs(filename)
-    if err != nil {
-        return nil, err
-    }
-    file, err := os.Open(absPath)
-    if err != nil {
-        return nil, err
-    }
-    defer func() {
-        if err := file.Close(); err != nil {
-            log.Printf("Warning: Error closing file %s: %v", filename, err)
-        }
-    }()
-
-    data, err := os.ReadFile(filename)
+    absPath, err := filepath.Abs(filename)
+    if err != nil {
+        return nil, err
+    }
+    data, err := os.ReadFile(absPath)
     if err != nil {
         return nil, err
     }
Also applies to: 659-662
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
cmd/import/main.go (14 hunks)
go.mod (3 hunks)
internal/utils/progress_tracker.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
cmd/import/main.go (1)
internal/utils/progress_tracker.go (2)
NewProgressTracker (35-80)
ProgressTracker (15-32)
🪛 GitHub Actions: Golang CI Lint
cmd/import/main.go
[error] 680-680: golangci-lint: Error return value of file.Close is not checked (errcheck).
🪛 GitHub Check: build
cmd/import/main.go
[failure] 674-674:
Comment should end in a period (godot)
[failure] 702-702:
paramTypeCombine: func(fileCount int, avgFileSizeMB int) WorkerConfig could be replaced with func(fileCount, avgFileSizeMB int) WorkerConfig (gocritic)
[failure] 680-680:
Error return value of file.Close is not checked (errcheck)
internal/utils/progress_tracker.go
[failure] 157-157:
Comment should end in a period (godot)
[failure] 150-150:
Comment should end in a period (godot)
[failure] 114-114:
Comment should end in a period (godot)
[failure] 82-82:
Comment should end in a period (godot)
[failure] 34-34:
Comment should end in a period (godot)
[failure] 14-14:
Comment should end in a period (godot)
[failure] 231-231:
ifElseChain: rewrite if-else to switch statement (gocritic)
🔇 Additional comments (2)
go.mod (1)
8-16: Deps for progress UI look correct.
color v1.18.0 and mpb/v8 v8.10.2 align with the new progress tracker. Qdrant client v1.14.0 matches the intended features. LGTM.
Based on learnings
cmd/import/main.go (1)
534-566: No action needed: misc_collection is already included in supported collections and created at startup.
Likely an incorrect or invalid review comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cmd/import/main.go (1)
185-201: Range over int: compile error on Go versions before 1.22
Use a counted for-loop to spawn N workers.
-    for workerID := range optimalWorkers.NumWorkers {
+    for workerID := 0; workerID < optimalWorkers.NumWorkers; workerID++ {
         wg.Add(1)
         go func(workerID int) {
             defer wg.Done()
             for file := range filesChan {
                 recordCount, err := importCSVFileWithProgress(ctx, client, file, optimalWorkers.RecommendedBatchSize, progress)
♻️ Duplicate comments (3)
cmd/import/main.go (2)
670-681: LGTM: close error handled
The deferred file.Close error is now checked and logged. This addresses errcheck.
181-183: Fix units: pass average size in MB to CalculateOptimalWorkers
Currently using bytes; memory math will be skewed.
-    avgFileSize := csvFilesSize / int64(len(csvFiles))
-    optimalWorkers := CalculateOptimalWorkers(len(csvFiles), int(avgFileSize))
+    avgFileSizeBytes := csvFilesSize / int64(len(csvFiles))
+    avgFileSizeMB := int(avgFileSizeBytes / (1024 * 1024))
+    if avgFileSizeMB == 0 {
+        avgFileSizeMB = 1
+    }
+    optimalWorkers := CalculateOptimalWorkers(len(csvFiles), avgFileSizeMB)
internal/utils/progress_tracker.go (1)
182-184: Finalize bars before waiting to avoid hang
If record/collection bars have unknown totals, Wait can block. Finalize totals before Wait.
 func (pt *ProgressTracker) Wait() {
-    pt.progress.Wait()
+    // Finalize bars to avoid hanging on Wait.
+    pt.mu.Lock()
+    if pt.recordBar != nil {
+        pt.recordBar.SetTotal(pt.processedRecords, true)
+    }
+    for name, bar := range pt.collectionBars {
+        total := pt.collectionStats[name]
+        bar.SetTotal(total, true)
+    }
+    pt.mu.Unlock()
+    pt.progress.Wait()
 }
🧹 Nitpick comments (1)
internal/utils/progress_tracker.go (1)
229-235: Use sort.Slice instead of bubble sort (minor)
Cleaner and faster; avoids O(n^2) bubble sort.
-    // Simple bubble sort
-    for i := 0; i < len(stats); i++ {
-        for j := i + 1; j < len(stats); j++ {
-            if stats[j].count > stats[i].count {
-                stats[i], stats[j] = stats[j], stats[i]
-            }
-        }
-    }
+    sort.Slice(stats, func(i, j int) bool { return stats[i].count > stats[j].count })
Add import:
-import (
+import (
     "fmt"
     "log"
+    "sort"
     "sync"
     "time"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
cmd/import/main.go (14 hunks)
internal/utils/progress_tracker.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
cmd/import/main.go (2)
internal/utils/progress_tracker.go (2)
NewProgressTracker (52-97)
ProgressTracker (32-49)
internal/repository/scan_repository_qdrant_impl.go (1)
VectorDim (35-35)
🔇 Additional comments (1)
internal/utils/progress_tracker.go (1)
17-19: No package inconsistencies detected: all files in internal/utils declare package progresstracker.
Summary by CodeRabbit
New Features
Refactor