Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add dynamic model creation #43

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

watkinsm
Copy link

✨ Add dynamic model creation

Enables users to dynamically create sqlmodel models, similar to Pydantic's create_model() method (https://github.com/samuelcolvin/pydantic/blob/5261fd05a0374b84ce2602d45990baf480fa2417/pydantic/main.py#L927-L984)

This can be helpful in cases where a model's fields are not known ahead of time and thus cannot be written directly into the source code, for example in a GUI application with a visual table builder

sqlmodel/main.py Outdated Show resolved Hide resolved
@shelper
Copy link

shelper commented Sep 17, 2021

@watkinsm i wonder why we cannot just just the create_model() from pydantic?
SQLModel is derived from pydantic BaseModel, would it makes more sense if we create a method to

  1. using pydantic create_model() to create pydantic model
  2. convert pydantic model to SQLModel

@shelper
Copy link

shelper commented Sep 17, 2021

@watkinsm i wonder why we cannot just just the create_model() from pydantic?
SQLModel is derived from pydantic BaseModel, would it makes more sense if we create a method to

  1. using pydantic create_model() to create pydantic model
  2. convert pydantic model to SQLModel

nevermind, doesn't seem to be a good way of doing this, just figured that python class instance conversion is not easy

@aghanti7
Copy link

Linking to my comment from another issue I had raised #50 (comment)

I was able to achieve this with the below workaround.

class UserActivityBase(SQLModel):
    # One table per user
    # Created first time when a user logs in perhaps
    created_ts: datetime.datetime = Field(primary_key=True)
    activity_type: str
    activity_value: float


async def create_user_activity_model(uuid: str, engine: AsyncEngine):
    class UserActivity(UserActivityBase, table=True):
        __tablename__ = 'user_activity_' + uuid
        __table_args__ = {'schema': 'user_schema',
                                       'keep_existing': True}

    # Create the table if needed
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    return UserActivity


class UserActivityCrud(UserActivityBase):
    pass


async def get_user_activity_model(uuid: str):
    class UserActivityCrud(UserActivityBase, table=True):
        __tablename__ = 'user_activity_' + uuid
        __table_args__ = {'schema': 'user_schema',
                                       'keep_existing': True}

    return UserActivityCrud

UserActivityBase is the base SQLModel. When you want to create the table for a new user, you can call create_user_activity_model() with the user's uuid and the engine instance. Later, when you need to get the model for a particular user, just call get_user_activity_model(uuid=<uuid>).

The only thing I am facing is a warning. When you call get_user_activity_model more than once, the below warning is thrown.

/Users/ghanti/code/proj/venv/lib/python3.9/site-packages/sqlmodel/main.py:367: SAWarning: This declarative base already contains a class with the same class name and module name as app.models.UserActivityCrud, and will be replaced in the string-lookup table.

I feel this can be ignored. @tiangolo can you please confirm?

@Insighttful
Copy link

Insighttful commented Nov 5, 2022

If an existing database has already been reflected into MetaData, how can create_model be used to dynamically create a SQLModel model?

from sqlmodel import Field, Session, SQLModel, create_engine, MetaData, select, create_model

engine = create_engine(engine_url)
meta = MetaData()
meta.reflect(bind=engine)

# meta for a specific table to be dynamically created
meta_test_table = meta.tables['test_table']

@Insighttful
Copy link

Relevant: How to dynamically define an SQLModel class

The create_model function has the optional base parameter (as mentioned in the docs), which accepts any subclass (or sequence of subclasses) of the Pydantic BaseModel. The SQLModel base class happens to directly inherit from BaseModel and can thus be passed here.

However, this is not sufficient to have a model that maps to a table. The SQLModelMetaclass requires table=True to be passed as a keyword argument during subclassing of SQLModel. Luckily, there is a solution for this built into Pydantic as well.

While this is mentioned nowhere on Pydantic's documentation website, the create_model function (source here) has a cls_kwargs parameter for being able to pass arbitrary keyword arguments to the metaclass during class creation.

These two components, together with the actual field definitions, are actually all we need to dynamically create our ORM class.

@Insighttful
Copy link

The following is a proof of concept that dynamically creates SQLModel models in-memory or to the file system which leverages sqlacodegen and sqlalchemy under the hood.

Installs

pip install sqlmodel
pip install --pre sqlacodegen
OR
poetry add sqlmodel
poetry add sqlacodegen@3.0.0rc2

Usage

db_url="dialect+driver://user:password@host:port/database"
modeler = ModelSql(db_url=db_url)
models = modeler.get_models()  # usable models in-memory
modeler.save()  # usable models via an import

Code

import inspect
import importlib
import subprocess
from pathlib import Path
import importlib.metadata
from typing import List, Optional, Dict, Any, Type
from packaging.version import parse as parse_version


