Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run all ML handlers methods via queue #8645

Merged
merged 24 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 71 additions & 87 deletions mindsdb/api/executor/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
get_predictor_integration,
)
from mindsdb.interfaces.query_context.context_controller import query_context_controller
from mindsdb.interfaces.storage.model_fs import HandlerStorage
from mindsdb.interfaces.triggers.triggers_controller import TriggersController
from mindsdb.utilities.context import context as ctx
from mindsdb.utilities.functions import mark_process, resolve_model_identifier
Expand Down Expand Up @@ -172,13 +171,13 @@ def execute_command(self, statement):
sql = statement.to_string()
sql_lower = sql.lower()

if type(statement) == CreateDatabase:
if type(statement) is CreateDatabase:
return self.answer_create_database(statement)
elif type(statement) == CreateMLEngine:
elif type(statement) is CreateMLEngine:
return self.answer_create_ml_engine(statement)
elif type(statement) == DropMLEngine:
elif type(statement) is DropMLEngine:
return self.answer_drop_ml_engine(statement)
elif type(statement) == DropPredictor:
elif type(statement) is DropPredictor:
database_name = self.session.database
if len(statement.name.parts) > 1:
database_name = statement.name.parts[0].lower()
Expand All @@ -191,18 +190,18 @@ def execute_command(self, statement):
if not statement.if_exists:
raise e
return ExecuteAnswer(ANSWER_TYPE.OK)
elif type(statement) == DropTables:
elif type(statement) is DropTables:
return self.answer_drop_tables(statement)
elif type(statement) == DropDatasource or type(statement) == DropDatabase:
elif type(statement) is DropDatasource or type(statement) is DropDatabase:
return self.answer_drop_database(statement)
elif type(statement) == Describe:
elif type(statement) is Describe:
# NOTE: in SQL, 'describe table' is the same as 'show columns'
return self.answer_describe_predictor(statement)
elif type(statement) == RetrainPredictor:
elif type(statement) is RetrainPredictor:
return self.answer_retrain_predictor(statement)
elif type(statement) == FinetunePredictor:
elif type(statement) is FinetunePredictor:
return self.answer_finetune_predictor(statement)
elif type(statement) == Show:
elif type(statement) is Show:
sql_category = statement.category.lower()
if hasattr(statement, "modes"):
if isinstance(statement.modes, list) is False:
Expand Down Expand Up @@ -534,7 +533,7 @@ def execute_command(self, statement):
RollbackTransaction,
):
return ExecuteAnswer(ANSWER_TYPE.OK)
elif type(statement) == Set:
elif type(statement) is Set:
category = (statement.category or "").lower()
if category == "" and isinstance(statement.name, Identifier):
param = statement.name.parts[0].lower()
Expand Down Expand Up @@ -584,7 +583,7 @@ def execute_command(self, statement):
f"SQL statement is not processable, return OK package: {sql}"
)
return ExecuteAnswer(ANSWER_TYPE.OK)
elif type(statement) == Use:
elif type(statement) is Use:
db_name = statement.value.parts[-1]
self.change_default_db(db_name)
return ExecuteAnswer(ANSWER_TYPE.OK)
Expand All @@ -593,11 +592,11 @@ def execute_command(self, statement):
CreateAnomalyDetectionModel, # we may want to specialize these in the future
):
return self.answer_create_predictor(statement)
elif type(statement) == CreateView:
elif type(statement) is CreateView:
return self.answer_create_view(statement)
elif type(statement) == DropView:
elif type(statement) is DropView:
return self.answer_drop_view(statement)
elif type(statement) == Delete:
elif type(statement) is Delete:
if statement.table.parts[-1].lower() == "models_versions":
return self.answer_delete_model_version(statement)
if (
Expand All @@ -611,69 +610,69 @@ def execute_command(self, statement):
SQLQuery(statement, session=self.session, execute=True)
return ExecuteAnswer(ANSWER_TYPE.OK)

elif type(statement) == Insert:
elif type(statement) is Insert:
SQLQuery(statement, session=self.session, execute=True)
return ExecuteAnswer(ANSWER_TYPE.OK)
elif type(statement) == Update:
elif type(statement) is Update:
if statement.from_select is None:
if statement.table.parts[-1].lower() == "models_versions":
return self.answer_update_model_version(statement)

SQLQuery(statement, session=self.session, execute=True)
return ExecuteAnswer(ANSWER_TYPE.OK)
elif (
type(statement) == Alter
type(statement) is Alter
and ("disable keys" in sql_lower)
or ("enable keys" in sql_lower)
):
return ExecuteAnswer(ANSWER_TYPE.OK)
elif type(statement) == Select:
elif type(statement) is Select:
if statement.from_table is None:
return self.answer_single_row_select(statement)
query = SQLQuery(statement, session=self.session)
return self.answer_select(query)
elif type(statement) == Union:
elif type(statement) is Union:
query = SQLQuery(statement, session=self.session)
return self.answer_select(query)
elif type(statement) == Explain:
elif type(statement) is Explain:
return self.answer_show_columns(statement.target)
elif type(statement) == CreateTable:
elif type(statement) is CreateTable:
# TODO
return self.answer_apply_predictor(statement)
# -- jobs --
elif type(statement) == CreateJob:
elif type(statement) is CreateJob:
return self.answer_create_job(statement)
elif type(statement) == DropJob:
elif type(statement) is DropJob:
return self.answer_drop_job(statement)
# -- triggers --
elif type(statement) == CreateTrigger:
elif type(statement) is CreateTrigger:
return self.answer_create_trigger(statement)
elif type(statement) == DropTrigger:
elif type(statement) is DropTrigger:
return self.answer_drop_trigger(statement)
# -- chatbots
elif type(statement) == CreateChatBot:
elif type(statement) is CreateChatBot:
return self.answer_create_chatbot(statement)
elif type(statement) == UpdateChatBot:
elif type(statement) is UpdateChatBot:
return self.answer_update_chatbot(statement)
elif type(statement) == DropChatBot:
elif type(statement) is DropChatBot:
return self.answer_drop_chatbot(statement)
elif type(statement) == CreateKnowledgeBase:
elif type(statement) is CreateKnowledgeBase:
return self.answer_create_kb(statement)
elif type(statement) == DropKnowledgeBase:
elif type(statement) is DropKnowledgeBase:
return self.anwser_drop_kb(statement)
elif type(statement) == CreateSkill:
elif type(statement) is CreateSkill:
return self.answer_create_skill(statement)
elif type(statement) == DropSkill:
elif type(statement) is DropSkill:
return self.answer_drop_skill(statement)
elif type(statement) == UpdateSkill:
elif type(statement) is UpdateSkill:
return self.answer_update_skill(statement)
elif type(statement) == CreateAgent:
elif type(statement) is CreateAgent:
return self.answer_create_agent(statement)
elif type(statement) == DropAgent:
elif type(statement) is DropAgent:
return self.answer_drop_agent(statement)
elif type(statement) == UpdateAgent:
elif type(statement) is UpdateAgent:
return self.answer_update_agent(statement)
elif type(statement) == Evaluate:
elif type(statement) is Evaluate:
statement.data = parse_sql(statement.query_str, dialect="mindsdb")
return self.answer_evaluate_metric(statement)
else:
Expand Down Expand Up @@ -958,7 +957,7 @@ def answer_retrain_predictor(self, statement):
model_record.data_integration_ref["id"]
)
if integration is None:
raise Exception(
raise EntityNotExistsError(
"The database from which the model was trained no longer exists"
)

Expand All @@ -969,16 +968,16 @@ def answer_retrain_predictor(self, statement):

if "engine" in statement.using:
ml_integration_name = statement.using.pop("engine")
ml_handler = self.session.integration_controller.get_handler(
ml_handler = self.session.integration_controller.get_ml_handler(
ml_integration_name
)

# use current ml handler
if ml_handler is None:
integration_record = get_predictor_integration(model_record)
if integration_record is None:
raise Exception("ML engine model was trained with does not esxists")
ml_handler = self.session.integration_controller.get_handler(
raise EntityNotExistsError("ML engine model was trained with does not esxists")
ml_handler = self.session.integration_controller.get_ml_handler(
integration_record.name
)

Expand Down Expand Up @@ -1008,7 +1007,7 @@ def answer_finetune_predictor(self, statement):
raise Exception(
"The ML engine that the model was trained with does not exist."
)
ml_handler = self.session.integration_controller.get_handler(
ml_handler = self.session.integration_controller.get_ml_handler(
integration_record.name
)

Expand Down Expand Up @@ -1093,7 +1092,7 @@ def _create_integration(self, name: str, engine: str, connection_args: dict):

self.session.integration_controller.add(name, engine, connection_args)
if storage:
handler = self.session.integration_controller.get_handler(name)
handler = self.session.integration_controller.get_data_handler(name)
handler.handler_storage.import_files(storage)

def answer_create_ml_engine(self, statement: ASTNode):
Expand All @@ -1112,52 +1111,37 @@ def answer_create_ml_engine(self, statement: ASTNode):
)
if handler_module_meta is None:
raise ExecutorException(f"There is no engine '{statement.handler}'")
if handler_module_meta.get("import", {}).get("success") is not True:
msg = dedent(
f"""\
Handler '{handler_module_meta['name']}' cannot be used. Reason is:
{handler_module_meta['import']['error_message']}
"""

try:
self.session.integration_controller.add(
name=name, engine=statement.handler, connection_args=statement.params
)
is_cloud = self.session.config.get("cloud", False)
if is_cloud is False:
msg += dedent(
f"""

If the error is related to missing dependencies, then try to run one of the following commands in a shell and restart mindsdb:
If you have cloned the github repo, run "pip install '.[{handler_module_meta['name']}]'"
If you have installed via pip, run "pip install 'mindsdb[{handler_module_meta['name']}]'"
If you are using docker, run "docker exec <container_name> pip install 'mindsdb[{handler_module_meta['name']}]'"
except Exception as e:
msg = str(e)
if type(e) in (ImportError, ModuleNotFoundError):
msg = dedent(
f"""\
Handler '{handler_module_meta['name']}' cannot be used. Reason is:
{handler_module_meta['import']['error_message']}
"""
)
is_cloud = self.session.config.get("cloud", False)
if is_cloud is False:
msg += dedent(
f"""

If the error is related to missing dependencies, then try to run one of the following commands in a shell and restart mindsdb:
If you have cloned the github repo, run "pip install '.[{handler_module_meta['name']}]'"
If you have installed via pip, run "pip install 'mindsdb[{handler_module_meta['name']}]'"
If you are using docker, run "docker exec <container_name> pip install 'mindsdb[{handler_module_meta['name']}]'"
"""
)
logger.info(msg)
ast_drop = DropMLEngine(name=statement.name)
self.answer_drop_ml_engine(ast_drop)
logger.info(msg)
raise ExecutorException(msg)

integration_id = self.session.integration_controller.add(
name=name, engine=statement.handler, connection_args=statement.params
)

HandlerClass = self.session.integration_controller.handler_modules[
handler_module_meta["name"]
].Handler

if hasattr(HandlerClass, "create_engine"):
handlerStorage = HandlerStorage(integration_id)
ml_handler = HandlerClass(
engine_storage=handlerStorage,
model_storage=None,
)

try:
ml_handler.create_engine(statement.params)
except NotImplementedError:
pass
except Exception as e:
# something wrong, drop ml engine
ast_drop = DropMLEngine(name=statement.name)
self.answer_drop_ml_engine(ast_drop)
raise e

return ExecuteAnswer(ANSWER_TYPE.OK)

def answer_drop_ml_engine(self, statement: ASTNode):
Expand Down Expand Up @@ -1514,7 +1498,7 @@ def answer_create_predictor(self, statement):

ml_integration_name = statement.using.pop("engine", ml_integration_name)

ml_handler = self.session.integration_controller.get_handler(
ml_handler = self.session.integration_controller.get_ml_handler(
ml_integration_name
)

Expand Down Expand Up @@ -1625,7 +1609,7 @@ def answer_single_row_select(self, statement):
column_name = str(result)
column_alias = (
".".join(target.alias.parts)
if type(target.alias) == Identifier
if type(target.alias) is Identifier
else column_name
)
elif target_type == NullConstant:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, integration_name, ds_type, integration_controller):
self.integration_name = integration_name
self.ds_type = ds_type
self.integration_controller = integration_controller
self.integration_handler = self.integration_controller.get_handler(self.integration_name)
self.integration_handler = self.integration_controller.get_data_handler(self.integration_name)

def get_type(self):
return self.type
Expand Down
4 changes: 2 additions & 2 deletions mindsdb/api/executor/datahub/datanodes/project_datanode.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def predict(self, model_name: str, data, version=None, params=None):
model_metadata = model_metadata['metadata']
if model_metadata['update_status'] == 'available':
raise Exception(f"model '{model_name}' is obsolete and needs to be updated. Run 'RETRAIN {model_name};'")
handler = self.integration_controller.get_handler(model_metadata['engine_name'])
return handler.predict(model_name, data, project_name=self.project.name, version=version, params=params)
ml_handler = self.integration_controller.get_ml_handler(model_metadata['engine_name'])
return ml_handler.predict(model_name, data, project_name=self.project.name, version=version, params=params)

def query(self, query=None, native_query=None, session=None):
if query is None and native_query is not None:
Expand Down
2 changes: 1 addition & 1 deletion mindsdb/api/http/namespaces/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def put(self, name):

# copy storage
if storage is not None:
handler = ca.integration_controller.get_handler(name)
handler = ca.integration_controller.get_data_handler(name)

export = decrypt(storage.encode(), secret_key)
handler.handler_storage.import_files(export)
Expand Down
2 changes: 1 addition & 1 deletion mindsdb/api/http/namespaces/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def delete(self, database_name, table_name):
)

try:
integration_handler = session.integration_controller.get_handler(database_name)
integration_handler = session.integration_controller.get_data_handler(database_name)
except Exception:
abort(HTTPStatus.INTERNAL_SERVER_ERROR, f'Could not get database handler for {database_name}')
try:
Expand Down
13 changes: 7 additions & 6 deletions mindsdb/api/http/namespaces/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from mindsdb_sql.parser.ast import Identifier
from mindsdb_sql.parser.dialects.mindsdb import CreateMLEngine

from mindsdb.integrations.utilities.install import install_dependencies
from mindsdb.interfaces.storage.model_fs import HandlerStorage
from mindsdb.api.http.utils import http_error
from mindsdb.api.http.namespaces.configs.handlers import ns_conf
from mindsdb.integrations.utilities.install import install_dependencies

from mindsdb.api.executor.controllers.session_controller import SessionController
from mindsdb.api.executor.command_executor import ExecuteCommands

Expand Down Expand Up @@ -144,12 +144,13 @@ def post(self, name):

session = SessionController()

base_ml_handler = session.integration_controller.get_handler(name)
byom_handler = base_ml_handler.get_ml_handler()
byom_handler.update_engine(connection_args)
base_ml_handler = session.integration_controller.get_ml_handler(name)
base_ml_handler.update_engine(connection_args)

engine_storage = HandlerStorage(base_ml_handler.integration_id)

engine_versions = [
int(x) for x in byom_handler.engine_storage.get_connection_args()['versions'].keys()
int(x) for x in engine_storage.get_connection_args()['versions'].keys()
]

return {
Expand Down
Loading
Loading