Skip to content

Commit

Permalink
ODBC-410 Optimization of the SQLForeignKeys
Browse files Browse the repository at this point in the history
With growing number of schemas on the database server calls to
SQLForeingKeys tends to become significatly and noticeably slow. The fix
work-arounds the problem in the MDEV-34015 by pushing the condition to
the JOIN clauses and by running auxialary query. This makes
SQLForeignKeys acceptable
  • Loading branch information
lawrinn committed Apr 28, 2024
1 parent 7439cac commit 2124cb5
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 87 deletions.
291 changes: 220 additions & 71 deletions driver/ma_catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/************************************************************************************
Copyright (C) 2021,2023 MariaDB Corporation AB
Copyright (C) 2021,2024 MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
Expand All @@ -17,6 +17,7 @@
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
#include "ma_odbc.h"
#include <sstream>

#include "class/Protocol.h"
/*
Expand All @@ -27,6 +28,25 @@
*
* if bufferLen is -1, them buffer is assumed to be the dynamic string. Plain char buffer otherwise
*/

/* {{{ AddPvCondition - for stream */
static int AddPvCondition(MADB_Dbc *dbc, std::ostringstream& query, const char* value, SQLSMALLINT len)
{
char escaped[2 * NAME_LEN + 1];
/* We should probably compare to SQL_NTS, and throw error if just < 0, but I think this way it will work just fine */
if (len < 0)
{
len= (SQLSMALLINT)strlen(value);
}

len= (SQLSMALLINT)mysql_real_escape_string(dbc->mariadb, escaped, value, len);

query << " LIKE '" << escaped /*len*/ << "' ";

return 0;
}
/* }}} */

/* {{{ AddPvCondition */
static int AddPvCondition(MADB_Dbc *dbc, void* buffer, size_t bufferLen, char* value, SQLSMALLINT len)
{
Expand Down Expand Up @@ -82,6 +102,29 @@ static char Read_lower_case_table_names(MADB_Dbc *Dbc)
}
/* }}} */

/* {{{ AddOaCondition - stream version */
static void AddOaCondition(MADB_Dbc *Dbc, std::ostringstream& query, const char* value, SQLSMALLINT len)
{
char escaped[2 * NAME_LEN + 1];
const char *cs= "=BINARY'", *ci= "='", *compare= cs;

/* We should probably compare to SQL_NTS, and throw error if just < 0, but I think this way it will work just fine */
if (len < 0)
{
len= (SQLSMALLINT)strlen(value);
}

len= (SQLSMALLINT)mysql_real_escape_string(Dbc->mariadb, escaped, value, len);

if (Read_lower_case_table_names(Dbc))
{
compare= ci;
}
query << compare;
query.write(escaped, len).write("' ", 2);
}
/* }}} */