class ModelSql:
    """
    A wrapper class for dynamically generating in-memory models from a database using SQLModel and sqlacodegen.
    Please reference sqlacodegen and sqlalchemy for in-depth documentation.

    Sqlacodegen is in need of help. It's a great tool, please consider lending a hand to the project:
    https://github.com/agronholm/sqlacodegen

    Args:
        db_url (Optional[str]): The database URL. Defaults to None.
        generator (str): The code generator to use. Defaults to "sqlmodels".
        options (Optional[List[str]]): Additional options for the code generator. Defaults to None.
        only (Optional[List[str]]): List of table names to include. Defaults to None.

    Example:
        > db_url="dialect+driver://user:password@host:port/database"
        > modeler = ModelSql(db_url=db_url)
        > models = modeler.get_models()
        > modeler.save()

    TODO: Implement support for other code generators supported by sqlacodegen.
    """

    def __init__(
        self,
        db_url: Optional[str] = None,
        generator: str = "sqlmodels",
        options: Optional[List[str]] = None,
        only: Optional[List[str]] = None,
    ) -> None:
        self.db_url = db_url
        self.generator = generator
        self.options = options
        self.only = only
        self.model_base_cls = None
        self.code = ""
        self.models: Dict[str, Any] = {}
        self._workflow()

    def _workflow(self) -> None:
        """
        Execute the workflow for generating models from the database.
        """
        self._check_min_module_version("sqlacodegen", "3.0.0rc2")

        if self.generator == "sqlmodels":
            self._check_min_module_version("sqlmodel", "0.0.8")
            module = importlib.import_module("sqlmodel")
            self.model_base_cls = getattr(module, "SQLModel")

        self._generate_code()

        if self.generator == "sqlmodels":
            self._compile_models()

    def _generate_code(self) -> None:
        """
        Generate the code using sqlacodegen.
        """
        cmd = ["sqlacodegen", "--generator", self.generator, self.db_url]

        if self.options:
            cmd.extend(["--option", option] for option in self.options)

        if self.only:
            cmd.extend(["--tables", ",".join(self.only)])

        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
        stdout, _ = process.communicate()
        self.code = stdout

    def _compile_models(self) -> None:
        """
        Compile the generated code and populate the models dictionary.
        """
        compiled_code = compile(self.code, "<string>", "exec")
        module_dict: Dict[str, Any] = {}
        exec(compiled_code, module_dict)
        self.models = module_dict

    @staticmethod
    def _check_min_module_version(module_name: str, min_version: str) -> None:
        """
        Checks if the specified module has a minimum required version.

        Args:
            module_name (str): The name of the module to check.
            min_version (str): The minimum required version in string format.

        Raises:
            ValueError: If the module version is lower than the minimum required version.
            ModuleNotFoundError: If the module is not installed.
            RuntimeError: If an error occurs while checking the module version.
        """
        try:
            module_version = importlib.metadata.version(module_name)
            if parse_version(module_version) < parse_version(min_version):
                raise ValueError(
                    f"{module_name} version {min_version} or greater is required, but found version {module_version}."
                )
        except importlib.metadata.PackageNotFoundError as e:
            raise ModuleNotFoundError(f"{module_name} is not installed.") from e
        except Exception as e:
            raise RuntimeError(
                f"An error occurred while checking the version of {module_name}: {str(e)}"
            ) from e

    def get_models(self, only: Optional[List[str]] = None) -> List[Type[Any]]:
        """
        Get a list of generated models.

        Args:
            only (Optional[List[str]]): List of model names to include. Defaults to None.

        Returns:
            List[Type[Any]]: List of generated model classes.
        """
        if only is None:
            return [
                model_cls
                for model_cls in self.models.values()
                if inspect.isclass(model_cls) and issubclass(model_cls, self.model_base_cls)
            ]
        else:
            return [
                model_cls
                for model_cls in self.models.values()
                if (
                    inspect.isclass(model_cls)
                    and issubclass(model_cls, self.model_base_cls)
                    and model_cls.__name__ in only
                )
            ]

    def save(
        self,
        filename: str = "models",
        to_path: Optional[str] = None
    ) -> None:
        """
        Save the generated models to files.

        Args:
            filename (str): Name of the file. Defaults to "models" (".py" extension will be added if not present).
            to_path (Optional[str]): Path to save the file. Defaults to None (current working directory).
        """
        to_path = Path(to_path) if to_path else Path.cwd()

        filename += "" if filename.endswith(".py") else ".py"

        with open(to_path / filename, "w") as file:
            file.write(self.code)

Ref my SO question and answer here: https://stackoverflow.com/a/76523940/1663382

@tiangolo tiangolo added the feature New feature or request label Oct 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature or request investigate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants