PoC SQL lobe #44

Open
andre-senna opened this issue Jan 22, 2024 · 4 comments
Labels: EPIC, Public (should be cc'ed to the public board)

andre-senna commented Jan 22, 2024

andre-senna commented Jun 7, 2024

After looking at the preliminary results and holding many internal design discussions, we came up with some design notes that must be observed in the general Lobe AtomDB design. This is not a detailed design yet.

Different users of Lobe-related classes

First, note that Lobe is intended to be a subclass of AtomDB. This means its API is supposed to be used only internally by DAS components. There is no API for final users of DAS, who will take advantage of Lobe features exclusively via the DAS public API. With that in mind, we can list the users of the Lobe API:

(1) Final DAS user (MeTTa or Python program using DAS libraries)

This user will not access the Lobe (and related classes) API directly. They will use it indirectly by instantiating a remote DAS pointing to an endpoint where a Lobe is living.

das = DistributedAtomSpace(query_engine='remote', host='my.postgres.server', port=5432)

Queries are made in the same way they are made against regular Redis-Mongo servers. Actually, one of the goals of the Lobe design is precisely to allow it to be queried exactly like any DAS server. Some overhead is expected, since the user is supposed to know that the remote DAS being connected to is a Lobe rather than a plain DAS, but the query calls should be the same. Another important difference is that Lobes are mainly read-only DASs.
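
For illustration, here is a hypothetical pattern query against such a remote Lobe. The call shape follows the usual DistributedAtomSpace.query() API, but the 'Expression'/'Symbol' types, the 'Similarity' pattern, and the variable names are made up for the example:

# The Lobe is queried exactly like any remote DAS. This pattern and
# the atom types in it are illustrative, not part of the Lobe design.
results = das.query({
    'atom_type': 'link',
    'type': 'Expression',
    'targets': [
        {'atom_type': 'node', 'type': 'Symbol', 'name': 'Similarity'},
        {'atom_type': 'variable', 'name': 'v1'},
        {'atom_type': 'variable', 'name': 'v2'},
    ],
})
for query_answer in results:
    # Each answer binds v1 and v2 to concrete atoms.
    print(query_answer)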

(2) OpenFaaS starting up a Lobe which has already been indexed

This is like what we currently do to start up a plain DAS using an already-loaded MongoDB and Redis. A function in OpenFaaS will just instantiate a local DAS of type `postgres_lobe`, passing all required parameters.

das = DistributedAtomSpace(
    atom_db='postgres_lobe',
    query_engine='local',
    postgres_host='my.postgres.server',
    postgres_port=5432,
    redis_hostname=...,
    ...)

There may be other required parameters. The main point here is that all indexing and DB loading is already done when this constructor is called. So Lobe-related classes will assume that everything required to process queries is already loaded in Redis (and eventually in another DB) and, of course, in the original Postgres server.

(3) Sysadmin indexing or loading an already computed index into Lobe's DBs.

This is like what we currently do with `das-cli metta load myfile.metta` to load a MeTTa file into the DBs which will be attached to a DAS server a posteriori. We will also use das-cli for all operations related to Lobes (indexing, loading of index data into the DBs, etc).

$ das-cli postgres load --host x.x.x.x --port 5432

This command will be implemented to be orthogonal to the current command used to load MeTTa files. The idea is to use the SQL->Nodes/Links mapping to actually create the Redis and MongoDB databases to be used by a plain DAS server. This works like converting the SQL database into DAS nodes and links. Notice that the DAS which will actually use the resulting DBs is not a Lobe, but a regular Redis-Mongo DAS.

$ das-cli postgres-lobe index --host x.x.x.x --port 5432 --index-path dir/to/index/files/

This command will use the SQL->Nodes/Links mapping to create all the index files required to feed the Lobe's Redis.

$ das-cli postgres-lobe load --host x.x.x.x --port 5432 --index-path dir/to/index/files/

This command will look for the index files in the passed path and load them into the Lobe's Redis. If the index files have not been created yet, then the command should create them, just like the command above.

Main entry-points of Lobe-related classes

In order to provide all the functionality listed above, we need entry-points in Lobe-related classes. This is a first suggestion for them.

We don't need anything special in Lobe-related classes to provide what's required by the use cases of user (1), since this user will not treat its remote Lobe DAS differently from a plain remote DAS. To cover the use cases of user (2), we just need to make Lobe inherit from AtomDB and implement all the required methods. The use cases of user (3) require the implementation of executable scripts to do the required processing. If we follow the design we currently have for MeTTa knowledge bases, we are supposed to do all this processing (indexing, DB loading, etc) outside the classes used by (1) and (2).

class PostgresLobeBuilder:

    @staticmethod
    def index(host, port, mapper, index_path):
        ...

    @staticmethod
    def load(host, port, mapper, index_path):
        ...

PostgresLobeBuilder has only two static methods in its public API. load() may check for a flag file in the passed path to decide whether there is a set of index files ready there. If there is, it just needs to load the index into Redis. Otherwise, any (supposedly incomplete) files are deleted and index() is called to create the index files.
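
A minimal sketch of that load() logic, assuming a hypothetical flag-file name ('.index_complete') and leaving the actual Redis loading elided:

import os
import shutil

INDEX_FLAG_FILE = '.index_complete'  # hypothetical flag-file name

class PostgresLobeBuilder:

    @staticmethod
    def load(host, port, mapper, index_path):
        flag = os.path.join(index_path, INDEX_FLAG_FILE)
        if not os.path.exists(flag):
            # Missing or incomplete index: discard any partial files
            # and rebuild the index from the SQL database.
            shutil.rmtree(index_path, ignore_errors=True)
            os.makedirs(index_path, exist_ok=True)
            PostgresLobeBuilder.index(host, port, mapper, index_path)
        # Index files are now known to be complete; load them into
        # the Lobe's Redis (elided).
        ...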

mapper is an instance of a class which knows the rules to convert SQL tables into DAS nodes and links.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List

@dataclass
class SQLTable:
    row_count: int
    table_reference: str
    table_object: Any


class SQLMapper(ABC):

    def __init__(self, host, port):
        self.host = host
        self.port = port
        db = self.connect(host, port)
        # A dict of SQLTable objects indexed by a reference string
        self.tables = self.list_tables(db)
        self.disconnect(db)

    def table_mapping(self, table_reference) -> Iterator[List[Dict[str, Any]]]:
        db = self.connect(self.host, self.port)
        for atom_list in self.row_mapping_iterator(db, table_reference):
            yield atom_list
        self.disconnect(db)

    @abstractmethod
    def connect(self, host, port) -> Any:
        pass

    @abstractmethod
    def disconnect(self, db: Any) -> None:
        pass

    @abstractmethod
    def list_tables(self, db: Any) -> Dict[str, SQLTable]:
        pass

    @abstractmethod
    def row_mapping_iterator(self, db: Any, table_reference: str) -> Iterator[List[Dict[str, Any]]]:
        pass

The first concrete implementation of SQLMapper would be PostgresMapper:

class PostgresMapper(SQLMapper):

    def connect(self, host, port) -> Any:
        ...

    def disconnect(self, db: Any) -> None:
        ...

    def list_tables(self, db: Any) -> Dict[str, SQLTable]:
        ...

    def row_mapping_iterator(self, db: Any, table_reference: str) -> Iterator[List[Dict[str, Any]]]:
        # Iterate through each row of the passed table and return (via 'yield')
        # the list of mapped atoms.
        ...

To use SQLMapper, the caller needs to instantiate a concrete mapper and iterate through the tables to decide which ones should be mapped; the mapping results should then be processed properly. Using the mapper looks something like this:

mapper = PostgresMapper(host, port)
for table_reference in mapper.tables:
    # Table size and name (reference) can be used to decide which tables to process.
    for atom_list in mapper.table_mapping(table_reference):
        # atom_list contains a list of dicts defining atoms.
        # do something with atom_list
        ...

Inside PostgresLobeBuilder.index(), the mapper can be used to iterate through table rows in order to build the index properly.
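
A hedged sketch of how index() might drive the mapper; the one-file-per-table layout, the JSON-lines format, and the flag file are assumptions for illustration, not settled design:

import json
import os

class PostgresLobeBuilder:

    @staticmethod
    def index(host, port, mapper, index_path):
        # mapper.tables maps reference strings to SQLTable objects.
        for table_reference in mapper.tables:
            file_name = os.path.join(index_path, f'{table_reference}.jsonl')
            with open(file_name, 'w') as index_file:
                for atom_list in mapper.table_mapping(table_reference):
                    # One JSON document per mapped row (assumed format).
                    index_file.write(json.dumps(atom_list) + '\n')
        # Mark the index as complete (hypothetical flag file used by load()).
        open(os.path.join(index_path, '.index_complete'), 'w').close()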

Lobe class itself

With index() and load() implemented in a separate class, Lobe becomes a simpler class. It may grow as we get a better understanding of the problems, but initially it wouldn't need to use the mapper, for instance. Actually, it's unclear whether we need a specialization of Lobe (like PostgresLobe), because maybe everything the Lobe needs to answer queries is already in the Redis server. We can start by trying to implement all the abstract methods in AtomDB and eventually change the design to implement some of these methods inside a subclass, in case some information from the mapper, for instance, proves to be required.
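
A minimal sketch of what that could look like. The AtomDB import path, the _connect_redis() helper, and the two methods shown are illustrative stand-ins for the real abstract interface, which is not restated in this issue:

from hyperon_das_atomdb import AtomDB  # assumed import path

class Lobe(AtomDB):

    def __init__(self, redis_hostname, redis_port, **kwargs):
        # Everything required to answer queries is assumed to be
        # already loaded in Redis by PostgresLobeBuilder.load().
        self.redis = self._connect_redis(redis_hostname, redis_port)  # hypothetical helper

    def get_node_handle(self, node_type, node_name):
        ...  # answered from the pre-built Redis index

    def get_atom(self, handle, **kwargs):
        ...  # answered from the pre-built Redis index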

@marcocapozzoli commented

After design discussions, the part of the design related to mapping will look like this:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterator, List, Protocol


class MappingType(Enum):
    SQL2Metta = "sql2metta"
    SQL2Atomese = "sql2atomese"


class IMapping(Protocol):
    @abstractmethod
    def map(self, data: dict) -> List[dict]: ...


class SQL2MettaMapping:
    def map(self, data: dict) -> List[dict]: ...


class SQL2AtomeseMapping:
    def map(self, data: dict) -> List[dict]: ...


@dataclass
class SQLTable:
    row_count: int
    table_reference: str
    table_object: Any


class DataMapper(ABC):
    def __init__(self, mapping: IMapping, **kwargs) -> None:
        self.db_client = self.connect(**kwargs)
        self.mapping = mapping

    @abstractmethod
    def connect(self, **kwargs) -> Any: ...
    
    @abstractmethod
    def disconnect(self) -> None: ...


class RelationalDataMapper(DataMapper, ABC):
    def __init__(self, mapping_type: MappingType, **kwargs) -> None:
        mapping = self.create_mapping(mapping_type)
        super().__init__(mapping=mapping, **kwargs)

    def create_mapping(self, mapping_type: MappingType) -> IMapping:
        if mapping_type == MappingType.SQL2Atomese:
            return SQL2AtomeseMapping()
        elif mapping_type == MappingType.SQL2Metta:
            return SQL2MettaMapping()
        else:
            raise ValueError("Unknown mapping")

    def table_mapping(self, table_reference: str) -> Iterator[List[dict]]:
        for atom_list in self.row_mapping_iterator(table_reference):
            yield atom_list
        self.disconnect()

    @abstractmethod
    def list_tables(self) -> List[SQLTable]: ...

    @abstractmethod
    def row_mapping_iterator(self, table_reference: str) -> Iterator[List[dict]]: ...


class PostgresMapper(RelationalDataMapper):
    def __init__(self, database, host, port, user=None, password=None, *, mapping_type: MappingType) -> None:
        kwargs = {'database': database, 'host': host, 'port': port, 'user': user, 'password': password}
        super().__init__(mapping_type=mapping_type, **kwargs)
   
    def connect(self, **kwargs) -> PostgresClient: ...
    
    def disconnect(self) -> None: ...

    def list_tables(self) -> List[SQLTable]: ...

    def row_mapping_iterator(self, table_reference: str) -> Iterator[List[dict]]: ...


# USAGE

mapper = PostgresMapper(database='mydb', host='localhost', port=5432, mapping_type=MappingType.SQL2Metta)


for table in mapper.list_tables():
    table_reference = table.table_reference
    # some logic here to decide which tables to map
    for atom_list in mapper.table_mapping(table_reference):
        # atom_list contains a list of dicts defining atoms.
        # do something with atom_list
        ...
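
For concreteness, a purely illustrative example of what a concrete map() could return for one row; the actual SQL->MeTTa mapping rules are not specified in this issue, so the atom shapes below are made up:

# Hypothetical input row from a 'person' table:
row = {'id': 42, 'name': 'Alice'}

# A map() implementation might turn it into a list of atom dicts:
atom_list = [
    {'atom_type': 'node', 'type': 'Symbol', 'name': 'person'},
    {'atom_type': 'node', 'type': 'Symbol', 'name': 'Alice'},
    {'atom_type': 'link', 'type': 'Expression', 'targets': [
        {'atom_type': 'node', 'type': 'Symbol', 'name': 'person'},
        {'atom_type': 'node', 'type': 'Symbol', 'name': 'Alice'},
    ]},
]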

@andre-senna commented

I'm OK with this design. I just suggest renaming "IMapping" to "Mapping". And since you added an error message to the suggested design, I need to comment on it: please use a more useful error message. For instance, you may list the mistyped value together with all the possible values, or you may tell the user to use the enum class. See the sketch below.
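
Something along these lines (a sketch of the suggestion, not settled wording):

raise ValueError(
    f"Unknown mapping '{mapping_type}'. "
    f"Valid options are {[m.value for m in MappingType]}; "
    f"please use the MappingType enum."
)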

@andre-senna commented

@marcocapozzoli
Why does IMapping inherit from Protocol? We don't use Protocol in other parts of the code, so why use it specifically here?

andre-senna transferred this issue from singnet/das-query-engine on Jun 28, 2024
andre-senna added the Public label on Jun 28, 2024
andre-senna added this to the PoC of a Postgres Lobe milestone on Jun 28, 2024