In [6]:
# Import necessary libraries
import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
from datetime import datetime
import json


# Make the `backend` directory importable: prepend <parent of the current working
# directory>/backend to sys.path so the `app.*` imports below can be resolved.
# NOTE: the path is derived from Path.cwd(), so it depends on where the kernel was started.
sys.path.insert(0, str(Path.cwd().parent / 'backend'))

from app.config import settings
from app.services.embeddings import EmbeddingService


In [7]:
class DataFileIngestion:
    """
    Ingestion service for data files (CSV, Excel, JSON, Parquet).

    For each file it extracts descriptive metadata with pandas, asks an LLM for a
    short natural-language summary of that metadata, embeds the summary with the
    existing EmbeddingService and upserts the resulting vector into its index.
    """

    # Extensions extract_file_metadata can load; process_directory filters on these.
    SUPPORTED_EXTENSIONS = ('.csv', '.xlsx', '.xls', '.parquet', '.json')
    # Chat model used to write the dataset summary.
    SUMMARY_MODEL = "gpt-4o-mini"

    def __init__(self):
        """Create the EmbeddingService and reuse its OpenAI client for summaries."""
        self.embedding_service = EmbeddingService()
        self.openai_client = self.embedding_service.openai_client

    @staticmethod
    def _load_dataframe(filepath: Path, file_extension: str) -> Optional[pd.DataFrame]:
        """Load *filepath* with the pandas reader for its extension; None if unsupported."""
        if file_extension == '.csv':
            return pd.read_csv(filepath)
        if file_extension in ('.xlsx', '.xls'):
            return pd.read_excel(filepath)
        if file_extension == '.parquet':
            return pd.read_parquet(filepath)
        if file_extension == '.json':
            return pd.read_json(filepath)
        return None

    @staticmethod
    def _rounded_stat(value: Any) -> Optional[float]:
        """Return *value* as a float rounded to 6 places, or None when it is missing."""
        return None if pd.isna(value) else round(float(value), 6)

    @classmethod
    def _describe_column(cls, series: pd.Series, row_count: int) -> Dict[str, Any]:
        """Summarise one column: dtype, null stats, unique count and numeric range."""
        null_count = int(series.isnull().sum())
        try:
            unique_count = int(series.nunique())
        except TypeError:
            # Cells holding unhashable values (e.g. lists/dicts from nested JSON)
            # cannot be counted for uniqueness; report "unknown" instead of failing.
            unique_count = None

        info = {
            'dtype': str(series.dtype),
            'null_count': null_count,
            # A header-only file has zero rows; avoid 0/0 producing NaN.
            'null_percentage': round(null_count / row_count * 100, 2) if row_count else 0.0,
            'unique_count': unique_count,
        }
        if pd.api.types.is_numeric_dtype(series):
            info['min'] = cls._rounded_stat(series.min())
            info['max'] = cls._rounded_stat(series.max())
            info['mean'] = cls._rounded_stat(series.mean())
            info['median'] = cls._rounded_stat(series.median())
        return info

    @classmethod
    def _to_json_safe(cls, value: Any) -> Any:
        """Convert one cell value into something json.dumps accepts."""
        # Containers first: pd.isna() on a list/array returns an array, which
        # cannot be used as a condition.
        if isinstance(value, dict):
            return {str(k): cls._to_json_safe(v) for k, v in value.items()}
        if isinstance(value, (list, tuple, set, np.ndarray)):
            return [cls._to_json_safe(v) for v in value]
        if value is None or pd.isna(value):
            return None  # NaN / NaT / pd.NA -> JSON null
        if isinstance(value, (pd.Timestamp, datetime)):
            return value.isoformat()
        if isinstance(value, np.bool_):
            return bool(value)
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        if isinstance(value, (str, bool, int, float)):
            return value
        # Anything else (Timedelta, date, Decimal, bytes, ...) is stringified so
        # that serialising the metadata never fails on an exotic cell type.
        return str(value)

    def extract_file_metadata(self, filepath: Path) -> Optional[Dict[str, Any]]:
        """
        Load a data file and describe it.

        Args:
            filepath: Path of the file; its suffix selects the pandas reader.

        Returns:
            A JSON-serialisable dict with file stats, row/column counts, per-column
            statistics and the first three rows, or None when the extension is not
            supported or the file could not be read (the error is printed).
        """
        try:
            filepath = Path(filepath)
            file_extension = filepath.suffix.lower()

            df = self._load_dataframe(filepath, file_extension)
            if df is None:
                print(f"Unsupported file type: {file_extension}")
                return None

            file_stats = filepath.stat()
            row_count = len(df)

            # Column labels are stringified so they are valid JSON keys and can be
            # joined later, even when pandas produced integer labels.
            column_info = {
                str(name): self._describe_column(series, row_count)
                for name, series in df.items()
            }

            example_rows = [
                {str(key): self._to_json_safe(value) for key, value in row.items()}
                for row in df.head(3).to_dict(orient='records')
            ]

            return {
                'filename': filepath.name,
                'filepath': str(filepath.absolute()),
                'file_extension': file_extension,
                'file_size_bytes': file_stats.st_size,
                'file_size_mb': round(file_stats.st_size / (1024 * 1024), 2),
                'modified_time': datetime.fromtimestamp(file_stats.st_mtime).isoformat(),  # datetime -> string
                'row_count': row_count,
                'column_count': len(df.columns),
                'column_names': [str(name) for name in df.columns],
                'column_info': column_info,
                'example_rows': example_rows,
                'memory_usage_mb': round(float(df.memory_usage(deep=True).sum()) / (1024 * 1024), 2),
            }

        except Exception as e:
            # Best-effort boundary: a file that cannot be read is reported and skipped.
            print(f"Error processing {filepath}: {str(e)}")
            return None

    def metadata_to_context_string(self, metadata: Dict[str, Any]) -> str:
        """Serialise *metadata* to indented JSON, print it for debugging and return it."""
        json_string = json.dumps(metadata, indent=2)

        # Print for debugging purposes
        print("📋 Metadata JSON:")
        print(json_string)

        return json_string

    def generate_llm_summary(self, context_string: str) -> str:
        """
        Ask the chat model for a 2-4 sentence summary of the dataset metadata.

        Args:
            context_string: The metadata JSON produced by metadata_to_context_string.

        Returns:
            The summary text with surrounding whitespace removed.

        Raises:
            RuntimeError: If the model returns no text content.
        """
        prompt = f"""You are analyzing a dataset for a lab data repository. Given the following metadata, write a concise but comprehensive natural language summary (2-4 sentences) that would be optimal for semantic search and RAG (Retrieval Augmented Generation) applications.

The summary should:
1. Describe what this dataset contains and its purpose
2. Highlight key columns and their characteristics (data types, ranges, uniqueness)
3. Be context rich and contain information that is outlined in a way that is most suitable for your output to be vector embedded.

Dataset Metadata:
{context_string}

Write only the summary, no additional commentary or any other text:"""

        response = self.openai_client.chat.completions.create(
            model=self.SUMMARY_MODEL,
            messages=[
                {"role": "system", "content": "You are a data analyst expert at creating concise, searchable summaries of datasets for RAG applications."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )

        # The message content can be None; fail with a clear message instead of an
        # AttributeError on .strip().
        summary = (response.choices[0].message.content or "").strip()
        if not summary:
            raise RuntimeError("LLM returned an empty summary")
        return summary

    def embed_and_store(self, metadata: Dict[str, Any], namespace: Optional[str] = None):
        """
        Generate LLM summary, create embedding, and store in Pinecone.

        Args:
            metadata: Dict returned by extract_file_metadata.
            namespace: Index namespace to upsert into; None means the default ("").

        Returns:
            Dict with the vector ID, embedding dimension, summary length, the summary
            itself and whether the vector was actually stored.
        """
        import hashlib

        print(f"🤖 Generating LLM summary for {metadata['filename']}...")
        context_string = self.metadata_to_context_string(metadata)
        llm_summary = self.generate_llm_summary(context_string)
        print(f"✓ Summary: {llm_summary}")

        embedding, _ = self.embedding_service.embed_text(llm_summary)

        # The built-in hash() of a str is salted per interpreter process, so it gave
        # a different ID on every kernel restart and re-ingesting a file created a
        # duplicate vector. A content digest of the path is stable across runs, so
        # re-ingestion overwrites the existing vector.
        path_digest = hashlib.sha1(metadata['filepath'].encode('utf-8')).hexdigest()[:16]
        file_id = f"file_{metadata['filename'].replace('.', '_')}_{path_digest}"

        vector = {
            "id": file_id,
            "values": embedding,
            "metadata": {
                "filename": metadata['filename'],
                "filepath": metadata['filepath'],
                "file_type": metadata['file_extension'],
                "row_count": metadata['row_count'],
                "column_count": metadata['column_count'],
                "column_names": ','.join(str(name) for name in metadata['column_names']),
                "file_size_mb": metadata['file_size_mb'],
                "llm_summary": llm_summary,
                "summary_preview": llm_summary[:500]
            }
        }

        index = self.embedding_service.index
        # Report exactly what happened: the flag mirrors the condition that gates the upsert.
        stored = bool(index)
        if stored:
            index.upsert(vectors=[vector], namespace=namespace or "")
            print(f"✓ Stored embedding for {metadata['filename']} with ID: {file_id}")
        else:
            print("⚠ Pinecone not initialized, skipping vector storage")

        return {
            "file_id": file_id,
            "embedding_dim": len(embedding),
            "context_length": len(llm_summary),
            "llm_summary": llm_summary,
            "stored_in_pinecone": stored
        }

    def process_directory(self, directory_path: str, namespace: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Ingest every supported data file found recursively under *directory_path*.

        Args:
            directory_path: Root directory to scan.
            namespace: Index namespace passed through to embed_and_store.

        Returns:
            One result dict per supported file, with 'status' set to 'success' or
            'failed'. Failures during summarising/embedding also carry an 'error'
            message. An empty list is returned when the directory is unusable.
        """
        directory = Path(directory_path)
        if not directory.exists():
            print(f"Directory {directory_path} does not exist")
            return []
        if not directory.is_dir():
            print(f"{directory_path} is not a directory")
            return []

        results = []
        # sorted() makes the processing order reproducible between runs.
        for filepath in sorted(directory.rglob('*')):
            if not filepath.is_file() or filepath.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
                continue
            if filepath.name.startswith('~$'):
                # Lock files Excel writes next to an open workbook; not real data.
                continue

            print(f"\n{'='*60}")
            print(f"Processing: {filepath.name}")
            print(f"{'='*60}")

            metadata = self.extract_file_metadata(filepath)
            if not metadata:
                results.append({
                    'filepath': str(filepath),
                    'status': 'failed'
                })
                continue

            try:
                embedding_info = self.embed_and_store(metadata, namespace)
            except Exception as e:
                # One file failing (LLM / embedding / index error) must not abort the
                # whole batch and discard the results gathered so far.
                print(f"Error embedding {filepath}: {str(e)}")
                results.append({
                    'filepath': str(filepath),
                    'metadata': metadata,
                    'status': 'failed',
                    'error': str(e)
                })
                continue

            results.append({
                'filepath': str(filepath),
                'metadata': metadata,
                'embedding_info': embedding_info,
                'status': 'success'
            })

        print(f"\n{'='*60}")
        print(f"Processed {len(results)} files")
        print(f"{'='*60}")

        return results

In [8]:
# Example 1: Test with a single file
# Create an instance of the ingestion service
ingestion = DataFileIngestion()

# Test with a single file (update path to your actual file)
test_file = Path("../test_dataset/labverse_experiments_5k.xlsx")

# Extract metadata
metadata = ingestion.extract_file_metadata(test_file)

# extract_file_metadata returns None for unreadable/unsupported files; stop here
# because the following cells pass `metadata` on to the LLM and the vector index.
if metadata is None:
    raise RuntimeError(f"Could not extract metadata from {test_file}; see the message printed above")

print("📊 METADATA EXTRACTED:")
print(metadata)

📊 METADATA EXTRACTED:
{'filename': 'labverse_experiments_5k.xlsx', 'filepath': '/Users/sid/Programming/LabVerse/test-notebooks/../test_dataset/labverse_experiments_5k.xlsx', 'file_extension': '.xlsx', 'file_size_bytes': 251637, 'file_size_mb': 0.24, 'modified_time': '2025-10-03T16:05:24.253615', 'row_count': 5000, 'column_count': 7, 'column_names': ['experiment_id', 'sample_date', 'sample_name', 'temperature', 'result', 'status', 'notes'], 'column_info': {'experiment_id': {'dtype': 'int64', 'null_count': 0, 'null_percentage': 0.0, 'unique_count': 5000, 'min': 1013.0, 'max': 11036.0, 'mean': 6016.9474, 'median': 6006.5}, 'sample_date': {'dtype': 'datetime64[ns]', 'null_count': 0, 'null_percentage': 0.0, 'unique_count': 965}, 'sample_name': {'dtype': 'object', 'null_count': 0, 'null_percentage': 0.0, 'unique_count': 24}, 'temperature': {'dtype': 'float64', 'null_count': 0, 'null_percentage': 0.0, 'unique_count': 645, 'min': 32.59, 'max': 52.69, 'mean': 37.051716, 'median': 37.02}, 're

In [9]:
# Generate LLM summary
# Guard first: serialising a None metadata would send the literal "null" to the LLM.
if metadata is None:
    raise RuntimeError("No metadata available; run the metadata extraction cell successfully first")

print("🤖 GENERATING LLM SUMMARY...")
context_string = ingestion.metadata_to_context_string(metadata)
llm_summary = ingestion.generate_llm_summary(context_string)
print("📝 LLM SUMMARY:")
print(llm_summary)
print()

🤖 GENERATING LLM SUMMARY...
📋 Metadata JSON:
{
  "filename": "labverse_experiments_5k.xlsx",
  "filepath": "/Users/sid/Programming/LabVerse/test-notebooks/../test_dataset/labverse_experiments_5k.xlsx",
  "file_extension": ".xlsx",
  "file_size_bytes": 251637,
  "file_size_mb": 0.24,
  "modified_time": "2025-10-03T16:05:24.253615",
  "row_count": 5000,
  "column_count": 7,
  "column_names": [
    "experiment_id",
    "sample_date",
    "sample_name",
    "temperature",
    "result",
    "status",
    "notes"
  ],
  "column_info": {
    "experiment_id": {
      "dtype": "int64",
      "null_count": 0,
      "null_percentage": 0.0,
      "unique_count": 5000,
      "min": 1013.0,
      "max": 11036.0,
      "mean": 6016.9474,
      "median": 6006.5
    },
    "sample_date": {
      "dtype": "datetime64[ns]",
      "null_count": 0,
      "null_percentage": 0.0,
      "unique_count": 965
    },
    "sample_name": {
      "dtype": "object",
      "null_count": 0,
      "null_percentage

The dataset "labverse_experiments_5k.xlsx" contains 5,000 rows of experimental data aimed at analyzing various lab samples, with key columns including "experiment_id" (unique integers ranging from 1013 to 11036), "sample_date" (datetime entries), "sample_name" (categorical with 24 unique values), "temperature" (float values between 32.59 and 52.69 degrees), and "result" (float values from 45.71 to 79.53). Additional columns include "status" (categorical with 4 unique statuses) and "notes" (textual, with 10.2% null entries), providing insights into experimental outcomes and conditions. This structured dataset is designed for comprehensive analysis of sample performance and experimental conditions in a laboratory setting.

In [10]:
# Generate and store embedding
# Guard first: embed_and_store indexes into `metadata`, so None would fail with an
# unhelpful TypeError.
if metadata is None:
    raise RuntimeError("No metadata available; run the metadata extraction cell successfully first")

print("🔢 EMBEDDING INFO:")
embedding_info = ingestion.embed_and_store(metadata)
print(f"  File ID: {embedding_info['file_id']}")
print(f"  Embedding dimension: {embedding_info['embedding_dim']}")
print(f"  Summary length: {embedding_info['context_length']} chars")
print(f"  Stored in Pinecone: {embedding_info['stored_in_pinecone']}")

🔢 EMBEDDING INFO:
🤖 Generating LLM summary for labverse_experiments_5k.xlsx...
📋 Metadata JSON:
{
  "filename": "labverse_experiments_5k.xlsx",
  "filepath": "/Users/sid/Programming/LabVerse/test-notebooks/../test_dataset/labverse_experiments_5k.xlsx",
  "file_extension": ".xlsx",
  "file_size_bytes": 251637,
  "file_size_mb": 0.24,
  "modified_time": "2025-10-03T16:05:24.253615",
  "row_count": 5000,
  "column_count": 7,
  "column_names": [
    "experiment_id",
    "sample_date",
    "sample_name",
    "temperature",
    "result",
    "status",
    "notes"
  ],
  "column_info": {
    "experiment_id": {
      "dtype": "int64",
      "null_count": 0,
      "null_percentage": 0.0,
      "unique_count": 5000,
      "min": 1013.0,
      "max": 11036.0,
      "mean": 6016.9474,
      "median": 6006.5
    },
    "sample_date": {
      "dtype": "datetime64[ns]",
      "null_count": 0,
      "null_percentage": 0.0,
      "unique_count": 965
    },
    "sample_name": {
      "dtype": "

In [11]:
# Example 2: Process an entire directory
# Update this path to your data directory
data_directory = "path/to/your/data/directory"

# Process all files in the directory
results = ingestion.process_directory(data_directory, namespace="data-files")

# Display summary
print("\n" + "="*60)
print("SUMMARY")
print("="*60)
successful = [r for r in results if r['status'] == 'success']
failed = [r for r in results if r['status'] == 'failed']

print(f"✓ Successfully processed: {len(successful)} files")
print(f"✗ Failed: {len(failed)} files")

if successful:
    print("\nSuccessfully processed files:")
    for result in successful:
        meta = result['metadata']
        print(f"  - {meta['filename']}: {meta['row_count']} rows, {meta['column_count']} columns")

# Name the failures too, so they can be investigated rather than just counted.
if failed:
    print("\nFailed files:")
    for result in failed:
        print(f"  - {result['filepath']}")


Directory path/to/your/data/directory does not exist

SUMMARY
✓ Successfully processed: 0 files
✗ Failed: 0 files


In [12]:
# Example 3: Search for files using semantic search
# Query for files that match a description
query = "files with glucose measurements and timestamps"

# Generate embedding for the query
query_embedding, _ = ingestion.embedding_service.embed_text(query)

# Search Pinecone for similar files; treat a missing result as "no matches"
similar_files = ingestion.embedding_service.search_similar(query_embedding, top_k=5) or []

print(f"🔍 SEARCH RESULTS for: '{query}'")
print("="*60)

if not similar_files:
    print("No matching files found.")

for i, match in enumerate(similar_files, 1):
    print(f"\n{i}. Match Score: {match['score']:.4f}")
    meta = match['metadata'] or {}
    print(f"   Filename: {meta.get('filename', 'N/A')}")
    print(f"   Path: {meta.get('filepath', 'N/A')}")
    print(f"   Columns: {meta.get('column_names', 'N/A')}")
    # The index hands numeric metadata back as floats (e.g. 5000.0); show whole
    # row counts as integers.
    row_count = meta.get('row_count', 'N/A')
    if isinstance(row_count, float) and row_count.is_integer():
        row_count = int(row_count)
    print(f"   Rows: {row_count}")
    # Show LLM summary if available
    if 'llm_summary' in meta:
        print(f"   Summary: {meta['llm_summary']}")
    elif 'summary_preview' in meta:
        print(f"   Preview: {meta['summary_preview']}")


length of response 5
🔍 SEARCH RESULTS for: 'files with glucose measurements and timestamps'

1. Match Score: 0.3835
   Filename: labverse_experiments_5k.xlsx
   Path: /Users/sid/Programming/LabVerse/test-notebooks/../test_dataset/labverse_experiments_5k.xlsx
   Columns: experiment_id,sample_date,sample_name,temperature,result,status,notes
   Rows: 5000.0
   Summary: The dataset "labverse_experiments_5k.xlsx" contains 5,000 rows of experimental data aimed at analyzing various lab samples, with key columns including "experiment_id" (unique integer identifiers), "sample_date" (datetime values), "sample_name" (categorical identifiers), "temperature" (float values ranging from 32.59 to 52.69°C), "result" (float values between 45.71 and 79.53), "status" (categorical with four unique statuses), and "notes" (textual observations, with some missing entries). This structured dataset facilitates the evaluation of experimental outcomes and conditions, providing insights into sample performance