Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 71 additions & 25 deletions cmd/import/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ const (

var rankMap map[string]int

// Package-level state used by ensureCollectionExists for lazy collection setup.
var (
// createdCollections records, per collection name, that the collection has
// already been found in Qdrant or created by this process, so later calls
// can skip the existence check.
createdCollections = make(map[string]bool)
// collectionCreateLock guards createdCollections and is held while a
// missing collection is being created.
collectionCreateLock sync.Mutex
)

//nolint:gocyclo // Main function complexity is acceptable for CLI setup
func main() {
csvDir := flag.String("dir", "", "Directory containing CSV files (required)")
Expand Down Expand Up @@ -116,32 +121,25 @@ func main() {

collections := entities.GetAllSupportedCollections()

log.Printf("Will create/check %d language-based collections: %v", len(collections), collections)

// Check and create collections
for _, collectionName := range collections {
log.Printf("Checking collection: %s", collectionName)
collectionExists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
log.Printf("Error checking if collection %s exists: %v", collectionName, err)
return
}
log.Printf("Using lazy collection creation for %d language-based collections: %v", len(collections), collections)

if *overwrite && collectionExists {
log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName)
err = client.DeleteCollection(ctx, collectionName)
// Only handle overwrite flag - collections will be created lazily
if *overwrite {
for _, collectionName := range collections {
collectionExists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err)
log.Printf("Error checking if collection %s exists: %v", collectionName, err)
return
}
log.Printf("Collection '%s' dropped successfully", collectionName)
collectionExists = false
}

// Create collection if it doesn't exist
if !collectionExists {
createCollection(ctx, client, collectionName)
} else {
log.Printf("Using existing collection: %s", collectionName)
if collectionExists {
log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName)
err = client.DeleteCollection(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err)
}
log.Printf("Collection '%s' dropped successfully", collectionName)
}
}
}
Comment on lines +127 to 144
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix incomplete error handling in overwrite loop.

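Line 132 uses a bare return when the collection-existence check fails. Because this code runs inside main, the return does not just leave the overwrite loop early: it ends the program without going through cleanupAndExit, so the process exits with a success status (0) even though an error occurred. This also leaves the system in an inconsistent state where some collections may already have been dropped but others were never checked.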

Apply this diff to ensure consistent error handling:

 		for _, collectionName := range collections {
 			collectionExists, err := client.CollectionExists(ctx, collectionName)
 			if err != nil {
-				log.Printf("Error checking if collection %s exists: %v", collectionName, err)
-				return
+				cleanupAndExit(client, "Error checking if collection %s exists: %v", collectionName, err)
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if *overwrite {
for _, collectionName := range collections {
collectionExists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err)
log.Printf("Error checking if collection %s exists: %v", collectionName, err)
return
}
log.Printf("Collection '%s' dropped successfully", collectionName)
collectionExists = false
}
// Create collection if it doesn't exist
if !collectionExists {
createCollection(ctx, client, collectionName)
} else {
log.Printf("Using existing collection: %s", collectionName)
if collectionExists {
log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName)
err = client.DeleteCollection(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err)
}
log.Printf("Collection '%s' dropped successfully", collectionName)
}
}
}
if *overwrite {
for _, collectionName := range collections {
collectionExists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error checking if collection %s exists: %v", collectionName, err)
}
if collectionExists {
log.Printf("Collection %s exists and overwrite flag is set. Dropping collection...", collectionName)
err = client.DeleteCollection(ctx, collectionName)
if err != nil {
cleanupAndExit(client, "Error dropping collection %s: %v", collectionName, err)
}
log.Printf("Collection '%s' dropped successfully", collectionName)
}
}
}
🤖 Prompt for AI Agents
In cmd/import/main.go around lines 127 to 144, the overwrite loop currently uses
a bare `return` when CollectionExists returns an error which aborts the loop
without consistent cleanup; replace that bare `return` with a call to
`cleanupAndExit(client, "Error checking if collection %s exists: %v",
collectionName, err)` so the error is handled consistently (matching the other
error paths), ensuring proper cleanup and process exit when the existence check
fails.


Expand Down Expand Up @@ -264,8 +262,51 @@ func cleanupAndExit(client *qdrant.Client, format string, args ...any) {
os.Exit(1)
}

// ensureCollectionExists makes sure collectionName is usable before points are
// written to it, creating the collection on first use if Qdrant does not have it.
//
// Results are cached in createdCollections (guarded by collectionCreateLock),
// so only the first call per collection normally reaches Qdrant.
//
// It returns a wrapped error when the existence check against Qdrant fails.
// createCollection has no return value at this call site, so a creation
// failure is not reported through this function's error result.
func ensureCollectionExists(ctx context.Context, client *qdrant.Client, collectionName string) error {
	// Fast path: consult the in-process cache under the lock.
	collectionCreateLock.Lock()
	known := createdCollections[collectionName]
	collectionCreateLock.Unlock()
	if known {
		return nil
	}

	// Cache miss: ask Qdrant. The lock is not held during this call.
	found, err := client.CollectionExists(ctx, collectionName)
	switch {
	case err != nil:
		return fmt.Errorf("error checking collection existence: %w", err)
	case found:
		// Remember the answer so subsequent calls take the fast path.
		collectionCreateLock.Lock()
		createdCollections[collectionName] = true
		collectionCreateLock.Unlock()
		log.Printf("Collection %s already exists, using it", collectionName)
		return nil
	}

	// The collection is missing. Hold the lock for the whole creation so only
	// one goroutine at a time runs createCollection.
	collectionCreateLock.Lock()
	defer collectionCreateLock.Unlock()

	// Re-check under the lock: another goroutine may have created and marked
	// the collection between the Qdrant query above and acquiring the lock.
	if !createdCollections[collectionName] {
		log.Printf("Creating collection: %s", collectionName)
		createCollection(ctx, client, collectionName)
		createdCollections[collectionName] = true
		log.Printf("Collection %s created and marked as ready", collectionName)
	}

	return nil
}
Comment on lines +265 to +305
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: createCollection terminates program in concurrent context.

Line 300 calls createCollection which uses cleanupAndExit on errors, terminating the entire program. In this concurrent context with multiple worker goroutines, a collection creation failure will abruptly kill all workers mid-processing, potentially losing in-flight data.

Refactor to return errors gracefully:

Step 1: Modify createCollection to return an error instead of calling cleanupAndExit:

-func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) {
+func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) error {
 	log.Printf("Creating language-based collection with named vectors: %s", collectionName)
 	// ... rest of function ...
 	})
 	if err != nil {
-		cleanupAndExit(client, "Error creating collection %s: %v", collectionName, err)
+		return fmt.Errorf("error creating collection %s: %w", collectionName, err)
 	}
 	log.Printf("Collection '%s' with named vectors created successfully", collectionName)
+	return nil
 }

Step 2: Update Line 300 to handle the error:

 	log.Printf("Creating collection: %s", collectionName)
-	createCollection(ctx, client, collectionName)
+	if err := createCollection(ctx, client, collectionName); err != nil {
+		return err
+	}
 	createdCollections[collectionName] = true

This allows worker goroutines to handle creation failures gracefully and report errors through the errorsChan mechanism already in place (lines 171, 187).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// ensureCollectionExists checks if a collection exists and creates it lazily if not.
func ensureCollectionExists(ctx context.Context, client *qdrant.Client, collectionName string) error {
// Check if we've already created this collection
collectionCreateLock.Lock()
if createdCollections[collectionName] {
collectionCreateLock.Unlock()
return nil
}
collectionCreateLock.Unlock()
// Check if collection exists in Qdrant
exists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
return fmt.Errorf("error checking collection existence: %w", err)
}
if exists {
// Mark as created to avoid checking again
collectionCreateLock.Lock()
createdCollections[collectionName] = true
collectionCreateLock.Unlock()
log.Printf("Collection %s already exists, using it", collectionName)
return nil
}
// Collection doesn't exist, create it (thread-safe)
collectionCreateLock.Lock()
defer collectionCreateLock.Unlock()
// Double-check pattern: another goroutine might have created it
if createdCollections[collectionName] {
return nil
}
log.Printf("Creating collection: %s", collectionName)
createCollection(ctx, client, collectionName)
createdCollections[collectionName] = true
log.Printf("Collection %s created and marked as ready", collectionName)
return nil
}
// ensureCollectionExists checks if a collection exists and creates it lazily if not.
func ensureCollectionExists(ctx context.Context, client *qdrant.Client, collectionName string) error {
// Check if we've already created this collection
collectionCreateLock.Lock()
if createdCollections[collectionName] {
collectionCreateLock.Unlock()
return nil
}
collectionCreateLock.Unlock()
// Check if collection exists in Qdrant
exists, err := client.CollectionExists(ctx, collectionName)
if err != nil {
return fmt.Errorf("error checking collection existence: %w", err)
}
if exists {
// Mark as created to avoid checking again
collectionCreateLock.Lock()
createdCollections[collectionName] = true
collectionCreateLock.Unlock()
log.Printf("Collection %s already exists, using it", collectionName)
return nil
}
// Collection doesn't exist, create it (thread-safe)
collectionCreateLock.Lock()
defer collectionCreateLock.Unlock()
// Double-check pattern: another goroutine might have created it
if createdCollections[collectionName] {
return nil
}
log.Printf("Creating collection: %s", collectionName)
if err := createCollection(ctx, client, collectionName); err != nil {
return err
}
createdCollections[collectionName] = true
log.Printf("Collection %s created and marked as ready", collectionName)
return nil
}
🤖 Prompt for AI Agents
In cmd/import/main.go around lines 265 to 305, the call to createCollection at
line ~300 can terminate the whole process via cleanupAndExit which is unsafe in
a concurrent worker context; change createCollection to return an error instead
of exiting on failure, remove any direct calls to cleanupAndExit from it, and
update this function to call createCollection(ctx, client, collectionName) and
check the returned error; if non-nil, release locks as appropriate and forward
the error to the worker error channel (errorsChan) or return the error up to the
caller so the worker goroutine can handle/report it gracefully rather than
killing the process.


// Create a language-based collection with named vectors (dirs, names, contents).
// Always creates collections with HNSW indexing disabled for fast import.
// Production-optimized with reduced shard/segment counts to prevent memory spikes.
func createCollection(ctx context.Context, client *qdrant.Client, collectionName string) {
log.Printf("Creating language-based collection with named vectors: %s", collectionName)
log.Printf("Collection %s: HNSW indexing DISABLED for fast import (m=0)", collectionName)
Expand Down Expand Up @@ -305,10 +346,10 @@ func createCollection(ctx context.Context, client *qdrant.Client, collectionName
err := client.CreateCollection(ctx, &qdrant.CreateCollection{
CollectionName: collectionName,
VectorsConfig: qdrant.NewVectorsConfigMap(namedVectors),
ShardNumber: qdrant.PtrOf(uint32(4)), // 4 shards for parallel WAL processing
ShardNumber: qdrant.PtrOf(uint32(2)),
// Optimize for fast import with indexing disabled
OptimizersConfig: &qdrant.OptimizersConfigDiff{
DefaultSegmentNumber: qdrant.PtrOf(uint64(32)), // Many segments for parallelism
DefaultSegmentNumber: qdrant.PtrOf(uint64(4)),
MaxSegmentSize: qdrant.PtrOf(uint64(500000)), // Large segments for efficiency
IndexingThreshold: qdrant.PtrOf(uint64(0)), // Disable indexing during import
},
Expand Down Expand Up @@ -621,9 +662,14 @@ func insertBatchToSeparateCollections(ctx context.Context, client *qdrant.Client
}

// Insert to language-based collections sequentially to avoid connection storms
// Qdrant with 4 shards will parallelize internally via separate WAL processing
// Collections are created lazily on first use to prevent memory spikes
for collectionName, points := range collectionPoints {
if len(points) > 0 {
// Ensure collection exists before inserting (lazy creation)
if err := ensureCollectionExists(ctx, client, collectionName); err != nil {
return fmt.Errorf("error ensuring collection %s exists: %w", collectionName, err)
}

_, err := client.Upsert(ctx, &qdrant.UpsertPoints{
CollectionName: collectionName,
Points: points,
Expand Down
1 change: 0 additions & 1 deletion docker-compose.qdrant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ services:
- 6334:6334 # gRPC API port (used by our Go client)
volumes:
- ./qdrant_data:/qdrant/storage
- ./qdrant-config.yaml:/qdrant/config/production.yaml:ro
expose:
- 6333
- 6334
Expand Down
77 changes: 0 additions & 77 deletions qdrant-config.yaml

This file was deleted.

Loading