In [None]:

class AbstractModel(BaseModel):
    """Base schema for named entities.

    Besides the ``name`` field, the class exposes three class-level,
    read-only values: ``entity_name``, ``entity_namespace`` and ``fullname``.

    NOTE(review): these rely on stacking ``@classmethod`` on top of
    ``@property``. That chaining was deprecated in Python 3.11 and removed
    in Python 3.13 - confirm the target interpreter version before upgrading.
    """

    name: str

    @classmethod
    @property
    def entity_name(cls):
        """Entity name; currently the hard-coded placeholder ``"test"``."""
        return "test"

    @classmethod
    @property
    def entity_namespace(cls):
        """Entity namespace; currently the hard-coded placeholder ``"test"``."""
        return "test"

    @classmethod
    @property
    def fullname(cls):
        """Return ``"<entity_name>/<entity_namespace>"`` for this class."""
        return f'{cls.entity_name}/{cls.entity_namespace}'

    # TODO: create model, from data etc.
    

class AbstractContentModel(LanceModel, AbstractModel):
    """Named entity carrying a text ``content`` and its embedding ``vector``.

    Example (assuming ``np`` is numpy)::

        record = AbstractContentModel(
            name='test',
            content='test',
            vector=np.zeros(EmbeddingFunctions.openai.ndims()),
        )

    NOTE(review): assumes ``EmbeddingFunctions.openai`` follows the LanceDB
    embedding-function API, where ``SourceField()`` marks the text that gets
    embedded and ``VectorField()`` marks the column holding the embedding.
    """

    # Text the embedding function reads from (the embedding source).
    content: str = EmbeddingFunctions.openai.SourceField()
    # Embedding column; its width comes from the embedding function's ndims().
    vector: Vector(EmbeddingFunctions.openai.ndims()) = EmbeddingFunctions.openai.VectorField()
        
    

In [None]:


import os
import duckdb

class DuckDBClient:
    """Small wrapper around a DuckDB connection with the ``httpfs`` extension loaded."""

    def __init__(self, **options):
        """
        Open a DuckDB connection, install/load ``httpfs`` and, when AWS
        credentials are present in the environment, configure S3 access.

        **Args**
            options: accepted for forward compatibility; currently unused
        """
        self._cursor = duckdb.connect()
        # Credentials are optional: read them leniently so that a missing
        # variable means "no S3 settings" instead of a KeyError.
        access_key = os.environ.get("AWS_ACCESS_KEY_ID")
        secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
        region = os.environ.get("AWS_DEFAULT_REGION")

        # Always bound, so the statement below is valid without credentials.
        creds = ""
        if access_key and secret_key:
            region_stmt = f"SET s3_region='{region}';" if region else ""
            creds = f"""
                {region_stmt}
                SET s3_access_key_id='{access_key}';
                SET s3_secret_access_key='{secret_key}';"""

        self._cursor.execute(
            f"""
            INSTALL httpfs;
            LOAD httpfs;
            {creds}
        """
        )

    def inspect_enums(
        self, uri, enum_threshold=200, max_str_length=100, omit_fields=None
    ):
        """
        Collect the distinct values of low-cardinality string columns so they
        can be sent as context to an LLM.

        Do not use this if you have sensitive data in fields, or add
        protection (see ``omit_fields``).

        For example this can be used if we ask vague questions, or questions
        that reference misspelled or alternately spelt data - the LLM can
        make sense of it. This is probably necessary for SQL types to be
        useful but avoids sending too much data in context.

        **Args**
            uri: location queried with ``SELECT * FROM '<uri>'``
            enum_threshold: columns with this many distinct values or more are skipped
            max_str_length: columns whose mean string length exceeds this are skipped
            omit_fields: optional collection of column names to always skip

        **Returns**
            dict mapping column name to the list of its distinct values
        """
        df = self.execute(f"SELECT * FROM '{uri}'")
        omitted = omit_fields or ()

        def candidate_values(column):
            """Return the column's distinct values, or None when it is not eligible."""
            if column in omitted:
                return None
            try:
                # dont allow big strings (polars notation); this also raises
                # for non-string columns, which excludes them.
                # NOTE(review): `.str.lengths()` is the name used by the polars
                # version this was written against - confirm on upgrade,
                # otherwise every column is silently skipped here.
                mean_length = df[column].str.lengths().mean()
                if mean_length > max_str_length:
                    return None
                return df[column].unique()
            except Exception:
                # best effort: any column we cannot inspect is left out
                return None

        enums = {}
        for column in df.columns:
            values = candidate_values(column)
            if values is not None and len(values) < enum_threshold:
                enums[column] = list(values)
        return enums

    def execute(self, query):
        """Run ``query`` on the connection and return the result via ``.pl()``."""
        return self._cursor.execute(query).pl()

    def query_from_root(self, root):
        """
        Query the parquet data under ``root``.

        A ``root`` that does not already end in ``.parquet`` is treated as a
        folder and expanded to ``<root>/*.parquet``.
        """
        root = root.rstrip("/")
        if not root.endswith(".parquet"):
            root += "/*.parquet"
        # NOTE(review): `_query` is not defined in the visible source -
        # confirm it exists at module level before relying on this method.
        return _query(self, root)
    
