Skip to content

Commit

Permalink
Rebase with yb::master up to commit d5d857e
Browse files Browse the repository at this point in the history
  • Loading branch information
nocaway committed Jul 20, 2023
1 parent 57688f4 commit 963f869
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 21 deletions.
66 changes: 63 additions & 3 deletions src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database.h"
#include "catalog/pg_namespace_d.h"
#include "catalog/pg_proc.h"
Expand Down Expand Up @@ -78,12 +80,69 @@ uint64_t YbGetMasterCatalogVersion()

/* Modify Catalog Version */

static void
YbCallSQLIncrementCatalogVersions(bool is_breaking_change)
{
List* names =
list_make2(makeString("pg_catalog"),
makeString("yb_increment_all_db_catalog_versions"));
FuncCandidateList clist = FuncnameGetCandidates(
names,
-1 /* nargs */,
NIL /* argnames */,
false /* expand_variadic */,
false /* expand_defaults */,
false /* include_out_arguments */,
false /* missing_ok */);
/* We expect exactly one candidate. */
Assert(clist && clist->next == NULL);
Oid functionId = clist->oid;
FmgrInfo flinfo;
FunctionCallInfoBaseData fcinfo;
fmgr_info(functionId, &flinfo);
InitFunctionCallInfoData(fcinfo, &flinfo, 1, InvalidOid, NULL, NULL);
fcinfo.args[0].value = BoolGetDatum(is_breaking_change);
fcinfo.args[0].isnull = false;

// Save old values and set new values to enable the call.
bool saved = yb_non_ddl_txn_for_sys_tables_allowed;
yb_non_ddl_txn_for_sys_tables_allowed = true;
Oid save_userid;
int save_sec_context;
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(BOOTSTRAP_SUPERUSERID,
SECURITY_RESTRICTED_OPERATION);
PG_TRY();
{
FunctionCallInvoke(&fcinfo);
/* Restore old values. */
yb_non_ddl_txn_for_sys_tables_allowed = saved;
SetUserIdAndSecContext(save_userid, save_sec_context);
}
PG_CATCH();
{
/* Restore old values. */
yb_non_ddl_txn_for_sys_tables_allowed = saved;
SetUserIdAndSecContext(save_userid, save_sec_context);
PG_RE_THROW();
}
PG_END_TRY();
}

static void
YbIncrementMasterDBCatalogVersionTableEntryImpl(
Oid db_oid, bool is_breaking_change)
Oid db_oid, bool is_breaking_change, bool is_global_ddl)
{
Assert(YbGetCatalogVersionType() == CATALOG_VERSION_CATALOG_TABLE);

if (is_global_ddl)
{
Assert(YBIsDBCatalogVersionMode());
/* Call yb_increment_all_db_catalog_versions(is_breaking_change). */
YbCallSQLIncrementCatalogVersions(is_breaking_change);
return;
}

YBCPgStatement update_stmt = NULL;
YBCPgTypeAttrs type_attrs = { 0 };
YBCPgExpr yb_expr;
Expand Down Expand Up @@ -166,7 +225,8 @@ YbIncrementMasterDBCatalogVersionTableEntryImpl(
RelationClose(rel);
}

bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change,
bool is_global_ddl)
{
if (YbGetCatalogVersionType() != CATALOG_VERSION_CATALOG_TABLE)
return false;
Expand All @@ -175,7 +235,7 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
*/
YbIncrementMasterDBCatalogVersionTableEntryImpl(
YBIsDBCatalogVersionMode() ? MyDatabaseId : Template1DbOid,
is_breaking_change);
is_breaking_change, is_global_ddl);
return true;
}

Expand Down
28 changes: 28 additions & 0 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_auth_members_d.h"
#include "catalog/pg_tablespace_d.h"
#include "catalog/pg_type.h"
#include "catalog/pg_yb_role_profile.h"
#include "catalog/pg_yb_role_profile_d.h"
Expand Down Expand Up @@ -203,6 +207,30 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt,
// TODO(shane) also update the shared memory catalog version here.
YbUpdateCatalogCacheVersion(YbGetCatalogCacheVersion() + 1);
}

