Skip to content

Commit

Permalink
Merge "rdbms: Allow PostgreSQL schema-check functions to find tempora…
Browse files Browse the repository at this point in the history
…ry tables"
  • Loading branch information
jenkins-bot authored and Gerrit Code Review committed Apr 6, 2018
2 parents 9c85176 + 43d1f70 commit 9688e5a
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 116 deletions.
228 changes: 134 additions & 94 deletions includes/libs/rdbms/database/DatabasePostgres.php
Expand Up @@ -43,6 +43,8 @@ class DatabasePostgres extends Database {
private $connectString;
/** @var string */
private $coreSchema;
/** @var string */
private $tempSchema;
/** @var string[] Map of (reserved table name => alternate table name) */
private $keywordTableMap = [];

Expand Down Expand Up @@ -73,15 +75,17 @@ public function implicitOrderby() {
}

public function hasConstraint( $name ) {
$conn = $this->getBindingHandle();

$sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
"WHERE c.connamespace = n.oid AND conname = '" .
pg_escape_string( $conn, $name ) . "' AND n.nspname = '" .
pg_escape_string( $conn, $this->getCoreSchema() ) . "'";
$res = $this->doQuery( $sql );

return $this->numRows( $res );
foreach ( $this->getCoreSchemas() as $schema ) {
$sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
"WHERE c.connamespace = n.oid AND conname = " .
$this->addQuotes( $name ) . " AND n.nspname = " .
$this->addQuotes( $schema );
$res = $this->doQuery( $sql );
if ( $res && $this->numRows( $res ) ) {
return true;
}
}
return false;
}

public function open( $server, $user, $password, $dbName ) {
Expand Down Expand Up @@ -428,59 +432,65 @@ public function indexInfo( $table, $index, $fname = __METHOD__ ) {

public function indexAttributes( $index, $schema = false ) {
if ( $schema === false ) {
$schema = $this->getCoreSchema();
}
/*
* A subquery would be not needed if we didn't care about the order
* of attributes, but we do
*/
$sql = <<<__INDEXATTR__
SELECT opcname,
attname,
i.indoption[s.g] as option,
pg_am.amname
FROM
(SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
FROM
pg_index isub
JOIN pg_class cis
ON cis.oid=isub.indexrelid
JOIN pg_namespace ns
ON cis.relnamespace = ns.oid
WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
pg_attribute,
pg_opclass opcls,
pg_am,
pg_class ci
JOIN pg_index i
ON ci.oid=i.indexrelid
JOIN pg_class ct
ON ct.oid = i.indrelid
JOIN pg_namespace n
ON ci.relnamespace = n.oid
WHERE
ci.relname='$index' AND n.nspname='$schema'
AND attrelid = ct.oid
AND i.indkey[s.g] = attnum
AND i.indclass[s.g] = opcls.oid
AND pg_am.oid = opcls.opcmethod
$schemas = $this->getCoreSchemas();
} else {
$schemas = [ $schema ];
}

$eindex = $this->addQuotes( $index );

foreach ( $schemas as $schema ) {
$eschema = $this->addQuotes( $schema );
/*
* A subquery would be not needed if we didn't care about the order
* of attributes, but we do
*/
$sql = <<<__INDEXATTR__
SELECT opcname,
attname,
i.indoption[s.g] as option,
pg_am.amname
FROM
(SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
FROM
pg_index isub
JOIN pg_class cis
ON cis.oid=isub.indexrelid
JOIN pg_namespace ns
ON cis.relnamespace = ns.oid
WHERE cis.relname=$eindex AND ns.nspname=$eschema) AS s,
pg_attribute,
pg_opclass opcls,
pg_am,
pg_class ci
JOIN pg_index i
ON ci.oid=i.indexrelid
JOIN pg_class ct
ON ct.oid = i.indrelid
JOIN pg_namespace n
ON ci.relnamespace = n.oid
WHERE
ci.relname=$eindex AND n.nspname=$eschema
AND attrelid = ct.oid
AND i.indkey[s.g] = attnum
AND i.indclass[s.g] = opcls.oid
AND pg_am.oid = opcls.opcmethod
__INDEXATTR__;
$res = $this->query( $sql, __METHOD__ );
$a = [];
if ( $res ) {
foreach ( $res as $row ) {
$a[] = [
$row->attname,
$row->opcname,
$row->amname,
$row->option ];
$res = $this->query( $sql, __METHOD__ );
$a = [];
if ( $res ) {
foreach ( $res as $row ) {
$a[] = [
$row->attname,
$row->opcname,
$row->amname,
$row->option ];
}
return $a;
}
} else {
return null;
}

return $a;
return null;
}

public function indexUnique( $table, $index, $fname = __METHOD__ ) {
Expand Down Expand Up @@ -784,9 +794,9 @@ public function duplicateTableStructure(
}

public function listTables( $prefix = null, $fname = __METHOD__ ) {
$eschema = $this->addQuotes( $this->getCoreSchema() );
$eschemas = implode( ',', array_map( [ $this, 'addQuotes' ], $this->getCoreSchemas() ) );
$result = $this->query(
"SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
"SELECT DISTINCT tablename FROM pg_tables WHERE schemaname IN ($eschemas)", $fname );
$endArray = [];

foreach ( $result as $table ) {
Expand Down Expand Up @@ -977,6 +987,29 @@ public function getCoreSchema() {
return $this->coreSchema;
}

/**
* Return schema names for temporary tables and core application tables
*
* @since 1.31
* @return string[] schema names
*/
public function getCoreSchemas() {
if ( $this->tempSchema ) {
return [ $this->tempSchema, $this->getCoreSchema() ];
}

$res = $this->query(
"SELECT nspname FROM pg_catalog.pg_namespace n WHERE n.oid = pg_my_temp_schema()", __METHOD__
);
$row = $this->fetchObject( $res );
if ( $row ) {
$this->tempSchema = $row->nspname;
return [ $this->tempSchema, $this->getCoreSchema() ];
}

return [ $this->getCoreSchema() ];
}

public function getServerVersion() {
if ( !isset( $this->numericVersion ) ) {
$conn = $this->getBindingHandle();
Expand Down Expand Up @@ -1009,18 +1042,24 @@ private function relationExists( $table, $types, $schema = false ) {
$types = [ $types ];
}
if ( $schema === false ) {
$schema = $this->getCoreSchema();
$schemas = $this->getCoreSchemas();
} else {
$schemas = [ $schema ];
}
$table = $this->realTableName( $table, 'raw' );
$etable = $this->addQuotes( $table );
$eschema = $this->addQuotes( $schema );
$sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
. "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
. "AND c.relkind IN ('" . implode( "','", $types ) . "')";
$res = $this->query( $sql );
$count = $res ? $res->numRows() : 0;
foreach ( $schemas as $schema ) {
$eschema = $this->addQuotes( $schema );
$sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
. "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
. "AND c.relkind IN ('" . implode( "','", $types ) . "')";
$res = $this->query( $sql );
if ( $res && $res->numRows() ) {
return true;
}
}

return (bool)$count;
return false;
}

/**
Expand All @@ -1045,48 +1084,49 @@ public function triggerExists( $table, $trigger ) {
AND tgrelid=pg_class.oid
AND nspname=%s AND relname=%s AND tgname=%s
SQL;
$res = $this->query(
sprintf(
$q,
$this->addQuotes( $this->getCoreSchema() ),
$this->addQuotes( $table ),
$this->addQuotes( $trigger )
)
);
if ( !$res ) {
return null;
foreach ( $this->getCoreSchemas() as $schema ) {
$res = $this->query(
sprintf(
$q,
$this->addQuotes( $schema ),
$this->addQuotes( $table ),
$this->addQuotes( $trigger )
)
);
if ( $res && $res->numRows() ) {
return true;
}
}
$rows = $res->numRows();

return $rows;
return false;
}

public function ruleExists( $table, $rule ) {
$exists = $this->selectField( 'pg_rules', 'rulename',
[
'rulename' => $rule,
'tablename' => $table,
'schemaname' => $this->getCoreSchema()
'schemaname' => $this->getCoreSchemas()
]
);

return $exists === $rule;
}

public function constraintExists( $table, $constraint ) {
$sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
"WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
$this->addQuotes( $this->getCoreSchema() ),
$this->addQuotes( $table ),
$this->addQuotes( $constraint )
);
$res = $this->query( $sql );
if ( !$res ) {
return null;
foreach ( $this->getCoreSchemas() as $schema ) {
$sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
"WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
$this->addQuotes( $schema ),
$this->addQuotes( $table ),
$this->addQuotes( $constraint )
);
$res = $this->query( $sql );
if ( $res && $res->numRows() ) {
return true;
}
}
$rows = $res->numRows();

return $rows;
return false;
}

/**
Expand Down
48 changes: 26 additions & 22 deletions includes/libs/rdbms/field/PostgresField.php
Expand Up @@ -38,30 +38,34 @@ static function fromText( DatabasePostgres $db, $table, $field ) {
SQL;

$table = $db->remappedTableName( $table );
$res = $db->query(
sprintf( $q,
$db->addQuotes( $db->getCoreSchema() ),
$db->addQuotes( $table ),
$db->addQuotes( $field )
)
);
$row = $db->fetchObject( $res );
if ( !$row ) {
return null;
foreach ( $db->getCoreSchemas() as $schema ) {
$res = $db->query(
sprintf( $q,
$db->addQuotes( $schema ),
$db->addQuotes( $table ),
$db->addQuotes( $field )
)
);
$row = $db->fetchObject( $res );
if ( !$row ) {
continue;
}
$n = new PostgresField;
$n->type = $row->typname;
$n->nullable = ( $row->attnotnull == 'f' );
$n->name = $field;
$n->tablename = $table;
$n->max_length = $row->attlen;
$n->deferrable = ( $row->deferrable == 't' );
$n->deferred = ( $row->deferred == 't' );
$n->conname = $row->conname;
$n->has_default = ( $row->atthasdef === 't' );
$n->default = $row->adsrc;

return $n;
}
$n = new PostgresField;
$n->type = $row->typname;
$n->nullable = ( $row->attnotnull == 'f' );
$n->name = $field;
$n->tablename = $table;
$n->max_length = $row->attlen;
$n->deferrable = ( $row->deferrable == 't' );
$n->deferred = ( $row->deferred == 't' );
$n->conname = $row->conname;
$n->has_default = ( $row->atthasdef === 't' );
$n->default = $row->adsrc;

return $n;
return null;
}

function name() {
Expand Down

0 comments on commit 9688e5a

Please sign in to comment.