From 27ce6084bb8ed5e69934b12a5691718af1960c72 Mon Sep 17 00:00:00 2001 From: Matias Daloia Date: Tue, 14 Oct 2025 13:40:11 +0200 Subject: [PATCH] fix: create collections lazily, remove qdrant config, update docker compose --- cmd/import/main.go | 96 +++++++++++++++++++++++++++++---------- docker-compose.qdrant.yml | 1 - qdrant-config.yaml | 77 ------------------------------- 3 files changed, 71 insertions(+), 103 deletions(-) delete mode 100644 qdrant-config.yaml diff --git a/cmd/import/main.go b/cmd/import/main.go index 2db18b3..08ce58e 100644 --- a/cmd/import/main.go +++ b/cmd/import/main.go @@ -55,6 +55,11 @@ const ( var rankMap map[string]int +var ( + createdCollections = make(map[string]bool) + collectionCreateLock sync.Mutex +) + //nolint:gocyclo // Main function complexity is acceptable for CLI setup func main() { csvDir := flag.String("dir", "", "Directory containing CSV files (required)") @@ -116,32 +121,25 @@ func main() { collections := entities.GetAllSupportedCollections() - log.Printf("Will create/check %d language-based collections: %v", len(collections), collections) - - // Check and create collections - for _, collectionName := range collections { - log.Printf("Checking collection: %s", collectionName) - collectionExists, err := client.CollectionExists(ctx, collectionName) - if err != nil { - log.Printf("Error checking if collection %s exists: %v", collectionName, err) - return - } + log.Printf("Using lazy collection creation for %d language-based collections: %v", len(collections), collections) - if *overwrite && collectionExists { - log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName) - err = client.DeleteCollection(ctx, collectionName) + // Only handle overwrite flag - collections will be created lazily + if *overwrite { + for _, collectionName := range collections { + collectionExists, err := client.CollectionExists(ctx, collectionName) if err != nil { - cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err) + log.Printf("Error checking if collection %s exists: %v", collectionName, err) + return } - log.Printf("Collection '%s' dropped successfully", collectionName) - collectionExists = false - } - // Create collection if it doesn't exist - if !collectionExists { - createCollection(ctx, client, collectionName) - } else { - log.Printf("Using existing collection: %s", collectionName) + if collectionExists { + log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName) + err = client.DeleteCollection(ctx, collectionName) + if err != nil { + cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err) + } + log.Printf("Collection '%s' dropped successfully", collectionName) + } } } @@ -264,8 +262,51 @@ func cleanupAndExit(client *qdrant.Client, format string, args ...any) { os.Exit(1) } +// ensureCollectionExists checks if a collection exists and creates it lazily if not. +func ensureCollectionExists(ctx context.Context, client *qdrant.Client, collectionName string) error { + // Check if we've already created this collection + collectionCreateLock.Lock() + if createdCollections[collectionName] { + collectionCreateLock.Unlock() + return nil + } + collectionCreateLock.Unlock() + + // Check if collection exists in Qdrant + exists, err := client.CollectionExists(ctx, collectionName) + if err != nil { + return fmt.Errorf("error checking collection existence: %w", err) + } + + if exists { + // Mark as created to avoid checking again + collectionCreateLock.Lock() + createdCollections[collectionName] = true + collectionCreateLock.Unlock() + log.Printf("Collection %s already exists, using it", collectionName) + return nil + } + + // Collection doesn't exist, create it (thread-safe) + collectionCreateLock.Lock() + defer collectionCreateLock.Unlock() + + // Double-check pattern: another goroutine might have created it + if createdCollections[collectionName] { + return nil + } + + log.Printf("Creating collection: %s", collectionName) + createCollection(ctx, client, collectionName) + createdCollections[collectionName] = true + log.Printf("Collection %s created and marked as ready", collectionName) + + return nil +} + // Create a language-based collection with named vectors (dirs, names, contents). // Always creates collections with HNSW indexing disabled for fast import. +// Production-optimized with reduced shard/segment counts to prevent memory spikes. func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) { log.Printf("Creating language-based collection with named vectors: %s", collectionName) log.Printf("Collection %s: HNSW indexing DISABLED for fast import (m=0)", collectionName) @@ -305,10 +346,10 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName err := client.CreateCollection(ctx, &qdrant.CreateCollection{ CollectionName: collectionName, VectorsConfig: qdrant.NewVectorsConfigMap(namedVectors), - ShardNumber: qdrant.PtrOf(uint32(4)), // 4 shards for parallel WAL processing + ShardNumber: qdrant.PtrOf(uint32(2)), // Optimize for fast import with indexing disabled OptimizersConfig: &qdrant.OptimizersConfigDiff{ - DefaultSegmentNumber: qdrant.PtrOf(uint64(32)), // Many segments for parallelism + DefaultSegmentNumber: qdrant.PtrOf(uint64(4)), MaxSegmentSize: qdrant.PtrOf(uint64(500000)), // Large segments for efficiency IndexingThreshold: qdrant.PtrOf(uint64(0)), // Disable indexing during import }, @@ -621,9 +662,14 @@ func insertBatchToSeparateCollections(ctx context.Context, client *qdrant.Client } // Insert to language-based collections sequentially to avoid connection storms - // Qdrant with 4 shards will parallelize internally via separate WAL processing + // Collections are created lazily on first use to prevent memory spikes for collectionName, points := range collectionPoints { if len(points) > 0 { + // Ensure collection exists before inserting (lazy creation) + if err := ensureCollectionExists(ctx, client, collectionName); err != nil { + return fmt.Errorf("error ensuring collection %s exists: %w", collectionName, err) + } + _, err := client.Upsert(ctx, &qdrant.UpsertPoints{ CollectionName: collectionName, Points: points, diff --git a/docker-compose.qdrant.yml b/docker-compose.qdrant.yml index aaddd47..34f72df 100644 --- a/docker-compose.qdrant.yml +++ b/docker-compose.qdrant.yml @@ -7,7 +7,6 @@ services: - 6334:6334 # gRPC API port (used by our Go client) volumes: - ./qdrant_data:/qdrant/storage - - ./qdrant-config.yaml:/qdrant/config/production.yaml:ro expose: - 6333 - 6334 diff --git a/qdrant-config.yaml b/qdrant-config.yaml deleted file mode 100644 index 02d4c02..0000000 --- a/qdrant-config.yaml +++ /dev/null @@ -1,77 +0,0 @@ -# Qdrant Configuration for Bulk Import Optimization -# Place this file at: /path/to/qdrant/config/config.yaml -# Or use with Docker: docker run -v $(pwd)/qdrant-config.yaml:/qdrant/config/production.yaml:ro qdrant/qdrant - -log_level: INFO - -storage: - # Storage path for Qdrant data - storage_path: /qdrant/storage - - # Performance optimization for bulk imports - performance: - # Maximum number of concurrent API workers (default: number of CPU cores) - # Increase for better handling of concurrent requests - max_workers: 16 - - # Maximum concurrent updates to shard replicas - # Higher values allow more parallel writes - update_concurrency: 10 - - # Optimizer CPU budget - threads allocated for optimization jobs - # Set to number of cores or less to avoid overloading - optimizer_cpu_budget: 8 - - # Write-Ahead Log (WAL) configuration for better write throughput - wal: - # Size of each WAL segment in MB (default: 32) - # Larger segments can improve write performance - wal_capacity_mb: 64 - - # Number of WAL segments to create in advance (default: 0) - # Pre-creating segments can reduce latency during writes - wal_segments_ahead: 1 - - # Optimizer configuration - optimizers: - # Number of segments created initially (default: auto) - # More segments allow better parallelism during indexing - default_segment_number: 32 - - # Maximum points per segment (default: 100000) - # Larger segments reduce overhead but may slow down updates - max_segment_size: 500000 - - # Vacuum optimizer - controls when to merge small segments - vacuum_min_vector_number: 10000 - - # Memmap threshold - vectors larger than this use disk storage - # Set higher to keep more data in RAM (if you have enough memory) - memmap_threshold_kb: 50000 - -service: - # gRPC port (default: 6334) - grpc_port: 6334 - - # HTTP port (default: 6333) - http_port: 6333 - - # Maximum size of POST request in MB (default: 32) - # Increase for larger batch uploads - max_request_size_mb: 128 - - # Maximum allowed search/upsert timeout in seconds - max_timeout_sec: 120 - -# Cluster configuration (optional - only if running Qdrant cluster) -# cluster: -# enabled: false - -# TLS configuration (optional) -# service: -# enable_tls: false -# tls_cert_file: /path/to/cert.pem -# tls_key_file: /path/to/key.pem - -# Telemetry (optional - set to false to disable) -telemetry_disabled: false