From 6a42f05d77fbf30c201ba4a323339730bf7ea23c Mon Sep 17 00:00:00 2001 From: Matias Daloia Date: Wed, 15 Oct 2025 00:16:24 +0200 Subject: [PATCH 1/4] fix: refactor import script, dynamically calculate num of workers, add progress tracking --- cmd/import/main.go | 454 ++++++++++++++--------------- go.mod | 8 + go.sum | 20 ++ internal/utils/progress_tracker.go | 256 ++++++++++++++++ 4 files changed, 505 insertions(+), 233 deletions(-) create mode 100644 internal/utils/progress_tracker.go diff --git a/cmd/import/main.go b/cmd/import/main.go index 5b05ccb..b6ee2d5 100644 --- a/cmd/import/main.go +++ b/cmd/import/main.go @@ -18,6 +18,7 @@ package main import ( + "bufio" "context" "encoding/csv" "encoding/json" @@ -26,8 +27,10 @@ import ( "hash/fnv" "io" "log" + "math" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -36,30 +39,27 @@ import ( "github.com/qdrant/go-client/qdrant" "github.com/scanoss/folder-hashing-api/internal/domain/entities" + "github.com/scanoss/folder-hashing-api/internal/utils" ) +type WorkerConfig struct { + NumWorkers int + RecommendedBatchSize int +} + const ( // QdrantHost is the default Qdrant server hostname. QdrantHost = "localhost" // QdrantPort is the default Qdrant server port. QdrantPort = 6334 - // BatchSize is the number of records to process in each batch. - BatchSize = 2000 // Large batches are safe when indexing is disabled - // MaxWorkers is the number of parallel workers for file processing. - MaxWorkers = 4 // Reduced workers to prevent overwhelming Qdrant // VectorDim is the dimensionality of the hash vectors. VectorDim = 64 // Single 64-bit hash per collection - // BatchInsertDelay is the delay between batch insertions to rate limit requests. - BatchInsertDelay = 100 * time.Millisecond + // DefaultWorkers is the default number of workers to use. 
+ DefaultWorkers = 2 ) var rankMap map[string]int -var ( - createdCollections = make(map[string]bool) - collectionCreateLock sync.Mutex -) - //nolint:gocyclo // Main function complexity is acceptable for CLI setup func main() { csvDir := flag.String("dir", "", "Directory containing CSV files (required)") @@ -78,8 +78,6 @@ func main() { log.Fatal("Error: You must specify a file with the -top-purls option") } - log.Println("FAST IMPORT MODE: Importing data without HNSW indexing, will enable indexing after import completes.") - // Verify that the directory exists if _, err := os.Stat(*csvDir); os.IsNotExist(err) { log.Fatalf("Error: Directory %s does not exist", *csvDir) @@ -105,12 +103,6 @@ func main() { if err != nil { log.Fatalf("Error connecting to Qdrant: %v", err) } - - // Verify connection health before starting - if err := verifyQdrantHealth(ctx, client); err != nil { - log.Fatalf("Qdrant health check failed: %v", err) - } - defer func() { log.Println("Closing connection to Qdrant") if err := client.Close(); err != nil { @@ -121,24 +113,33 @@ func main() { collections := entities.GetAllSupportedCollections() - log.Printf("Using lazy collection creation for %d language-based collections: %v", len(collections), collections) + log.Printf("Will create/update %d collections: %v", len(collections), collections) - // Only handle overwrite flag - collections will be created lazily - if *overwrite { - for _, collectionName := range collections { - collectionExists, err := client.CollectionExists(ctx, collectionName) + // Check and create collections + for _, collectionName := range collections { + log.Printf("Checking collection: %s", collectionName) + collectionExists, err := client.CollectionExists(ctx, collectionName) + if err != nil { + log.Printf("Error checking if collection %s exists: %v", collectionName, err) + return + } + + if *overwrite && collectionExists { + log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName) + err = client.DeleteCollection(ctx, collectionName) if err != nil { - cleanupAndExit(client, "Error checking if collection %s exists: %v", collectionName, err) + //nolint:gocritic // Error is fatal, defer will not help here + log.Fatalf("Error dropping collection %s: %v", collectionName, err) } + log.Printf("Collection '%s' dropped successfully", collectionName) + collectionExists = false + } - if collectionExists { - log.Printf("Collection %s exists and overwrite flag is set. 
Dropping collection...", collectionName) - err = client.DeleteCollection(ctx, collectionName) - if err != nil { - cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err) - } - log.Printf("Collection '%s' dropped successfully", collectionName) - } + // Create collection if it doesn't exist + if !collectionExists { + createCollection(ctx, client, collectionName) + } else { + log.Printf("Using existing collection: %s", collectionName) } } @@ -146,13 +147,21 @@ func main() { log.Printf("Reading directory '%s' for CSV files...", *csvDir) files, err := os.ReadDir(*csvDir) if err != nil { - cleanupAndExit(client, "Error reading directory: %v", err) + log.Fatalf("Error reading directory: %v", err) } var csvFiles []string + var csvFilesSize int64 for _, file := range files { if !file.IsDir() && strings.HasSuffix(file.Name(), ".csv") { csvFiles = append(csvFiles, filepath.Join(*csvDir, file.Name())) + + fileInfo, err := os.Stat(filepath.Join(*csvDir, file.Name())) + if err != nil { + log.Printf("Error getting file size for %s: %v", file.Name(), err) + continue + } + csvFilesSize += fileInfo.Size() } } @@ -162,30 +171,33 @@ func main() { return } + progress := utils.NewProgressTracker(len(csvFiles)) + // Channel to process files filesChan := make(chan string, len(csvFiles)) var wg sync.WaitGroup - - // Error channel to collect errors from workers errorsChan := make(chan error, len(csvFiles)) + avgFileSize := csvFilesSize / int64(len(csvFiles)) + optimalWorkers := CalculateOptimalWorkers(len(csvFiles), int(avgFileSize)) + // Start workers to process files in parallel - log.Printf("Starting %d worker(s) to process CSV files...", MaxWorkers) - for workerID := range MaxWorkers { + log.Printf("Starting %d worker(s) to process CSV files...", optimalWorkers.NumWorkers) + for workerID := range optimalWorkers.NumWorkers { wg.Add(1) go func(workerID int) { defer wg.Done() for file := range filesChan { - sectorName := filepath.Base(file) - sectorName = strings.TrimSuffix(sectorName, ".csv") - log.Printf("Worker %d: Processing sector %s", workerID, sectorName) + fileName := filepath.Base(file) + // Progress bars will show file processing status - err := importCSVFile(ctx, client, file, sectorName) + recordCount, err := importCSVFileWithProgress(ctx, client, file, fileName, optimalWorkers.RecommendedBatchSize, progress) if err != nil { log.Printf("Worker %d: Error importing file %s: %v", workerID, file, err) errorsChan <- fmt.Errorf("error importing file %s: %w", file, err) + progress.FileCompleted(0, false) } else { - log.Printf("Worker %d: Successfully processed sector %s", workerID, sectorName) + progress.FileCompleted(recordCount, true) } } }(workerID) @@ -202,6 +214,11 @@ func main() { wg.Wait() close(errorsChan) + // Wait for all progress bars to complete rendering + progress.Wait() + + progress.PrintFinalSummary() + // Check if there were any errors during processing errCount := 0 for err := range errorsChan { @@ -216,111 +233,24 @@ func main() { elapsed := time.Since(startTime) log.Printf("Import process completed. 
Total time: %s", elapsed) - // Enable HNSW indexing on all collections after import - log.Println("\n========================================") - log.Println("Enabling HNSW indexing on all collections...") - log.Println("========================================") - for _, collectionName := range collections { - if err := updateCollectionIndexing(ctx, client, collectionName); err != nil { - log.Printf("ERROR: Failed to enable indexing for %s: %v", collectionName, err) - } - } - log.Println("Indexing enabled for all collections. Qdrant will build indexes in the background.") - - // Show collection statistics - log.Println("\n========================================") - log.Println("Collection Statistics") - log.Println("========================================") + // Show collection statistics if possible for _, collectionName := range collections { showCollectionStats(ctx, client, collectionName) } } -// verifyQdrantHealth checks if Qdrant is healthy and responsive. -func verifyQdrantHealth(ctx context.Context, client *qdrant.Client) error { - log.Println("Performing Qdrant health check...") - - // Try to list collections as a basic health check - collections, err := client.ListCollections(ctx) - if err != nil { - return fmt.Errorf("failed to list collections: %w", err) - } - - log.Printf("Qdrant is healthy. Found %d existing collections", len(collections)) - return nil -} - -// Gracefully terminate qdrant client. -func cleanupAndExit(client *qdrant.Client, format string, args ...any) { - log.Printf(format, args...) - if client != nil { - if err := client.Close(); err != nil { - log.Printf("Failed to close client: %v", err) - } - } - os.Exit(1) -} - -// ensureCollectionExists checks if a collection exists and creates it lazily if not. -func ensureCollectionExists(ctx context.Context, client *qdrant.Client, collectionName string) error { - // Check if we've already created this collection - collectionCreateLock.Lock() - if createdCollections[collectionName] { - collectionCreateLock.Unlock() - return nil - } - collectionCreateLock.Unlock() - - // Check if collection exists in Qdrant - exists, err := client.CollectionExists(ctx, collectionName) - if err != nil { - return fmt.Errorf("error checking collection existence: %w", err) - } - - if exists { - // Mark as created to avoid checking again - collectionCreateLock.Lock() - createdCollections[collectionName] = true - collectionCreateLock.Unlock() - log.Printf("Collection %s already exists, using it", collectionName) - return nil - } - - // Collection doesn't exist, create it (thread-safe) - collectionCreateLock.Lock() - defer collectionCreateLock.Unlock() - - // Double-check pattern: another goroutine might have created it - if createdCollections[collectionName] { - return nil - } - - log.Printf("Creating collection: %s", collectionName) - err = createCollection(ctx, client, collectionName) - if err != nil { - return fmt.Errorf("error creating collection %s: %w", collectionName, err) - } - createdCollections[collectionName] = true - log.Printf("Collection %s created and marked as ready", collectionName) - - return nil -} - // Create a language-based collection with named vectors (dirs, names, contents). -// Always creates collections with HNSW indexing disabled for fast import. -// Production-optimized with reduced shard/segment counts to prevent memory spikes. 
-func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) error { +func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) { log.Printf("Creating language-based collection with named vectors: %s", collectionName) - log.Printf("Collection %s: HNSW indexing DISABLED for fast import (m=0)", collectionName) - // Create named vectors configuration with indexing disabled + // Create named vectors configuration for dirs, names, and contents namedVectors := map[string]*qdrant.VectorParams{ "dirs": { Size: VectorDim, Distance: qdrant.Distance_Manhattan, HnswConfig: &qdrant.HnswConfigDiff{ - M: qdrant.PtrOf(uint64(0)), // m=0 disables HNSW index building - EfConstruct: qdrant.PtrOf(uint64(100)), + M: qdrant.PtrOf(uint64(48)), + EfConstruct: qdrant.PtrOf(uint64(500)), FullScanThreshold: qdrant.PtrOf(uint64(100000)), }, }, @@ -328,8 +258,8 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName Size: VectorDim, Distance: qdrant.Distance_Manhattan, HnswConfig: &qdrant.HnswConfigDiff{ - M: qdrant.PtrOf(uint64(0)), - EfConstruct: qdrant.PtrOf(uint64(100)), + M: qdrant.PtrOf(uint64(48)), + EfConstruct: qdrant.PtrOf(uint64(500)), FullScanThreshold: qdrant.PtrOf(uint64(100000)), }, }, @@ -337,8 +267,8 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName Size: VectorDim, Distance: qdrant.Distance_Manhattan, HnswConfig: &qdrant.HnswConfigDiff{ - M: qdrant.PtrOf(uint64(0)), - EfConstruct: qdrant.PtrOf(uint64(100)), + M: qdrant.PtrOf(uint64(48)), + EfConstruct: qdrant.PtrOf(uint64(500)), FullScanThreshold: qdrant.PtrOf(uint64(100000)), }, }, @@ -348,12 +278,10 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName err := client.CreateCollection(ctx, &qdrant.CreateCollection{ CollectionName: collectionName, VectorsConfig: qdrant.NewVectorsConfigMap(namedVectors), - ShardNumber: qdrant.PtrOf(uint32(2)), - // Optimize for fast import with indexing disabled + // Aggressive optimization for large collections OptimizersConfig: &qdrant.OptimizersConfigDiff{ - DefaultSegmentNumber: qdrant.PtrOf(uint64(4)), + DefaultSegmentNumber: qdrant.PtrOf(uint64(32)), // Many segments for parallelism MaxSegmentSize: qdrant.PtrOf(uint64(500000)), // Large segments for efficiency - IndexingThreshold: qdrant.PtrOf(uint64(0)), // Disable indexing during import }, // Binary quantization for memory efficiency QuantizationConfig: &qdrant.QuantizationConfig{ @@ -365,7 +293,7 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName }, }) if err != nil { - return fmt.Errorf("failed to create collection: %w", err) + log.Fatalf("Error creating collection %s: %v", collectionName, err) } log.Printf("Collection '%s' with named vectors created successfully", collectionName) @@ -398,16 +326,13 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName } else { log.Printf("Created index for field: rank in %s", collectionName) } - - return nil } // Import data from a CSV file to separate collections. 
-func importCSVFile(ctx context.Context, client *qdrant.Client, filePath, sectorName string) error { - log.Printf("Opening CSV file: %s", filePath) +func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, filePath, fileName string, batchSize int, progress *utils.ProgressTracker) (int, error) { file, err := os.Open(filePath) if err != nil { - return fmt.Errorf("error opening CSV file: %w", err) + return 0, fmt.Errorf("error opening CSV file: %w", err) } defer func() { if err := file.Close(); err != nil { @@ -415,67 +340,62 @@ func importCSVFile(ctx context.Context, client *qdrant.Client, filePath, sectorN } }() - validRecords := make([][]string, 0, 10000) // Pre-allocate for performance - var lineNumber int - var errorCount int - reader := csv.NewReader(file) reader.FieldsPerRecord = 0 reader.LazyQuotes = true reader.TrimLeadingSpace = true + batch := make([][]string, 0, batchSize) + totalRecords := 0 + batchNum := 0 + for { - lineNumber++ record, err := reader.Read() - if err != nil { - if err == io.EOF { - break + if err == io.EOF { + if len(batch) > 0 { + collectionCounts, err := insertBatchToCollections(ctx, client, batch, progress) + if err != nil { + log.Printf("WARNING: Error inserting final batch: %v", err) + } else { + // Update progress for each collection + for collection, count := range collectionCounts { + progress.AddRecords(count, collection) + } + totalRecords += len(batch) + } } - log.Printf("WARNING: Error reading line %d in file %s: %v", lineNumber, filePath, err) - continue + break } - validRecords = append(validRecords, record) - } - totalRecords := len(validRecords) - if totalRecords == 0 { - log.Printf("No valid records found in %s, skipping", filePath) - return nil - } + if err != nil { + log.Printf("WARNING: Error reading line in %s: %v", filePath, err) + continue + } - if errorCount > 0 { - log.Printf("Processed %s: %d valid records, %d parsing errors", filePath, totalRecords, errorCount) - } + batch = append(batch, record) - log.Printf("Importing %d valid records from sector %s to separate collections", totalRecords, sectorName) + if len(batch) >= batchSize { + batchNum++ + // Progress bars will show batch processing status - // Process in batches for better performance - batchesProcessed := 0 - for i := 0; i < totalRecords; i += BatchSize { - end := i + BatchSize - if end > totalRecords { - end = totalRecords - } - batch := validRecords[i:end] - batchNum := i/BatchSize + 1 - log.Printf("Processing batch %d/%d (%d records) for sector %s", - batchNum, (totalRecords+BatchSize-1)/BatchSize, len(batch), sectorName) + collectionCounts, err := insertBatchToCollections(ctx, client, batch, progress) + if err != nil { + log.Printf("WARNING: Error inserting batch %d: %v", batchNum, err) + } else { + // Update progress for each collection + for collection, count := range collectionCounts { + progress.AddRecords(count, collection) + } + totalRecords += len(batch) + } - // Insert batch to collections - err := insertBatchToSeparateCollections(ctx, client, batch) - if err != nil { - log.Printf("WARNING: Error inserting batch %d: %v. 
Continuing with next batch.", batchNum, err) - continue + batch = make([][]string, 0, batchSize) } - batchesProcessed++ - - // Rate limit to avoid overwhelming Qdrant - time.Sleep(BatchInsertDelay) } - log.Printf("All %d batches for sector %s imported successfully", batchesProcessed, sectorName) - return nil + // File completed successfully - progress bar will reflect this + return totalRecords, nil } // hexSimhashToVector converts hex hash string to vector. @@ -511,7 +431,7 @@ func hexSimhashToVector(hexHashString string, bits int) ([]float32, error) { // Insert a batch of records to language-based collections. // //nolint:gocyclo,nestif // Batch processing complexity is inherent to CSV import -func insertBatchToSeparateCollections(ctx context.Context, client *qdrant.Client, batch [][]string) error { +func insertBatchToCollections(ctx context.Context, client *qdrant.Client, batch [][]string, progress *utils.ProgressTracker) (map[string]int, error) { // Group points by collection (language) collectionPoints := make(map[string][]*qdrant.PointStruct) @@ -618,7 +538,7 @@ func insertBatchToSeparateCollections(ctx context.Context, client *qdrant.Client langExtStr := strings.TrimSpace(record[17]) if err := json.Unmarshal([]byte(langExtStr), &langExtensions); err != nil { - log.Printf("WARNING: Failed to parse language_extensions JSON '%s': %v. Using misc_collection.", langExtStr, err) + // Failed to parse JSON - use misc_collection payload["language_extensions"] = qdrant.NewValueString(langExtStr) } else { // Convert to Qdrant struct format for proper indexing @@ -662,55 +582,48 @@ func insertBatchToSeparateCollections(ctx context.Context, client *qdrant.Client } if len(collectionPoints) == 0 { - return nil + return nil, nil } - // Insert to language-based collections sequentially to avoid connection storms - // Collections are created lazily on first use to prevent memory spikes + // Insert to collections in parallel + var wg sync.WaitGroup + var mu sync.Mutex + errChan := make(chan error, len(collectionPoints)) + + // Track counts per collection + collectionCounts := make(map[string]int) + for collectionName, points := range collectionPoints { if len(points) > 0 { - // Ensure collection exists before inserting (lazy creation) - if err := ensureCollectionExists(ctx, client, collectionName); err != nil { - return fmt.Errorf("error ensuring collection %s exists: %w", collectionName, err) - } - - _, err := client.Upsert(ctx, &qdrant.UpsertPoints{ - CollectionName: collectionName, - Points: points, - }) - if err != nil { - return fmt.Errorf("error inserting to %s collection: %w", collectionName, err) - } - log.Printf("Successfully inserted %d points to %s", len(points), collectionName) + wg.Add(1) + go func(colName string, pts []*qdrant.PointStruct) { + defer wg.Done() + _, err := client.Upsert(ctx, &qdrant.UpsertPoints{ + CollectionName: colName, + Points: pts, + }) + if err != nil { + errChan <- fmt.Errorf("error inserting to %s collection: %w", colName, err) + } else { + mu.Lock() + collectionCounts[colName] = len(pts) + mu.Unlock() + } + }(collectionName, points) } } - return nil -} - -// updateCollectionIndexing enables HNSW indexing on an existing collection. -// This is called automatically after import completes to build indexes in the background. 
-func updateCollectionIndexing(ctx context.Context, client *qdrant.Client, collectionName string) error { - log.Printf("Updating collection %s to enable HNSW indexing...", collectionName) + wg.Wait() + close(errChan) - // Update indexing threshold - err := client.UpdateCollection(ctx, &qdrant.UpdateCollection{ - CollectionName: collectionName, - HnswConfig: &qdrant.HnswConfigDiff{ - M: qdrant.PtrOf(uint64(48)), - EfConstruct: qdrant.PtrOf(uint64(500)), - FullScanThreshold: qdrant.PtrOf(uint64(100000)), - }, - OptimizersConfig: &qdrant.OptimizersConfigDiff{ - IndexingThreshold: qdrant.PtrOf(uint64(100000)), - }, - }) - if err != nil { - return fmt.Errorf("error updating indexing threshold for %s: %w", collectionName, err) + // Check for errors + for err := range errChan { + if err != nil { + return collectionCounts, err + } } - log.Printf("Successfully enabled indexing for collection: %s. Qdrant will build indexes in the background.", collectionName) - return nil + return collectionCounts, nil } // Function to show collection statistics. @@ -757,3 +670,78 @@ func initPurlMap(filename string) (map[string]int, error) { return result, nil } + +// GetAvailableMemoryMB returns available memory in MB +func GetAvailableMemoryMB() (int, error) { + file, err := os.Open("/proc/meminfo") + if err != nil { + return 0, err + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "MemAvailable:") { + fields := strings.Fields(line) + if len(fields) >= 2 { + kb, err := strconv.Atoi(fields[1]) + if err != nil { + return 0, err + } + return kb / 1024, nil // Convert KB to MB + } + } + } + return 0, scanner.Err() +} + +// CalculateOptimalWorkers calculates the optimal number of workers for processing CSV files, +// based on the number of files and their average size. +// It takes into account available memory and CPU cores to determine the optimal number of workers. 
+func CalculateOptimalWorkers(fileCount int, avgFileSizeMB int) WorkerConfig { + numCPU := runtime.NumCPU() + + log.Printf("Number of CPU cores: %d", numCPU) + + // Get available memory + availableMemMB, err := GetAvailableMemoryMB() + if err != nil { + log.Printf("Error getting memory info: %v, falling back to conservative estimate", err) + availableMemMB = 1024 // 1GB fallback + } + + log.Printf("Available memory: %d MB", availableMemMB) + + // Estimate memory per worker (rough calculation) + // Each worker might hold BatchSize * record size in memory + estimatedMemPerWorkerMB := avgFileSizeMB + 100 + // Calculate max workers based on memory + maxWorkersByMem := availableMemMB / estimatedMemPerWorkerMB / 2 // Use only 50% of available + + // Calculate based on CPU (I/O bound: 2x cores) + maxWorkersByCPU := numCPU * 2 + + // Take the minimum of constraints + optimalWorkers := int(math.Min( + float64(maxWorkersByMem), + float64(maxWorkersByCPU), + )) + + // Cap at file count and set bounds between 2 and 32 + optimalWorkers = max(2, min(optimalWorkers, fileCount, 32)) + + log.Printf("Optimal numbers of workers: %d", optimalWorkers) + + // Adjust batch size based on workers + recommendedBatchSize := 2000 + if optimalWorkers > 16 { + // More workers = smaller batches for better distribution + recommendedBatchSize = 1000 + } + + return WorkerConfig{ + NumWorkers: optimalWorkers, + RecommendedBatchSize: recommendedBatchSize, + } +} diff --git a/go.mod b/go.mod index 231ee5e..c00db63 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 toolchain go1.23.2 require ( + github.com/fatih/color v1.18.0 github.com/go-playground/validator/v10 v10.26.0 github.com/golobby/config/v3 v3.4.2 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 @@ -12,6 +13,7 @@ require ( github.com/scanoss/go-grpc-helper v0.8.0 github.com/scanoss/papi v0.22.0 github.com/scanoss/zap-logging-helper v0.3.2 + github.com/vbauerster/mpb/v8 v8.10.2 go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/metric v1.35.0 google.golang.org/grpc v1.73.0 @@ -21,6 +23,8 @@ require ( require ( github.com/BurntSushi/toml v1.2.1 // indirect + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -34,7 +38,11 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect github.com/phuslu/iploc v1.0.20230201 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/scanoss/ipfilter/v2 v2.0.2 // indirect github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect diff --git a/go.sum b/go.sum index c067ce0..4e8be13 100644 --- a/go.sum +++ b/go.sum @@ -390,6 +390,10 @@ github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/VividCortex/ewma v1.2.0 
h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -433,6 +437,8 @@ github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -589,6 +595,13 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/phuslu/iploc v1.0.20230201 h1:AMhy7j8z0N5iI0jaqh514KTDEB7wVdQJ4Y4DJPCvKBU= github.com/phuslu/iploc v1.0.20230201/go.mod h1:gsgExGWldwv1AEzZm+Ki9/vGfyjkL33pbSr9HGpt2Xg= @@ -603,6 +616,9 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/qdrant/go-client v1.14.0 h1:cyz9OOooAexudw5w69LRe9vKCQFYJvaFvt9icOciI1U= github.com/qdrant/go-client v1.14.0/go.mod h1:iO8ts78jL4x6LDHFOViyYWELVtIBDTjOykBmiOTHLnQ= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -638,6 +654,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4= +github.com/vbauerster/mpb/v8 v8.10.2 h1:2uBykSHAYHekE11YvJhKxYmLATKHAGorZwFlyNw4hHM= +github.com/vbauerster/mpb/v8 v8.10.2/go.mod h1:+Ja4P92E3/CorSZgfDtK46D7AVbDqmBQRTmyTqPElo0= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -896,8 +914,10 @@ golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/utils/progress_tracker.go b/internal/utils/progress_tracker.go new file mode 100644 index 0000000..2fec670 --- /dev/null +++ b/internal/utils/progress_tracker.go @@ -0,0 +1,256 @@ +package utils + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/fatih/color" + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" +) + +// ProgressTracker tracks import progress across multiple workers +type ProgressTracker struct { + mu sync.Mutex + startTime time.Time + totalFiles int + processedFiles int + totalRecords int64 + processedRecords int64 + failedFiles int + collectionStats map[string]int64 // records per collection + estimatedTotal int64 // estimated total records + hasEstimate bool // whether we have enough data for estimation + + // mpb progress bars + progress *mpb.Progress + fileBar *mpb.Bar + recordBar *mpb.Bar + collectionBars map[string]*mpb.Bar +} + +// NewProgressTracker creates a new progress tracker with mpb progress bars +func NewProgressTracker(totalFiles int) *ProgressTracker { + p := mpb.New( + mpb.WithOutput(color.Output), + mpb.WithAutoRefresh(), + ) + + // Create file progress bar (known total - shows visual bar with default style) + fileBar := p.AddBar( + int64(totalFiles), + mpb.PrependDecorators( + decor.Name("Files: ", decor.WC{C: decor.DindentRight | decor.DextraSpace}), + decor.CountersNoUnit("%d / %d", decor.WCSyncWidth), + ), + mpb.AppendDecorators( + decor.OnComplete(decor.Percentage(decor.WC{W: 5}), "done"), + 
decor.OnComplete( + // ETA decorator with ewma age of 30 + decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncWidth), "done", + ), + ), + ) + + // Create records progress bar (starts with unknown total, will show bar once estimated) + recordBar := p.AddBar(0, + mpb.PrependDecorators( + decor.Name("Records: ", decor.WC{C: decor.DindentRight | decor.DextraSpace}), + decor.CurrentNoUnit("%d", decor.WCSyncWidth), + decor.OnComplete( + decor.Spinner(nil, decor.WCSyncSpace), "done", + ), + ), + mpb.AppendDecorators( + decor.AverageSpeed(0, "%.0f/s", decor.WCSyncWidth), + ), + ) + + return &ProgressTracker{ + startTime: time.Now(), + totalFiles: totalFiles, + collectionStats: make(map[string]int64), + progress: p, + fileBar: fileBar, + recordBar: recordBar, + collectionBars: make(map[string]*mpb.Bar), + } +} + +// AddRecords increments the record count and updates progress bars +func (pt *ProgressTracker) AddRecords(count int, collectionName string) { + pt.mu.Lock() + defer pt.mu.Unlock() + + pt.processedRecords += int64(count) + pt.collectionStats[collectionName] += int64(count) + + // Update record bar + pt.recordBar.IncrBy(count) + + // Create or update collection-specific bar + if _, exists := pt.collectionBars[collectionName]; !exists { + // Create a new bar for this collection (unknown total - just shows count) + collectionBar := pt.progress.AddBar(0, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.Name(" "+collectionName+": ", decor.WC{C: decor.DindentRight | decor.DextraSpace}), + decor.CurrentNoUnit("%d", decor.WCSyncWidth), + decor.OnComplete( + decor.Spinner(nil, decor.WCSyncSpace), "done", + ), + ), + ) + pt.collectionBars[collectionName] = collectionBar + } + + // Update the collection bar + bar := pt.collectionBars[collectionName] + bar.IncrBy(count) +} + +// FileCompleted marks a file as completed and updates the file progress bar +func (pt *ProgressTracker) FileCompleted(recordCount int, success bool) { + pt.mu.Lock() + defer pt.mu.Unlock() + + pt.processedFiles++ + pt.totalRecords += int64(recordCount) + if !success { + pt.failedFiles++ + } + + // Calculate estimated total after processing at least 10 files (or 5% of total) + minFilesForEstimate := 10 + if pt.totalFiles < 200 { + minFilesForEstimate = max(5, pt.totalFiles/20) // At least 5% of files + } + + if !pt.hasEstimate && pt.processedFiles >= minFilesForEstimate { + // Calculate average records per file + avgRecordsPerFile := float64(pt.processedRecords) / float64(pt.processedFiles) + pt.estimatedTotal = int64(avgRecordsPerFile * float64(pt.totalFiles)) + pt.hasEstimate = true + + // Update record bar with estimated total + pt.recordBar.SetTotal(pt.estimatedTotal, false) + } else if pt.hasEstimate { + // Continuously update estimate as we get more data + avgRecordsPerFile := float64(pt.processedRecords) / float64(pt.processedFiles) + pt.estimatedTotal = int64(avgRecordsPerFile * float64(pt.totalFiles)) + pt.recordBar.SetTotal(pt.estimatedTotal, false) + } + + // Update file progress bar + pt.fileBar.Increment() +} + +// MarkFileFailed increments the failed file counter and can be called separately +func (pt *ProgressTracker) MarkFileFailed() { + pt.mu.Lock() + defer pt.mu.Unlock() + pt.failedFiles++ +} + +// GetFailedFiles returns the current number of failed files +func (pt *ProgressTracker) GetFailedFiles() int { + pt.mu.Lock() + defer pt.mu.Unlock() + return pt.failedFiles +} + +// Wait waits for all progress bars to complete +func (pt *ProgressTracker) Wait() { + pt.progress.Wait() +} + +// 
PrintFinalSummary prints the final import summary after all bars are complete +func (pt *ProgressTracker) PrintFinalSummary() { + pt.mu.Lock() + defer pt.mu.Unlock() + + elapsed := time.Since(pt.startTime) + recordsPerSec := float64(pt.processedRecords) / elapsed.Seconds() + + log.Printf("\n") + log.Printf("╔════════════════════════════════════════════════════════════╗") + log.Printf("║ IMPORT COMPLETE ║") + log.Printf("╚════════════════════════════════════════════════════════════╝") + log.Printf("") + + // Files section + if pt.failedFiles > 0 { + log.Printf("📁 Files Processed: %d/%d (❌ %d failed)", pt.processedFiles, pt.totalFiles, pt.failedFiles) + } else { + log.Printf("📁 Files Processed: %d/%d (✓ all successful)", pt.processedFiles, pt.totalFiles) + } + + // Records section + log.Printf("📊 Total Records: %s", formatNumber(pt.totalRecords)) + log.Printf("✅ Records Inserted: %s", formatNumber(pt.processedRecords)) + + // Performance section + log.Printf("⚡ Average Speed: %.0f records/sec", recordsPerSec) + log.Printf("⏱️ Total Time: %s", formatDuration(elapsed)) + + // Collections section + log.Printf("") + log.Printf("📚 Records by Collection:") + + // Sort collections by count (descending) + type collectionStat struct { + name string + count int64 + } + stats := make([]collectionStat, 0, len(pt.collectionStats)) + for name, count := range pt.collectionStats { + stats = append(stats, collectionStat{name, count}) + } + // Simple bubble sort + for i := 0; i < len(stats); i++ { + for j := i + 1; j < len(stats); j++ { + if stats[j].count > stats[i].count { + stats[i], stats[j] = stats[j], stats[i] + } + } + } + + // Print all collections sorted by count + for _, stat := range stats { + percentage := float64(stat.count) / float64(pt.processedRecords) * 100 + log.Printf(" %-25s %12s (%.1f%%)", + stat.name+":", formatNumber(stat.count), percentage) + } + + log.Printf("") +} + +func formatNumber(n int64) string { + if n < 1000 { + return fmt.Sprintf("%d", n) + } else if n < 1000000 { + return fmt.Sprintf("%.1fK", float64(n)/1000) + } else if n < 1000000000 { + return fmt.Sprintf("%.2fM", float64(n)/1000000) + } + return fmt.Sprintf("%.2fB", float64(n)/1000000000) +} + +func formatDuration(d time.Duration) string { + if d == 0 { + return "calculating..." 
+ } + + hours := int(d.Hours()) + minutes := int(d.Minutes()) % 60 + seconds := int(d.Seconds()) % 60 + + if hours > 0 { + return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds) + } else if minutes > 0 { + return fmt.Sprintf("%dm %ds", minutes, seconds) + } + return fmt.Sprintf("%ds", seconds) +} From 7e832f0855b1f266626111600fb9f528d8d7e6f6 Mon Sep 17 00:00:00 2001 From: Matias Daloia Date: Wed, 15 Oct 2025 00:26:33 +0200 Subject: [PATCH 2/4] fix: lint issues --- cmd/import/main.go | 28 ++++++++++---------- internal/utils/progress_tracker.go | 42 +++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/cmd/import/main.go b/cmd/import/main.go index b6ee2d5..62cd020 100644 --- a/cmd/import/main.go +++ b/cmd/import/main.go @@ -39,7 +39,7 @@ import ( "github.com/qdrant/go-client/qdrant" "github.com/scanoss/folder-hashing-api/internal/domain/entities" - "github.com/scanoss/folder-hashing-api/internal/utils" + progresstracker "github.com/scanoss/folder-hashing-api/internal/utils" ) type WorkerConfig struct { @@ -171,7 +171,7 @@ func main() { return } - progress := utils.NewProgressTracker(len(csvFiles)) + progress := progresstracker.NewProgressTracker(len(csvFiles)) // Channel to process files filesChan := make(chan string, len(csvFiles)) @@ -188,10 +188,7 @@ func main() { go func(workerID int) { defer wg.Done() for file := range filesChan { - fileName := filepath.Base(file) - // Progress bars will show file processing status - - recordCount, err := importCSVFileWithProgress(ctx, client, file, fileName, optimalWorkers.RecommendedBatchSize, progress) + recordCount, err := importCSVFileWithProgress(ctx, client, file, optimalWorkers.RecommendedBatchSize, progress) if err != nil { log.Printf("Worker %d: Error importing file %s: %v", workerID, file, err) errorsChan <- fmt.Errorf("error importing file %s: %w", file, err) @@ -329,7 +326,7 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName } // Import data from a CSV file to separate collections. 
-func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, filePath, fileName string, batchSize int, progress *utils.ProgressTracker) (int, error) { +func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, filePath string, batchSize int, progress *progresstracker.ProgressTracker) (int, error) { file, err := os.Open(filePath) if err != nil { return 0, fmt.Errorf("error opening CSV file: %w", err) @@ -353,7 +350,7 @@ func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, fileP record, err := reader.Read() if err == io.EOF { if len(batch) > 0 { - collectionCounts, err := insertBatchToCollections(ctx, client, batch, progress) + collectionCounts, err := insertBatchToCollections(ctx, client, batch) if err != nil { log.Printf("WARNING: Error inserting final batch: %v", err) } else { @@ -378,7 +375,7 @@ func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, fileP batchNum++ // Progress bars will show batch processing status - collectionCounts, err := insertBatchToCollections(ctx, client, batch, progress) + collectionCounts, err := insertBatchToCollections(ctx, client, batch) if err != nil { log.Printf("WARNING: Error inserting batch %d: %v", batchNum, err) } else { @@ -391,7 +388,6 @@ func importCSVFileWithProgress(ctx context.Context, client *qdrant.Client, fileP batch = make([][]string, 0, batchSize) } - } // File completed successfully - progress bar will reflect this @@ -431,7 +427,7 @@ func hexSimhashToVector(hexHashString string, bits int) ([]float32, error) { // Insert a batch of records to language-based collections. // //nolint:gocyclo,nestif // Batch processing complexity is inherent to CSV import -func insertBatchToCollections(ctx context.Context, client *qdrant.Client, batch [][]string, progress *utils.ProgressTracker) (map[string]int, error) { +func insertBatchToCollections(ctx context.Context, client *qdrant.Client, batch [][]string) (map[string]int, error) { // Group points by collection (language) collectionPoints := make(map[string][]*qdrant.PointStruct) @@ -671,13 +667,17 @@ func initPurlMap(filename string) (map[string]int, error) { return result, nil } -// GetAvailableMemoryMB returns available memory in MB +// GetAvailableMemoryMB returns available memory in MB. func GetAvailableMemoryMB() (int, error) { file, err := os.Open("/proc/meminfo") if err != nil { return 0, err } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + log.Printf("Warning: Error closing file: %v", err) + } + }() scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -699,7 +699,7 @@ func GetAvailableMemoryMB() (int, error) { // CalculateOptimalWorkers calculates the optimal number of workers for processing CSV files, // based on the number of files and their average size. // It takes into account available memory and CPU cores to determine the optimal number of workers. 
-func CalculateOptimalWorkers(fileCount int, avgFileSizeMB int) WorkerConfig { +func CalculateOptimalWorkers(fileCount, avgFileSizeMB int) WorkerConfig { numCPU := runtime.NumCPU() log.Printf("Number of CPU cores: %d", numCPU) diff --git a/internal/utils/progress_tracker.go b/internal/utils/progress_tracker.go index 2fec670..045093f 100644 --- a/internal/utils/progress_tracker.go +++ b/internal/utils/progress_tracker.go @@ -1,4 +1,21 @@ -package utils +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * Copyright (C) 2024 SCANOSS.COM + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +// Package progresstracker provides utilities for progress +package progresstracker import ( "fmt" @@ -11,7 +28,7 @@ import ( "github.com/vbauerster/mpb/v8/decor" ) -// ProgressTracker tracks import progress across multiple workers +// ProgressTracker tracks import progress across multiple workers. type ProgressTracker struct { mu sync.Mutex startTime time.Time @@ -31,7 +48,7 @@ type ProgressTracker struct { collectionBars map[string]*mpb.Bar } -// NewProgressTracker creates a new progress tracker with mpb progress bars +// NewProgressTracker creates a new progress tracker with mpb progress bars. func NewProgressTracker(totalFiles int) *ProgressTracker { p := mpb.New( mpb.WithOutput(color.Output), @@ -79,7 +96,7 @@ func NewProgressTracker(totalFiles int) *ProgressTracker { } } -// AddRecords increments the record count and updates progress bars +// AddRecords increments the record count and updates progress bars. func (pt *ProgressTracker) AddRecords(count int, collectionName string) { pt.mu.Lock() defer pt.mu.Unlock() @@ -111,7 +128,7 @@ func (pt *ProgressTracker) AddRecords(count int, collectionName string) { bar.IncrBy(count) } -// FileCompleted marks a file as completed and updates the file progress bar +// FileCompleted marks a file as completed and updates the file progress bar. func (pt *ProgressTracker) FileCompleted(recordCount int, success bool) { pt.mu.Lock() defer pt.mu.Unlock() @@ -147,26 +164,26 @@ func (pt *ProgressTracker) FileCompleted(recordCount int, success bool) { pt.fileBar.Increment() } -// MarkFileFailed increments the failed file counter and can be called separately +// MarkFileFailed increments the failed file counter and can be called separately. func (pt *ProgressTracker) MarkFileFailed() { pt.mu.Lock() defer pt.mu.Unlock() pt.failedFiles++ } -// GetFailedFiles returns the current number of failed files +// GetFailedFiles returns the current number of failed files. func (pt *ProgressTracker) GetFailedFiles() int { pt.mu.Lock() defer pt.mu.Unlock() return pt.failedFiles } -// Wait waits for all progress bars to complete +// Wait waits for all progress bars to complete. func (pt *ProgressTracker) Wait() { pt.progress.Wait() } -// PrintFinalSummary prints the final import summary after all bars are complete +// PrintFinalSummary prints the final import summary after all bars are complete. 
func (pt *ProgressTracker) PrintFinalSummary() { pt.mu.Lock() defer pt.mu.Unlock() @@ -228,11 +245,12 @@ func (pt *ProgressTracker) PrintFinalSummary() { } func formatNumber(n int64) string { - if n < 1000 { + switch { + case n < 1000: return fmt.Sprintf("%d", n) - } else if n < 1000000 { + case n < 1000000: return fmt.Sprintf("%.1fK", float64(n)/1000) - } else if n < 1000000000 { + case n < 1000000000: return fmt.Sprintf("%.2fM", float64(n)/1000000) } return fmt.Sprintf("%.2fB", float64(n)/1000000000) From ff3618ed90733ff4922e8bb3a306e0baa2da3644 Mon Sep 17 00:00:00 2001 From: Matias Daloia Date: Wed, 15 Oct 2025 00:29:52 +0200 Subject: [PATCH 3/4] fix: pr comments --- cmd/import/main.go | 8 ++++++-- internal/utils/progress_tracker.go | 9 +++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/cmd/import/main.go b/cmd/import/main.go index 62cd020..e796699 100644 --- a/cmd/import/main.go +++ b/cmd/import/main.go @@ -178,8 +178,12 @@ func main() { var wg sync.WaitGroup errorsChan := make(chan error, len(csvFiles)) - avgFileSize := csvFilesSize / int64(len(csvFiles)) - optimalWorkers := CalculateOptimalWorkers(len(csvFiles), int(avgFileSize)) + avgFileSizeBytes := csvFilesSize / int64(len(csvFiles)) + avgFileSizeMB := int(avgFileSizeBytes / (1024 * 1024)) + if avgFileSizeMB == 0 { + avgFileSizeMB = 1 + } + optimalWorkers := CalculateOptimalWorkers(len(csvFiles), avgFileSizeMB) // Start workers to process files in parallel log.Printf("Starting %d worker(s) to process CSV files...", optimalWorkers.NumWorkers) diff --git a/internal/utils/progress_tracker.go b/internal/utils/progress_tracker.go index 045093f..69b2e37 100644 --- a/internal/utils/progress_tracker.go +++ b/internal/utils/progress_tracker.go @@ -180,6 +180,15 @@ func (pt *ProgressTracker) GetFailedFiles() int { // Wait waits for all progress bars to complete. 
func (pt *ProgressTracker) Wait() { + pt.mu.Lock() + if pt.recordBar != nil { + pt.recordBar.SetTotal(pt.processedRecords, true) + } + for name, bar := range pt.collectionBars { + total := pt.collectionStats[name] + bar.SetTotal(total, true) + } + pt.mu.Unlock() pt.progress.Wait() } From 2ca04bde8bc3ce37c658cfadb7148768c816522b Mon Sep 17 00:00:00 2001 From: Matias Daloia Date: Wed, 15 Oct 2025 00:38:50 +0200 Subject: [PATCH 4/4] fix: pr comments --- cmd/import/main.go | 10 +++++++++- internal/utils/progress_tracker.go | 10 ++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/import/main.go b/cmd/import/main.go index e796699..edb74c0 100644 --- a/cmd/import/main.go +++ b/cmd/import/main.go @@ -733,7 +733,15 @@ func CalculateOptimalWorkers(fileCount, avgFileSizeMB int) WorkerConfig { )) // Cap at file count and set bounds between 2 and 32 - optimalWorkers = max(2, min(optimalWorkers, fileCount, 32)) + if optimalWorkers < 2 { + optimalWorkers = 2 + } + if optimalWorkers > fileCount { + optimalWorkers = fileCount + } + if optimalWorkers > 32 { + optimalWorkers = 32 + } log.Printf("Optimal numbers of workers: %d", optimalWorkers) diff --git a/internal/utils/progress_tracker.go b/internal/utils/progress_tracker.go index 69b2e37..28fed28 100644 --- a/internal/utils/progress_tracker.go +++ b/internal/utils/progress_tracker.go @@ -111,7 +111,7 @@ func (pt *ProgressTracker) AddRecords(count int, collectionName string) { if _, exists := pt.collectionBars[collectionName]; !exists { // Create a new bar for this collection (unknown total - just shows count) collectionBar := pt.progress.AddBar(0, - mpb.BarFillerClearOnComplete(), + mpb.BarRemoveOnComplete(), mpb.PrependDecorators( decor.Name(" "+collectionName+": ", decor.WC{C: decor.DindentRight | decor.DextraSpace}), decor.CurrentNoUnit("%d", decor.WCSyncWidth), @@ -245,9 +245,11 @@ func (pt *ProgressTracker) PrintFinalSummary() { // Print all collections sorted by count for _, stat := range stats { - percentage := float64(stat.count) / float64(pt.processedRecords) * 100 - log.Printf(" %-25s %12s (%.1f%%)", - stat.name+":", formatNumber(stat.count), percentage) + var percentage float64 + if pt.processedRecords > 0 { + percentage = float64(stat.count) / float64(pt.processedRecords) * 100 + } + log.Printf(" %-25s %12s (%.1f%%)", stat.name+":", formatNumber(stat.count), percentage) } log.Printf("")
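
A minimal usage sketch of how the pieces introduced by this series are meant to compose: CalculateOptimalWorkers sizes the worker pool from the CSV file count and average file size (MB), and the new ProgressTracker reports per-batch and per-file progress before printing the final summary. This assumes the sketch sits in the same cmd/import package as main.go; the worker body, the placeholder collection name ("misc_collection"), and the record counts are illustrative stand-ins for importCSVFileWithProgress, not code taken from the patches.

```go
package main

import (
	"log"
	"sync"

	progresstracker "github.com/scanoss/folder-hashing-api/internal/utils"
)

// exampleImport shows the intended call sequence: size the worker pool, start
// the workers, report progress as batches and files complete, then wait for
// the mpb bars and print the summary.
func exampleImport(csvFiles []string, avgFileSizeMB int) {
	cfg := CalculateOptimalWorkers(len(csvFiles), avgFileSizeMB)
	progress := progresstracker.NewProgressTracker(len(csvFiles))

	filesChan := make(chan string, len(csvFiles))
	for _, f := range csvFiles {
		filesChan <- f
	}
	close(filesChan)

	var wg sync.WaitGroup
	for i := 0; i < cfg.NumWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range filesChan {
				// Stand-in for importCSVFileWithProgress: pretend one batch of
				// RecommendedBatchSize records was upserted into one collection.
				progress.AddRecords(cfg.RecommendedBatchSize, "misc_collection")
				progress.FileCompleted(cfg.RecommendedBatchSize, true)
			}
		}()
	}
	wg.Wait()

	progress.Wait() // finalize rendering of the progress bars
	progress.PrintFinalSummary()

	log.Printf("used %d workers with batch size %d", cfg.NumWorkers, cfg.RecommendedBatchSize)
}
```

In the real import path the worker calls importCSVFileWithProgress, which in turn calls progress.AddRecords once per collection after each batch upsert and lets the caller report FileCompleted, so the per-collection bars grow as batches land rather than all at once.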