if (YBIsDBCatalogVersionMode() &&
RelationGetForm(rel)->relisshared &&
RelationSupportsSysCache(RelationGetRelid(rel)) &&
!(*YBCGetGFlags()->ysql_disable_global_impact_ddl_statements))
{
/* NOTE: relisshared implies that rel is a system relation. */
Assert(IsSystemRelation(rel));
Assert(/* pg_authid */
RelationGetRelid(rel) == AuthIdRelationId ||
RelationGetRelid(rel) == AuthIdRolnameIndexId ||

/* pg_auth_members */
RelationGetRelid(rel) == AuthMemRelationId ||
RelationGetRelid(rel) == AuthMemRoleMemIndexId ||
RelationGetRelid(rel) == AuthMemMemRoleIndexId ||

/* pg_database */
RelationGetRelid(rel) == DatabaseRelationId ||

/* pg_tablespace */
RelationGetRelid(rel) == TableSpaceRelationId);
YbSetIsGlobalDDL();
}
}

/*
Expand Down
20 changes: 16 additions & 4 deletions src/postgres/src/backend/libpq/pqcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1051,11 +1051,9 @@ pq_getbyte(void)
* Same as pq_getbyte() except we don't advance the pointer.
* --------------------------------
*/
int
pq_peekbyte(void)
static int
pq_peekbyte_impl(void)
{
Assert(PqCommReadingMsg);

while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
Expand All @@ -1064,6 +1062,20 @@ pq_peekbyte(void)
return (unsigned char) PqRecvBuffer[PqRecvPointer];
}

int
pq_peekbyte(void)
{
Assert(PqCommReadingMsg);

return pq_peekbyte_impl();
}

int
yb_pq_peekbyte_no_msg_reading_status_check(void)
{
return pq_peekbyte_impl();
}

/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
Expand Down
4 changes: 0 additions & 4 deletions src/postgres/src/backend/postmaster/postmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,6 @@ PostmasterMain(int argc, char *argv[])

IsPostmasterEnvironment = true;

if (YBIsEnabledInPostgresEnvVar()) {
YBCStatementTimeoutPtr = &StatementTimeout;
}

/*
* Start our win32 signal implementation
*/
Expand Down
29 changes: 29 additions & 0 deletions src/postgres/src/backend/tcop/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,33 @@ SocketBackend(StringInfo inBuf)
*/
if (pq_getmessage(inBuf, maxmsglen))
return EOF; /* suitable message already logged */

if (IsYugaByteEnabled())
{
switch(qtype)
{
case 'E':
switch (yb_pg_batch_detection_mechanism)
{
case ASSUME_ALL_BATCH_EXECUTIONS:
YbSetIsBatchedExecution(true);
break;
case DETECT_BY_PEEKING:
if (!YbIsBatchedExecution() &&
yb_pq_peekbyte_no_msg_reading_status_check() !=
'S')
YbSetIsBatchedExecution(true);
break;
}
break;
case 'S':
YbSetIsBatchedExecution(false);
break;
default:
break;
}
}

RESUME_CANCEL_INTERRUPTS();

return qtype;
Expand All @@ -493,7 +520,9 @@ ReadCommand(StringInfo inBuf)
if (whereToSendOutput == DestRemote)
result = SocketBackend(inBuf);
else
{
result = InteractiveBackend(inBuf);
}
return result;
}

Expand Down
14 changes: 8 additions & 6 deletions src/postgres/src/backend/tcop/pquery.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
*/
Portal ActivePortal = NULL;

int yb_pg_batch_detection_mechanism;

