Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datanode returns list of lists instead of dict. #8745

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions mindsdb/api/executor/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,9 @@ def answer_update_model_version(self, statement):
where=statement.where,
)

models, _ = project_datanode.query(query=query, session=self.session)
data, columns_info = project_datanode.query(query=query, session=self.session)
col_names = [col['name'] for col in columns_info]
models = [dict(zip(col_names, item)) for item in data]

# get columns for update
kwargs = {}
Expand Down Expand Up @@ -2053,7 +2055,9 @@ def answer_delete_model_version(self, statement):
where=statement.where,
)

models, _ = project_datanode.query(query=query, session=self.session)
data, columns_info = project_datanode.query(query=query, session=self.session)
col_names = [col['name'] for col in columns_info]
models = [dict(zip(col_names, item)) for item in data]

self.session.model_controller.delete_model_version(models)
return ExecuteAnswer(ANSWER_TYPE.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1043,4 +1043,4 @@ def query(self, query: ASTNode, session=None):

columns_info = [{"name": k, "type": v} for k, v in data.dtypes.items()]

return data.to_dict(orient="records"), columns_info
return data.to_dict(orient="split")['data'], columns_info
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,5 @@ def query(self, query=None, native_query=None, session=None):
}
for k, v in df.dtypes.items()
]
data = df.to_dict(orient='records')
data = df.to_dict(orient='split')['data']
return data, columns_info
10 changes: 4 additions & 6 deletions mindsdb/api/executor/datahub/datanodes/project_datanode.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from copy import deepcopy

import pandas as pd

from mindsdb_sql import parse_sql
from mindsdb_sql.parser.ast import (
BinaryOperation,
Expand Down Expand Up @@ -75,7 +73,7 @@ def query(self, query=None, native_query=None, session=None):
if kb_table:
# this is the knowledge db
kb_table.update_query(query)
return pd.DataFrame(), []
return [], []

raise NotImplementedError(f"Can't update object: {query_table}")

Expand All @@ -85,7 +83,7 @@ def query(self, query=None, native_query=None, session=None):
if kb_table:
# this is the knowledge db
kb_table.delete_query(query)
return pd.DataFrame(), []
return [], []

raise NotImplementedError(f"Can't delete object: {query_table}")

Expand Down Expand Up @@ -147,7 +145,7 @@ def query(self, query=None, native_query=None, session=None):
for k, v in df.dtypes.items()
]

return df.to_dict(orient='records'), columns_info
return df.to_dict(orient='split')['data'], columns_info

kb_table = session.kb_controller.get_table(query_table, self.project.id)
if kb_table:
Expand All @@ -161,7 +159,7 @@ def query(self, query=None, native_query=None, session=None):
for k, v in df.dtypes.items()
]

return df.to_dict(orient='records'), columns_info
return df.to_dict(orient='split')['data'], columns_info

raise EntityNotExistsError(f"Can't select from {query_table} in project")
else:
Expand Down
3 changes: 2 additions & 1 deletion mindsdb/api/executor/sql_query/steps/fetch_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def call(self, step):
table_alias=table_alias[2],
database=table_alias[0]
))
result.add_records(data)
for record in data:
result.add_record_raw(record)

return result
4 changes: 2 additions & 2 deletions mindsdb/interfaces/query_context/context_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _result_callback(self, l_query: LastQuery,
if len(data) == 0:
return

df = pd.DataFrame(data)
df = pd.DataFrame(data, columns=[col['name'] for col in columns_info])
values = {}
# get max values
for info in l_query.get_last_columns():
Expand Down Expand Up @@ -161,7 +161,7 @@ def __get_init_last_values(self, l_query: LastQuery, dn, session) -> dict:
if len(data) == 0:
value = None
else:
value = list(data[0].values())[0]
value = data[0][0]
if value is not None:
last_values[info['table_name']] = {info['column_name']: value}

Expand Down
5 changes: 4 additions & 1 deletion tests/unit/executor_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ def native_query_f(query):
for table, df in tables.items():
con.register(table, df)
try:
result_df = con.execute(query).fetchdf()
con.execute(query)
columns = [c[0] for c in con.description]
result_df = pd.DataFrame(con.fetchall(), columns=columns)

result_df = result_df.replace({np.nan: None})
except Exception:
# it can be not supported command like update or insert
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,27 @@ def test_last_in_job(self, data_handler, scheduler):
assert 'a > 2' in sql
assert "b = 'b'" in sql

@patch('mindsdb.integrations.handlers.postgres_handler.Handler')
def test_duplicated_cols(self, data_handler):
df1 = pd.DataFrame([
{'id': 1, 'a': 1},
{'id': 2, 'a': 2},
{'id': 3, 'a': 3},
])
df2 = pd.DataFrame([
{'id': 1, 'a': 10},
{'id': 2, 'a': 20},
])
self.set_handler(data_handler, name='pg', tables={'tbl1': df1, 'tbl2': df2})

ret = self.run_sql('''
select * from pg.tbl1 as a
join pg.tbl2 as b on a.id=b.id
''')

first_row = ret.to_dict('split')['data'][0]
assert first_row == [1, 1, 1, 10]


class TestJobs(BaseExecutorDummyML):

Expand Down
Loading