/* {{{ AddOaCondition */
static int AddOaCondition(MADB_Dbc *Dbc, void* buffer, size_t bufferLen, char* value, SQLSMALLINT len)
{
Expand Down Expand Up @@ -118,6 +161,13 @@ static int AddOaCondition(MADB_Dbc *Dbc, void* buffer, size_t bufferLen, char* v
}
/* }}} */

/* {{{ AddIdCondition - stream version */
static void AddIdCondition(std::ostringstream& query, char* value, SQLSMALLINT /*len*/)
{
query << "=`" << value << "` ";
}
/* }}} */

/* {{{ AddIdCondition */
static int AddIdCondition(void* buffer, size_t bufferLen, char* value, SQLSMALLINT len)
{
Expand Down Expand Up @@ -157,7 +207,26 @@ static int AddPvOrIdCondition(MADB_Stmt* Stmt, void *buffer, size_t bufferLen, c
/* }}} */

/* {{{ AddOaOrIdCondition */
static int AddOaOrIdCondition(MADB_Stmt* Stmt, void* buffer, size_t bufferLen, char* value, SQLSMALLINT len)
static void AddOaOrIdCondition(MADB_Stmt* Stmt, std::ostringstream& query, char* value,
SQLSMALLINT len)
{
SQLULEN MetadataId;
Stmt->Methods->GetAttr(Stmt, SQL_ATTR_METADATA_ID, &MetadataId, 0, (SQLINTEGER*)NULL);

if (MetadataId == SQL_TRUE)
{
AddIdCondition(query, value, len);
}
else
{
AddOaCondition(Stmt->Connection, query, value, len);
}
}
/* }}} */

/* {{{ AddOaOrIdCondition */
static int AddOaOrIdCondition(MADB_Stmt* Stmt, void* buffer, size_t bufferLen, char* value,
SQLSMALLINT len)
{
SQLULEN MetadataId;
Stmt->Methods->GetAttr(Stmt, SQL_ATTR_METADATA_ID, &MetadataId, 0, (SQLINTEGER*)NULL);
Expand Down Expand Up @@ -894,6 +963,11 @@ SQLRETURN MADB_StmtProcedures(MADB_Stmt *Stmt, char *CatalogName, SQLSMALLINT Na
}
/* }}} */

static void constructFKquery(std::ostringstream& query, const std::string& part1,
const char* refSchemaCond, const std::string& part2, const char* schemaCond, const std::ostringstream& part3)
{
query << part1 << refSchemaCond << part2 << schemaCond << ") " << part3.str();
}
/* {{{ SQLForeignKeys */
SQLRETURN MADB_StmtForeignKeys(MADB_Stmt *Stmt, char *PKCatalogName, SQLSMALLINT NameLength1,
char *PKSchemaName, SQLSMALLINT NameLength2, char *PKTableName,
Expand All @@ -902,19 +976,55 @@ SQLRETURN MADB_StmtForeignKeys(MADB_Stmt *Stmt, char *PKCatalogName, SQLSMALLINT
SQLSMALLINT NameLength6)
{
SQLRETURN ret= SQL_ERROR;
MADB_DynString StmtStr;
static const std::string part1("SELECT A.REFERENCED_TABLE_SCHEMA PKTABLE_CAT,NULL PKTABLE_SCHEM,"
"A.REFERENCED_TABLE_NAME PKTABLE_NAME,"
"A.REFERENCED_COLUMN_NAME PKCOLUMN_NAME,"
"A.TABLE_SCHEMA FKTABLE_CAT,NULL FKTABLE_SCHEM,"
"A.TABLE_NAME FKTABLE_NAME,A.COLUMN_NAME FKCOLUMN_NAME,"
"A.POSITION_IN_UNIQUE_CONSTRAINT KEY_SEQ,"
"CASE update_rule "
" WHEN 'RESTRICT' THEN " XSTR(SQL_RESTRICT)
" WHEN 'NO ACTION' THEN " XSTR(SQL_NO_ACTION)
" WHEN 'CASCADE' THEN " XSTR(SQL_CASCADE)
" WHEN 'SET NULL' THEN " XSTR(SQL_SET_NULL)
" WHEN 'SET DEFAULT' THEN " XSTR(SQL_SET_DEFAULT) " "
"END UPDATE_RULE,"
"CASE DELETE_RULE"
" WHEN 'RESTRICT' THEN " XSTR(SQL_RESTRICT)
" WHEN 'NO ACTION' THEN " XSTR(SQL_NO_ACTION)
" WHEN 'CASCADE' THEN " XSTR(SQL_CASCADE)
" WHEN 'SET NULL' THEN " XSTR(SQL_SET_NULL)
" WHEN 'SET DEFAULT' THEN " XSTR(SQL_SET_DEFAULT) " "
"END DELETE_RULE,"
"A.CONSTRAINT_NAME FK_NAME,"
"'PRIMARY' PK_NAME,"
XSTR(SQL_NOT_DEFERRABLE) " AS DEFERRABILITY"
" FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE A"
" JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE B"
" ON (B.TABLE_SCHEMA"),
part2(" AND B.TABLE_NAME=A.REFERENCED_TABLE_NAME"
" AND B.COLUMN_NAME=A.REFERENCED_COLUMN_NAME AND B.CONSTRAINT_NAME='PRIMARY')"
" JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS RC"
" ON (RC.CONSTRAINT_NAME=A.CONSTRAINT_NAME"
" AND RC.TABLE_NAME=A.TABLE_NAME"
" AND RC.CONSTRAINT_SCHEMA"),
currentSchemaCond("=DATABASE()"),
orderBy(" ORDER BY FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME, KEY_SEQ, PKTABLE_NAME");
std::ostringstream query, part3;
const char *refSchemaCond= nullptr, *schemaCond= nullptr;
char namedRefSchemaCondition[NAME_LEN + 16/*=BINARY ''*/]= {'\0'},
namedSchemaCondition[NAME_LEN + 16/*=BINARY ''*/]={'\0'};

MADB_CLEAR_ERROR(&Stmt->Error);

/* PKTableName and FKTableName are mandatory. The requirement is only about NULL names */
if (PKTableName == NULL && FKTableName == NULL)
if (PKTableName == nullptr && FKTableName == nullptr)
{
MADB_SetError(&Stmt->Error, MADB_ERR_HY009, "PKTableName or FKTableName are required", 0);
return Stmt->Error.ReturnValue;
return MADB_SetError(&Stmt->Error, MADB_ERR_HY009, "PKTableName or FKTableName are required", 0);
}
ADJUST_LENGTH(PKSchemaName, NameLength2);
ADJUST_LENGTH(FKSchemaName, NameLength5);
if (((PKSchemaName != NULL && *PKSchemaName != '\0') || (FKSchemaName != NULL && *FKSchemaName != '\0')) &&
if (((PKSchemaName != nullptr && *PKSchemaName != '\0') || (FKSchemaName != nullptr && *FKSchemaName != '\0')) &&
SCHEMA_PARAMETER_ERRORS_ALLOWED(Stmt))
{
return MADB_SetError(&Stmt->Error, MADB_ERR_HYC00, "Schemas are not supported. Use CatalogName parameter instead", 0);
Expand All @@ -923,93 +1033,132 @@ SQLRETURN MADB_StmtForeignKeys(MADB_Stmt *Stmt, char *PKCatalogName, SQLSMALLINT
ADJUST_LENGTH(PKCatalogName, NameLength1);
ADJUST_LENGTH(PKTableName, NameLength3);
ADJUST_LENGTH(FKCatalogName, NameLength4);

ADJUST_LENGTH(FKTableName, NameLength6);

/* modified from JDBC driver */
MADB_InitDynamicString(&StmtStr,
"SELECT A.REFERENCED_TABLE_SCHEMA PKTABLE_CAT, NULL PKTABLE_SCHEM, "
"A.REFERENCED_TABLE_NAME PKTABLE_NAME, "
"A.REFERENCED_COLUMN_NAME PKCOLUMN_NAME, "
"A.TABLE_SCHEMA FKTABLE_CAT, NULL FKTABLE_SCHEM, "
"A.TABLE_NAME FKTABLE_NAME, A.COLUMN_NAME FKCOLUMN_NAME, "
"A.POSITION_IN_UNIQUE_CONSTRAINT KEY_SEQ, "
"CASE update_rule "
" WHEN 'RESTRICT' THEN " XSTR(SQL_RESTRICT)
" WHEN 'NO ACTION' THEN " XSTR(SQL_NO_ACTION)
" WHEN 'CASCADE' THEN " XSTR(SQL_CASCADE)
" WHEN 'SET NULL' THEN " XSTR(SQL_SET_NULL)
" WHEN 'SET DEFAULT' THEN " XSTR(SQL_SET_DEFAULT) " "
"END UPDATE_RULE, "
"CASE DELETE_RULE"
" WHEN 'RESTRICT' THEN " XSTR(SQL_RESTRICT)
" WHEN 'NO ACTION' THEN " XSTR(SQL_NO_ACTION)
" WHEN 'CASCADE' THEN " XSTR(SQL_CASCADE)
" WHEN 'SET NULL' THEN " XSTR(SQL_SET_NULL)
" WHEN 'SET DEFAULT' THEN " XSTR(SQL_SET_DEFAULT) " "
" END DELETE_RULE,"
"A.CONSTRAINT_NAME FK_NAME, "
"'PRIMARY' PK_NAME,"
XSTR(SQL_NOT_DEFERRABLE) " AS DEFERRABILITY "
" FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE A"
" JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE B"
" ON (B.TABLE_SCHEMA = A.REFERENCED_TABLE_SCHEMA"
" AND B.TABLE_NAME = A.REFERENCED_TABLE_NAME"
" AND B.COLUMN_NAME = A.REFERENCED_COLUMN_NAME)"
" JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS RC"
" ON (RC.CONSTRAINT_NAME = A.CONSTRAINT_NAME"
" AND RC.TABLE_NAME = A.TABLE_NAME"
" AND RC.CONSTRAINT_SCHEMA = A.TABLE_SCHEMA)"
" WHERE B.CONSTRAINT_NAME= 'PRIMARY'",
8192, 128);

/* Empty schema name means tables w/out schema. We could get here only if it is empty string, otherwise error would have been already thrown */
if (PKSchemaName != NULL || FKSchemaName != NULL)
{
MADB_DYNAPPENDCONST(&StmtStr, " AND 0");
part3 << "WHERE 1";
/* Empty schema name means tables w/out schema. We could get here only if it is empty string,
otherwise error would have been already thrown. If any of catalogs are empty string - same empty RS */
if (PKSchemaName != nullptr || FKSchemaName != nullptr || (PKCatalogName && NameLength1 == 0) ||
(FKCatalogName && NameLength4 == 0))
{
part3 << " AND 0";
constructFKquery(query, part1, currentSchemaCond.c_str(), part2, currentSchemaCond.c_str(), part3);
}
else
{
if (PKTableName != NULL)
if (PKCatalogName)
{
MADB_DYNAPPENDCONST(&StmtStr, " AND A.REFERENCED_TABLE_SCHEMA");
AddOaOrIdCondition(Stmt, namedRefSchemaCondition, sizeof(namedRefSchemaCondition), PKCatalogName,
NameLength1);
refSchemaCond= namedRefSchemaCondition;
}

if (PKCatalogName)
if (PKTableName != nullptr)
{
if (refSchemaCond == nullptr)
{
AddOaOrIdCondition(Stmt, &StmtStr, (size_t)-1, PKCatalogName, NameLength1);
refSchemaCond= currentSchemaCond.c_str();
}
else
part3 << " AND A.REFERENCED_TABLE_SCHEMA" << refSchemaCond << " AND A.REFERENCED_TABLE_NAME";
AddOaOrIdCondition(Stmt, part3, PKTableName, NameLength3);
}

if (FKCatalogName != nullptr)
{
AddOaOrIdCondition(Stmt, namedSchemaCondition, sizeof(namedSchemaCondition), FKCatalogName, NameLength4);
schemaCond= namedSchemaCondition;
}

if (FKTableName != nullptr)
{
if (schemaCond == nullptr)
{
MADB_DYNAPPENDCONST(&StmtStr, "=DATABASE()");
schemaCond= currentSchemaCond.c_str();
}
MADB_DYNAPPENDCONST(&StmtStr, " AND A.REFERENCED_TABLE_NAME");
part3 << " AND A.TABLE_SCHEMA" << schemaCond << " AND A.TABLE_NAME";

AddOaOrIdCondition(Stmt, &StmtStr, (size_t)-1, PKTableName, NameLength3);
AddOaOrIdCondition(Stmt, part3, FKTableName, NameLength6);
}

if (FKTableName != NULL)
if (refSchemaCond && schemaCond)
{
MADB_DYNAPPENDCONST(&StmtStr, " AND A.TABLE_SCHEMA");

if (FKCatalogName != NULL)
part3 << orderBy;
constructFKquery(query, part1, refSchemaCond, part2, schemaCond, part3);
}
else
{
std::ostringstream aux;
std::string field("REFERENCED_TABLE_SCHEMA");
aux << "SELECT DISTINCT ";
if (refSchemaCond)
{
AddOaOrIdCondition(Stmt, &StmtStr, (size_t)-1, FKCatalogName, NameLength4);
field= "TABLE_SCHEMA";
namedSchemaCondition[0]= '=';
namedSchemaCondition[1]= '\'';
}
else
{
MADB_DYNAPPENDCONST(&StmtStr, "=DATABASE() ");
namedRefSchemaCondition[0]= '=';
namedRefSchemaCondition[1]= '\'';
}
MADB_DYNAPPENDCONST(&StmtStr, " AND A.TABLE_NAME");
aux << field << " FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE A " << part3.str() << " AND " << field <<
" IS NOT NULL ORDER BY TABLE_SCHEMA";

AddOaOrIdCondition(Stmt, &StmtStr, (size_t)-1, FKTableName, NameLength6);
std::lock_guard<std::mutex> localScopeLock(Stmt->Connection->guard->getLock());
Stmt->Connection->guard->safeRealQuery(aux.str());
auto result= mysql_store_result(Stmt->Connection->mariadb);

if (result == nullptr)
{
//Does not matter what are they, but we need to put there something
if (refSchemaCond)
schemaCond= refSchemaCond;
else
{
refSchemaCond= schemaCond;
}
part3 << " AND 0";
constructFKquery(query, part1, refSchemaCond, part2, schemaCond, part3);
}
else
{
bool unionAll= false;
while (MYSQL_ROW row= mysql_fetch_row(result)) {
auto rowLength= mysql_fetch_lengths(result);
auto localSchemaCond= schemaCond, localRefSchemaCond= refSchemaCond;
if (unionAll)
{
query << " UNION ALL ";
}
else
{
unionAll= true;
}
// We don't need AddOaOrIdCondition here - we know how to make it
if (refSchemaCond)
{
memcpy(namedSchemaCondition + 2/*='*/, row[0], rowLength[0]);
namedSchemaCondition[rowLength[0] + 2]= '\'';
namedSchemaCondition[rowLength[0] + 3]= '\0';
localSchemaCond= namedSchemaCondition;
}
else
{
memcpy(namedRefSchemaCondition + 2/*='*/, row[0], rowLength[0]);
namedRefSchemaCondition[rowLength[0] + 2]= '\'';
namedRefSchemaCondition[rowLength[0] + 3]= '\0';
localRefSchemaCond= namedRefSchemaCondition;
}
constructFKquery(query, part1, localRefSchemaCond, part2, localSchemaCond, part3);
}
query << orderBy;
mysql_free_result(result);
}
}
MADB_DYNAPPENDCONST(&StmtStr, "ORDER BY FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME, KEY_SEQ, PKTABLE_NAME");
}
ret= Stmt->Methods->ExecDirect(Stmt, StmtStr.str, SQL_NTS);

MADB_DynstrFree(&StmtStr);

return ret;
std::string finalQuery(query.str());
return Stmt->Methods->ExecDirect(Stmt, const_cast<char*>(finalQuery.c_str()),
static_cast<SQLINTEGER>(finalQuery.length()));
}
/* }}} */

Loading

0 comments on commit 2124cb5

Please sign in to comment.