Commit

Don't update the index if it hasn't changed
If the document doesn't change then don't push a new copy of it into the
index.  Elasticsearch doesn't detect unchanged documents by default and
updates are very expensive, so this should prevent any that aren't really
required.  I have an instinct that says unchanged documents are a pretty
significant fraction of updates at WMF.  Instinct only, though.  We graph
it in Ganglia so we can be sure whether it is worth it.

Requires ruflin/Elastica#629 or we'll lose retry-on-conflict support.

Change-Id: I533e08ac15d5e48d1de8b36e3b86a419f339be38
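
As a sketch of what the new docToScript() helper below produces: for a page
document with, say, title and text fields (field names here are purely
illustrative), the generated update script would look roughly like the
following, with the field values passed in as script params:

    changed = false;
    if ( ctx._source.title != title ) {
        changed = true;
        ctx._source.title = title;
    }
    if ( ctx._source.text != text ) {
        changed = true;
        ctx._source.text = text;
    }
    if ( !changed ) {
        ctx.op = "none";
    }

When no field differs the script sets ctx.op to "none", so Elasticsearch skips
reindexing the document rather than performing a needless update; when
doc-as-upsert is enabled the original document is attached as the upsert body,
so pages that aren't indexed yet still get created.
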
Nik Everett committed Jun 17, 2014
1 parent 2f69736 commit 1c76661
Showing 1 changed file with 48 additions and 16 deletions.
includes/Updater.php: 64 changes (48 additions, 16 deletions)
@@ -174,15 +174,18 @@ public function updatePages( $pages, $shardTimeout, $clientSideTimeout, $flags )

OtherIndexJob::queueIfRequired( $this->pagesToTitles( $pages ), true );

$allDocuments = array_fill_keys( Connection::getAllIndexTypes(), array() );
$allData = array_fill_keys( Connection::getAllIndexTypes(), array() );
foreach ( $this->buildDocumentsForPages( $pages, $flags ) as $document ) {
$suffix = Connection::getIndexSuffixForNamespace( $document->get( 'namespace' ) );
$allDocuments[$suffix][] = $document;
$allData[$suffix][] = $this->docToScript( $document );
// To quickly switch back to sending doc as upsert instead of script, remove the line above
// and switch to the one below:
// $allData[$suffix][] = $document;
}
$count = 0;
foreach( $allDocuments as $indexType => $documents ) {
$this->sendDocuments( $indexType, $documents, $shardTimeout );
$count += count( $documents );
foreach( $allData as $indexType => $data ) {
$this->sendData( $indexType, $data, $shardTimeout );
$count += count( $data );
}

return $count;
@@ -212,16 +215,16 @@ public function deletePages( $titles, $ids, $clientSideTimeout = null, $indexType
}

/**
* @param string $indexType type of index to which to send $documents
* @param array $documents documents to send
* @param string $indexType type of index to which to send $data
* @param array(\Elastica\Script or \Elastica\Document) $data scripts or documents to send
* @param null|string $shardTimeout How long should Elasticsearch wait for an offline
* shard. Defaults to null, meaning don't wait. Null is more efficient when sending
* multiple pages because Cirrus will use Elasticsearch's bulk API. Timeout is in
* Elasticsearch's time format.
* @return bool True if nothing happened or we successfully indexed, false on failure
*/
private function sendDocuments( $indexType, $documents, $shardTimeout ) {
$documentCount = count( $documents );
private function sendData( $indexType, $data, $shardTimeout ) {
$documentCount = count( $data );
if ( $documentCount === 0 ) {
return true;
}
@@ -237,8 +240,7 @@ private function sendDocuments( $indexType, $documents, $shardTimeout ) {
$bulk->setShardTimeout( $shardTimeout );
}
$bulk->setType( $pageType );
// addDocuments (notice plural) is the bulk api
$bulk->addDocuments( $documents );
$bulk->addData( $data, 'update' );
$bulk->send();
} catch ( \Elastica\Exception\Bulk\ResponseException $e ) {
if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e,
@@ -253,9 +255,9 @@ private function sendDocuments( $indexType, $documents, $shardTimeout ) {
return true;
} else {
$this->failure( $exception );
$documentIds = array_map( function( $doc ) {
return $doc->getId();
}, $documents );
$documentIds = array_map( function( $d ) {
return $d->getId();
}, $data );
wfDebugLog( 'CirrusSearchChangeFailed', 'Update for doc ids: ' .
implode( ',', $documentIds ) . '; error message was: ' . $exception->getMessage() );
return false;
@@ -291,7 +293,6 @@ private function buildDocumentsForPages( $pages, $flags ) {
) );
// Everything is sent as an update to prevent overwriting fields maintained in other processes like
// addLocalSiteToOtherIndex and removeLocalSiteFromOtherIndex.
$doc->setOpType( 'update' );
// But we need a way to index documents that don't already exist. We're willing to upsert any full
// documents or any documents that we've been explicitly told it is ok to index when they aren't full.
// This is typically just done during the first phase of the initial index build.
@@ -300,7 +301,6 @@
// regular types or lists of objects and lists are overwritten.
$doc->setDocAsUpsert( $fullDocument || $indexOnSkip );
$doc->setRetryOnConflict( $wgCirrusSearchUpdateConflictRetryCount );
$documents[] = $doc;
wfProfileOut( __METHOD__ . '-basic' );

if ( !$skipParse ) {
@@ -332,13 +332,45 @@ private function buildDocumentsForPages( $pages, $flags ) {
if ( !$skipLinks ) {
wfRunHooks( 'CirrusSearchBuildDocumentLinks', array( $doc, $title ) );
}

$documents[] = $doc;
}

wfRunHooks( 'CirrusSearchBuildDocumentFinishBatch', array( $pages ) );

return $documents;
}

private function docToScript( $doc ) {
$scriptText = <<<MVEL
changed = false;
MVEL;
$params = $doc->getParams();
foreach ( $doc->getData() as $key => $value ) {
$scriptText .= <<<MVEL
if ( ctx._source.$key != $key ) {
changed = true;
ctx._source.$key = $key;
}
MVEL;
$params[ $key ] = $value;
}
$scriptText .= <<<MVEL
if ( !changed ) {
ctx.op = "none";
}
MVEL;
$script = new \Elastica\Script( $scriptText, $params );
if ( $doc->getDocAsUpsert() ) {
$script->setUpsert( $doc );
}

return $script;
}

/**
* Fetch page's content and parser output, using the parser cache if we can
*