class StoreSearchVector:
    """Placeholder type for a store's search vector; defines no attributes or behaviour yet."""

class QueryOptions(BaseModel):
    """Tunable settings for a store search; every field has a default.

    NOTE(review): the field names look like LanceDB vector-query settings
    (limit / nprobes / metric / refine_factor / select) - confirm the mapping
    where these options are consumed.
    """

    # maximum number of results
    limit: int = 5
    probes: int = 20
    # distance metric name
    metric: str = 'l2'
    # NOTE(review): mixed-case name kept as-is so existing callers keep working
    refine_Factor: int = 10
    # columns to return
    columns: typing.List[str] = ['name', 'content']

class VectorStoreBase:
    """
    In funkyprompt we have a generic store that implements search semantics over stores
    """

    def __init__(
        self,
        model: typing.Type[AbstractContentModel],
        description: str,
        store_vector: typing.Optional[StoreSearchVector] = None,
    ):
        """
        Resolve the store locations from the model, open (or create) the
        LanceDB table and create a DuckDB client for SQL access.

        **Args**
            model: A pydantic model that inherits from a suitable schema aware base
            description: a description of the store that can be registered for search
            store_vector: a funky vectorized description of how and when to use the store

        """
        self._model = model
        self._description = description
        self._store_vector = store_vector

        # The database lives at <root>/<namespace>; each table is a
        # "<entity_name>.lance" dataset inside it. The two URIs must stay
        # distinct: the connection uses the former, lance.dataset the latter.
        self._db_uri = f"{VECTOR_STORE_ROOT}/{self._model.entity_namespace}"
        self._table_uri = f"{self._db_uri}/{self._model.entity_name}.lance"
        self._table = self._open_table()
        self._duck_client = DuckDBClient()

    def _get_lance_connection(self):
        """Return a LanceDB connection rooted at the store's database URI."""
        # TODO: take credentials/region from the environment for S3-backed roots
        return lancedb.connect(self._db_uri)

    def _open_table(self):
        """Open the model's table, creating it (and registering the store) if missing."""
        db = self._get_lance_connection()
        name = self._model.entity_name
        if name in db:
            return db[name]
        self.register_store()
        return db.create_table(name, schema=self._model)

    def register_store(self):
        """
        upsert the description and components that we use to discover this store

        Not implemented yet.
        """
        pass

    def query_dataset(self, query):
        """
        Run a SQL ``query`` through the DuckDB client and return its result.
        """
        # Kept as a local on purpose: the query is expected to refer to the
        # table as `dataset`. NOTE(review): presumably resolved by DuckDB's
        # Python replacement scan looking up the call stack - confirm for the
        # DuckDB version in use.
        dataset = lance.dataset(self._table_uri)
        return self._duck_client.execute(query)

    def load(self, limit=None):
        """
        returns the polars data for the records

        **Args**
            limit: optional maximum number of rows; a falsy value means no LIMIT clause
        """
        # Referenced by name inside the SQL below (see query_dataset).
        dataset = lance.dataset(self._table_uri)
        limit_clause = f"LIMIT {limit}" if limit else ""
        return self._duck_client.execute(f"SELECT * FROM dataset {limit_clause}")

    def add(self, records: typing.List[AbstractContentModel], key_field=None):
        """
        Add record(s) to the store using the correct schema

        NOTE: only the input normalisation exists so far - nothing is written
        to the table yet.

        **Args**
           records: A list of abstract entities (or dicts that conform to that schema)
           key_field: The pydantic type either by default or config should define the key field for upsert or it can be passed in here
        """
        if not isinstance(records, list):
            records = [records]
        # TODO: write `records` to self._table (upsert on key_field)

    def run_search(
        self,
        questions: typing.List[str],
        since_date=None,
        # lance db settings + our own system predicates including probe, limit
        query_options: typing.Optional[QueryOptions] = None,
        # add schema predicates for IN and = to matches for simple filtering
        **predicates,
    ):
        """
        the search exposed to an agent
        arbitrary predicates can be passed if the schema of the store is known but usually it will not be known to the agent
        the agent should focus on text and possible date related queries

        Not implemented yet.

        **Args**
           questions: one or more questions to ask the store. Recommend full sentences.
           since_date: for restricting vector searches post some date
           query_options: underlying query options - usually the defaults are sufficient;
              None means a fresh ``QueryOptions()``
        """
        # Build the default per call so no options instance is shared between calls.
        if query_options is None:
            query_options = QueryOptions()
        # TODO: run the vector search

    def plot(self, **options):
        """Not implemented yet."""
        pass


# Build a store over the base content model; constructing it opens (or creates) the table.
store = VectorStoreBase(
    model=AbstractContentModel,
    description="A store for search for some things",
)

In [None]:
# Fetch the store's records with no row limit; as the last expression of the cell the result is displayed.
store.load(limit=None)