In [2]:
# Import necessary libraries
import sys
from pathlib import Path
import json
import re
import sqlparse
import numpy as np
import hashlib
from typing import List
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.embeddings.base import Embeddings

In [4]:
class SimpleHashEmbeddings(Embeddings):
    """Simple deterministic hash-based embeddings"""
    
    def __init__(self, dimension: int = 384):
        self.dimension = dimension
    
    def _hash_text(self, text: str) -> List[float]:
        """Convert text to a deterministic vector using hash"""
        hash_object = hashlib.sha256(text.encode())
        hash_hex = hash_object.hexdigest()
        
        float_array = []
        for i in range(0, len(hash_hex), 8):
            chunk = hash_hex[i:i+8]
            float_val = int(chunk, 16) / 2**32 - 1
            float_array.append(float_val)
        
        array = np.array(float_array, dtype=np.float32)
        
        if len(array) < self.dimension:
            array = np.pad(array, (0, self.dimension - len(array)))
        else:
            array = array[:self.dimension]
            
        norm = np.linalg.norm(array)
        if norm > 0:
            array = array / norm
            
        return array.tolist()
    
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of texts"""
        return [self._hash_text(text) for text in texts]
    
    def embed_query(self, text: str) -> List[float]:
        """Embed a single text"""
        return self._hash_text(text)


In [5]:
class ChessPackageManager:
    """Specialized manager for chess engine packages"""
    
    def __init__(self, base_path="documents"):
        self.base_path = Path(base_path)
        self.packages_path = self.base_path / "packages"
        self.metadata_path = self.base_path / "metadata"
        
    def validate_paths(self):
        """Validate that required paths exist"""
        if not self.packages_path.exists():
            raise ValueError(f"Packages directory not found: {self.packages_path}")
        if not self.metadata_path.exists():
            print(f"Creating metadata directory: {self.metadata_path}")
            self.metadata_path.mkdir(parents=True, exist_ok=True)
    
    def load_metadata(self):
        """Load chess package metadata"""
        metadata_file = self.metadata_path / "package_info.json"
        if metadata_file.exists():
            try:
                with open(metadata_file, 'r') as f:
                    return json.load(f)
            except json.JSONDecodeError as e:
                print(f"Error reading metadata file: {e}. Using empty metadata.")
                return {}
        return {}

    def get_package_dependencies(self):
        """Extract package dependencies from metadata"""
        metadata = self.load_metadata()
        dependencies = {}
        if 'chess' in metadata and 'packages' in metadata['chess']:
            for pkg, info in metadata['chess']['packages'].items():
                dependencies[pkg] = info.get('dependencies', [])
        return dependencies

    def get_package_details(self):
        """Get detailed information about each package"""
        metadata = self.load_metadata()
        if 'chess' in metadata and 'packages' in metadata['chess']:
            return metadata['chess']['packages']
        return {}

    def get_plsql_files(self):
        """Get all PL/SQL files in the packages directory"""
        return list(self.packages_path.glob("*.pk[sb]"))

In [6]:
class PLSQLParser:
    """Enhanced parser for PL/SQL chess packages"""
    
    @staticmethod
    def parse_package(content):
        """Parse package content and extract chess-specific metadata"""
        content = content.strip()
        
        package_match = re.search(r'package\s+(\w+)', content, re.IGNORECASE)
        package_name = package_match.group(1) if package_match else "Unknown"
        
        routines = []
        procedure_matches = re.finditer(
            r'(procedure|function)\s+(\w+)[^;]*?(\(.*?\))?\s*(return\s+\w+)?',
            content,
            re.IGNORECASE | re.DOTALL
        )
        
        for match in procedure_matches:
            routine_type = match.group(1)
            routine_name = match.group(2)
            parameters = match.group(3) or ""
            return_type = match.group(4) or ""
            
            category = "general"
            if any(term in routine_name.lower() for term in ['move', 'position', 'piece']):
                category = "move_generation"
            elif any(term in routine_name.lower() for term in ['eval', 'score', 'value']):
                category = "evaluation"
            elif any(term in routine_name.lower() for term in ['fen', 'pgn', 'notation']):
                category = "notation"
            
            routines.append({
                "type": routine_type,
                "name": routine_name,
                "parameters": parameters.strip("()"),
                "return_type": return_type.replace("return ", "") if return_type else None,
                "category": category
            })
        
        return {
            "package_name": package_name,
            "routines": routines,
            "content": content
        }


In [7]:
def load_and_process_documents():
    """Load and process all documents"""
    chess_manager = ChessPackageManager()
    chess_manager.validate_paths()
    
    documents = []
    metadata = []
    
    package_details = chess_manager.get_package_details()
    dependencies = chess_manager.get_package_dependencies()
    plsql_files = chess_manager.get_plsql_files()
    
    print(f"Found {len(plsql_files)} PL/SQL files")
    
    for file_path in plsql_files:
        print(f"Processing {file_path.name}...")
        
        # Try different encodings
        encodings = ['latin1', 'cp1252', 'iso-8859-1', 'utf-8']
        content = None
        
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                    print(f"Successfully read with {encoding} encoding")
                    break
            except UnicodeDecodeError:
                continue
        
        if content is None:
            print(f"Could not read {file_path.name} with any supported encoding")
            continue
            
        try:
            parsed = PLSQLParser.parse_package(content)
            package_name = parsed["package_name"]
            
            if package_name.lower() in package_details:
                parsed.update(package_details[package_name.lower()])
            
            parsed["dependencies"] = dependencies.get(package_name.lower(), [])
            
            formatted_content = sqlparse.format(
                content,
                reindent=True,
                keyword_case='upper'
            )
            
            doc = Document(
                page_content=content,
                metadata={
                    "package_name": parsed["package_name"],
                    "routines": parsed["routines"],
                    "purpose": parsed.get("purpose", ""),
                    "dependencies": parsed["dependencies"],
                    "file_type": file_path.suffix,
                    "formatted_content": formatted_content,
                    "source": str(file_path),
                    "file_name": file_path.name
                }
            )
            documents.append(doc)
            metadata.append(parsed)
            
        except Exception as e:
            print(f"Error parsing {file_path.name}: {str(e)}")
    
    print(f"Successfully loaded {len(documents)} PL/SQL packages")
    return documents, metadata


In [17]:
import datetime


def create_vectorstore(documents):
    """Create and save the vector store"""
    print("Starting document processing...")
    print(f"Number of documents to process: {len(documents)}")
    
    # Initialize text splitter
    print("Initializing text splitter...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    
    # Split documents
    print("Splitting documents...")
    splits = text_splitter.split_documents(documents)
    print(f"Created {len(splits)} splits from the documents")
    
    # Initialize embeddings
    print("Initializing embeddings...")
    embeddings = SimpleHashEmbeddings(dimension=384)
    
    # Create vector store
    print("Creating FAISS vector store...")
    vectorstore = FAISS.from_documents(splits, embeddings)
    
    # # Save vector store
    # print("Saving vector store...")
    # vectorstore.save_local("vectorstore")
    
    # # Save metadata
    # print("Saving metadata...")
    # with open("chess_metadata.json", "w") as f:
    #     json.dump([doc.metadata for doc in documents], f, indent=2)

    # In your document_preprocessor.ipynb
    vectorstore.save_local("../data/vectorstore")

    # Save metadata
    with open("../data/metadata/chess_metadata.json", "w") as f:
        json.dump({
            "packages": metadata,
            "total_documents": len(documents),
            "creation_date": datetime.datetime.now().isoformat()
        }, f, indent=2)
    
    print("Processing complete!")
    return vectorstore

In [18]:
print("Starting document processing pipeline...")

Starting document processing pipeline...


In [19]:
documents, metadata = load_and_process_documents()
print("\nDocument Loading Complete!")


Found 7 PL/SQL files
Processing pl_pig_chess_interface.pkb...
Successfully read with latin1 encoding
Processing pl_pig_chess_engine.pks...
Successfully read with latin1 encoding
Processing pl_pig_chess_engine_eval.pkb...
Successfully read with latin1 encoding
Processing pl_pig_chess_data.pks...
Successfully read with latin1 encoding
Processing pl_pig_chess_engine.pkb...
Successfully read with latin1 encoding
Processing pl_pig_chess_interface.pks...
Successfully read with latin1 encoding
Processing pl_pig_chess_engine_eval.pks...
Successfully read with latin1 encoding
Successfully loaded 7 PL/SQL packages

Document Loading Complete!


In [20]:
vectorstore = create_vectorstore(documents)
print("\nVector Store Creation Complete!")


Starting document processing...
Number of documents to process: 7
Initializing text splitter...
Splitting documents...
Created 1665 splits from the documents
Initializing embeddings...
Creating FAISS vector store...
Processing complete!

Vector Store Creation Complete!


In [10]:
# Save the metadata separately for easy access
import datetime


with open("chess_metadata.json", "w") as f:
    json.dump({
        "packages": metadata,
        "total_documents": len(documents),
        "creation_date": datetime.datetime.now().isoformat()
    }, f, indent=2)
print("\nMetadata Saved!")


Metadata Saved!


In [11]:
# Test the vector store
test_embeddings = SimpleHashEmbeddings(dimension=384)
loaded_vectorstore = FAISS.load_local("vectorstore", test_embeddings)
print("\nVector Store Loading Test Complete!")


Vector Store Loading Test Complete!


In [13]:
# Test a simple query
test_query = "How does move generation work?"
docs = loaded_vectorstore.similarity_search(test_query, k=2)
print("\nTest Query Results:")
for i, doc in enumerate(docs):
    print(f"\nResult {i+1}:")
    print(f"Package: {doc.metadata['package_name']}")
    print(f"File: {doc.metadata['file_name']}")

print("\nProcessing Pipeline Complete!")



Test Query Results:

Result 1:
Package: BODY
File: pl_pig_chess_engine_eval.pkb

Result 2:
Package: PL_PIG_CHESS_DATA
File: pl_pig_chess_data.pks

Processing Pipeline Complete!