static void ProcessQuery(PlannedStmt *plan,
const char *sourceText,
ParamListInfo params,
Expand Down Expand Up @@ -1236,13 +1238,13 @@ PortalRunMulti(Portal portal,
if (altdest->mydest == DestRemoteExecute)
altdest = None_Receiver;

if (IsYugaByteEnabled())
if (IsYugaByteEnabled() &&
!IsTransactionBlock() &&
!YbIsBatchedExecution() &&
list_length(portal->stmts) == 1)
{
if (!IsTransactionBlock() && list_length(portal->stmts) == 1)
{
PlannedStmt *pstmt = linitial_node(PlannedStmt, portal->stmts);
is_single_row_modify_txn = YBCIsSingleRowModify(pstmt);
}
PlannedStmt *pstmt = linitial_node(PlannedStmt, portal->stmts);
is_single_row_modify_txn = YBCIsSingleRowModify(pstmt);
}

/*
Expand Down
13 changes: 13 additions & 0 deletions src/postgres/src/backend/utils/cache/relcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,12 @@ YBUpdateRelationsAttributes(bool sys_relations_update_required)
SysScanDesc scandesc = systable_beginscan(
attrel, InvalidOid, false /* indexOk */, NULL, 0, NULL);
YbAttrProcessorState state = {0};
MemoryContext per_tuple_memory_context =
(*YBCGetGFlags()->ysql_disable_per_tuple_memory_context_in_update_relattrs) ?
NULL : AllocSetContextCreate(GetCurrentMemoryContext(),
"PerTupleContext", ALLOCSET_DEFAULT_SIZES);
if (per_tuple_memory_context)
MemoryContextSwitchTo(per_tuple_memory_context);
HeapTuple htup;
while (HeapTupleIsValid(htup = systable_getnext(scandesc)))
{
Expand All @@ -1736,8 +1742,15 @@ YBUpdateRelationsAttributes(bool sys_relations_update_required)
YbStartNewAttrProcessing(
&state, sys_relations_update_required, attrel, htup);
}
if (per_tuple_memory_context)
MemoryContextReset(per_tuple_memory_context);
}
YbCompleteAttrProcessing(&state);
if (per_tuple_memory_context)
{
MemoryContextSwitchTo(per_tuple_memory_context->parent);
MemoryContextDelete(per_tuple_memory_context);
}
systable_endscan(scandesc);
table_close(attrel, AccessShareLock);
}
Expand Down
30 changes: 30 additions & 0 deletions src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
/* Yugabyte includes */
#include "commands/copy.h"
#include "executor/ybcModifyTable.h"
#include "tcop/pquery.h"
#include "pg_yb_utils.h"

#ifndef PG_KRB_SRVTAB
Expand Down Expand Up @@ -582,6 +583,13 @@ static struct config_enum_entry recovery_init_sync_method_options[] = {
{NULL, 0, false}
};

const struct config_enum_entry yb_pg_batch_detection_mechanism_options[] = {
{"detect_by_peeking", DETECT_BY_PEEKING, false},
{"assume_all_batch_executions", ASSUME_ALL_BATCH_EXECUTIONS, false},
{"ignore_batch_delete_and_update_may_fail", IGNORE_BATCH_DELETE_AND_UPDATE_MAY_FAIL, false},
{NULL, 0, false}
};

static struct config_enum_entry shared_memory_options[] = {
#ifndef WIN32
{"sysv", SHMEM_TYPE_SYSV, false},
Expand Down Expand Up @@ -5720,6 +5728,28 @@ static struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},

{
{"yb_pg_batch_detection_mechanism", PGC_SIGHUP, COMPAT_OPTIONS_CLIENT,
gettext_noop("The drivers use message protocol to communicate "
"with PG. The driver does not inform PG in advance "
"about a Batch execution. We need to identify a batch "
"because in that case the single-shard optimization "
"should be disabled. Postgres drivers pipeline "
"messages and we exploit this to peek the message "
"following 'Execute' to detect a batch. This may "
"lead to some unforeseen bugs, so this GUC provides "
"a way to disable the single-shard optimization "
"completely or go back to the behavior before "
"#16446 was fixed."),
NULL,
GUC_SUPERUSER_ONLY
},
&yb_pg_batch_detection_mechanism,
DETECT_BY_PEEKING,
yb_pg_batch_detection_mechanism_options,
NULL, NULL, NULL
},

/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
Expand Down
Loading

0 comments on commit 963f869

Please sign in to comment.