Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock when archiving and avoid invalidating sites that have archiving in progress #15272

Merged
merged 9 commits into from Dec 24, 2019
2 changes: 1 addition & 1 deletion config/global.php
Expand Up @@ -217,5 +217,5 @@

\Piwik\CronArchive\Performance\Logger::class => DI\object()->constructorParameter('logger', DI\get('archiving.performance.logger')),

'Piwik\Concurrency\LockBackend' => DI\object(\Piwik\Concurrency\LockBackend\MySqlLockBackend::class)
\Piwik\Concurrency\LockBackend::class => \DI\get(\Piwik\Concurrency\LockBackend\MySqlLockBackend::class),
);
10 changes: 9 additions & 1 deletion core/Archive/ArchiveInvalidator.php
Expand Up @@ -10,6 +10,7 @@
namespace Piwik\Archive;

use Piwik\Archive\ArchiveInvalidator\InvalidationResult;
use Piwik\ArchiveProcessor\ArchivingStatus;
use Piwik\CronArchive\SitesToReprocessDistributedList;
use Piwik\DataAccess\ArchiveTableCreator;
use Piwik\DataAccess\Model;
Expand Down Expand Up @@ -54,9 +55,15 @@ class ArchiveInvalidator
*/
private $model;

public function __construct(Model $model)
/**
* @var ArchivingStatus
*/
private $archivingStatus;

public function __construct(Model $model, ArchivingStatus $archivingStatus)
{
$this->model = $model;
$this->archivingStatus = $archivingStatus;
}

public function rememberToInvalidateArchivedReportsLater($idSite, Date $date)
Expand Down Expand Up @@ -207,6 +214,7 @@ public function markArchivesAsInvalidated(array $idSites, array $dates, $period,
}

$periodDates = $this->getUniqueDates($periodDates);

$this->markArchivesInvalidated($idSites, $periodDates, $segment);

$yearMonths = array_keys($periodDates);
Expand Down
107 changes: 107 additions & 0 deletions core/ArchiveProcessor/ArchivingStatus.php
@@ -0,0 +1,107 @@
<?php
/**
* Piwik - free/libre analytics platform
*
* @link https://matomo.org
* @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
*
*/

namespace Piwik\ArchiveProcessor;

use Piwik\Concurrency\Lock;
use Piwik\Concurrency\LockBackend;
use Piwik\Container\StaticContainer;
use Piwik\SettingsPiwik;

/**
 * Tracks the archives this process is currently computing by holding one lock per archive.
 *
 * Locks are kept on a stack: archiveStarted() pushes a lock and archiveFinished() pops and
 * releases the most recently pushed one. Lock keys have the form
 * "<LOCK_KEY_PREFIX>.<idSite>.<md5 hash>", so the site ID can be read back out of a key.
 */
class ArchivingStatus
{
    const LOCK_KEY_PREFIX = 'Archiving';
    const DEFAULT_ARCHIVING_TTL = 7200; // 2 hours

    /**
     * Backend used for every lock created or queried by this object.
     *
     * @var LockBackend
     */
    private $lockBackend;

    /**
     * TTL in seconds passed to each archiving lock.
     *
     * @var int
     */
    private $archivingTTLSecs;

    /**
     * Locks pushed by archiveStarted() and not yet popped by archiveFinished(); last element is the newest.
     *
     * @var Lock[]
     */
    private $lockStack = [];

    /**
     * @param LockBackend $lockBackend backend the archiving locks are stored in
     * @param int $archivingTTLSecs TTL in seconds used for each archiving lock
     */
    public function __construct(LockBackend $lockBackend, $archivingTTLSecs = self::DEFAULT_ARCHIVING_TTL)
    {
        $this->lockBackend = $lockBackend;
        $this->archivingTTLSecs = $archivingTTLSecs;
    }

    /**
     * Creates the lock for the archive described by $params, tries to acquire it and pushes it
     * onto the lock stack.
     *
     * NOTE(review): the result of acquireLock() is not checked, so the lock is pushed even if
     * acquisition did not succeed - confirm this is the intended best-effort behaviour.
     *
     * @param Parameters $params
     */
    public function archiveStarted(Parameters $params)
    {
        $lock = $this->makeArchivingLock($params);
        $lock->acquireLock($this->getInstanceProcessId(), $this->archivingTTLSecs);
        array_push($this->lockStack, $lock);
    }

    /**
     * Pops the most recently pushed lock off the stack and releases it.
     *
     * Does nothing when the stack is empty. This method is typically invoked from a `finally`
     * block, where a fatal "call to a member function on null" would hide the original error.
     */
    public function archiveFinished()
    {
        // array_pop() returns null for an empty array
        $lock = array_pop($this->lockStack);
        if ($lock === null) {
            return;
        }

        $lock->unlock();
    }

    /**
     * Returns the lock most recently pushed by archiveStarted() that has not been popped yet.
     *
     * @return Lock|null null if the lock stack is empty
     */
    public function getCurrentArchivingLock()
    {
        if (empty($this->lockStack)) {
            return null;
        }
        return end($this->lockStack);
    }

    /**
     * Returns the IDs of the sites that have at least one acquired archiving lock, according to
     * the lock keys reported by the backend.
     *
     * @return int[] unique site IDs, re-indexed from 0
     */
    public function getSitesCurrentlyArchiving()
    {
        $lockMeta = new Lock($this->lockBackend, self::LOCK_KEY_PREFIX . '.');
        $acquiredLocks = $lockMeta->getAllAcquiredLockKeys();

        $sitesCurrentlyArchiving = [];
        foreach ($acquiredLocks as $lockKey) {
            // key layout is "<prefix>.<idSite>.<hash>", so the site ID is the second part
            $parts = explode('.', $lockKey);
            if (!isset($parts[1])) {
                continue;
            }
            $sitesCurrentlyArchiving[] = (int) $parts[1];
        }

        // array_unique() preserves keys, so re-index to hand back a plain list
        return array_values(array_unique($sitesCurrentlyArchiving));
    }

    /**
     * Builds the (not yet acquired) lock for the archive described by $params.
     *
     * The lock is created on the backend injected into this object so that it lives in the same
     * store that getSitesCurrentlyArchiving() queries.
     *
     * @param Parameters $params
     * @return Lock
     */
    private function makeArchivingLock(Parameters $params)
    {
        $idSite = $params->getSite()->getId();
        $period = $params->getPeriod();

        $doneFlag = Rules::getDoneStringFlagFor([$idSite], $params->getSegment(),
            $period->getLabel(), $params->getRequestedPlugin());

        $lockKeyParts = [
            self::LOCK_KEY_PREFIX,
            $idSite,

            // md5 to keep it within the 70 char limit in the table
            md5($period->getId() . $period->getRangeString() . $doneFlag),
        ];

        $lockKeyPrefix = implode('.', $lockKeyParts);
        return new Lock($this->lockBackend, $lockKeyPrefix, $this->archivingTTLSecs);
    }

    /**
     * Returns the value stored in acquired locks: the Piwik instance ID and the current PHP
     * process ID joined by a dot.
     *
     * @return string
     */
    private function getInstanceProcessId()
    {
        return SettingsPiwik::getPiwikInstanceId() . '.' . getmypid();
    }
}
45 changes: 37 additions & 8 deletions core/ArchiveProcessor/Loader.php
Expand Up @@ -8,15 +8,12 @@
*/
namespace Piwik\ArchiveProcessor;

use Piwik\Archive;
use Piwik\Cache;
use Piwik\CacheId;
use Piwik\Common;
use Piwik\Config;
use Piwik\Container\StaticContainer;
use Piwik\Context;
use Piwik\DataAccess\ArchiveSelector;
use Piwik\Date;
use Piwik\Period;
use Piwik\Piwik;

/**
Expand Down Expand Up @@ -73,12 +70,21 @@ private function prepareArchiveImpl($pluginName)
return $idArchive;
}

list($visits, $visitsConverted) = $this->prepareCoreMetricsArchive($visits, $visitsConverted);
list($idArchive, $visits) = $this->prepareAllPluginsArchive($visits, $visitsConverted);
/** @var ArchivingStatus $archivingStatus */
$archivingStatus = StaticContainer::get(ArchivingStatus::class);
$archivingStatus->archiveStarted($this->params);

try {
list($visits, $visitsConverted) = $this->prepareCoreMetricsArchive($visits, $visitsConverted);
list($idArchive, $visits) = $this->prepareAllPluginsArchive($visits, $visitsConverted);
} finally {
$archivingStatus->archiveFinished();
}

if ($this->isThereSomeVisits($visits) || PluginsArchiver::doesAnyPluginArchiveWithoutVisits()) {
return $idArchive;
}

return false;
}

Expand Down Expand Up @@ -158,17 +164,20 @@ protected function isArchivingForcedToTrigger()
* Returns the idArchive if the archive is available in the database for the requested plugin.
* Returns false if the archive needs to be processed.
*
* (public for tests)
*
* @return array
*/
protected function loadExistingArchiveIdFromDb()
public function loadExistingArchiveIdFromDb()
{
$noArchiveFound = array(false, false, false);

if ($this->isArchivingForcedToTrigger()) {
return $noArchiveFound;
}

$idAndVisits = ArchiveSelector::getArchiveIdAndVisits($this->params);
$minDatetimeArchiveProcessedUTC = $this->getMinTimeArchiveProcessed();
$idAndVisits = ArchiveSelector::getArchiveIdAndVisits($this->params, $minDatetimeArchiveProcessedUTC);

if (!$idAndVisits) {
return $noArchiveFound;
Expand All @@ -177,6 +186,26 @@ protected function loadExistingArchiveIdFromDb()
return $idAndVisits;
}

/**
 * Determines the oldest processing time an existing archive may have and still be reused.
 *
 * If determineIfArchivePermanent() yields a value for the requested end date, that value is
 * returned as-is; otherwise the threshold is delegated to
 * Rules::getMinTimeProcessedForInProgressArchive().
 *
 * @return int|bool Datetime timestamp, or false if must look at any archive available
 */
protected function getMinTimeArchiveProcessed()
{
    $params = $this->params;

    $permanentArchiveTimestamp = self::determineIfArchivePermanent($params->getDateEnd());
    if (!$permanentArchiveTimestamp) {
        // in-progress archive
        return Rules::getMinTimeProcessedForInProgressArchive(
            $params->getDateStart(),
            $params->getPeriod(),
            $params->getSegment(),
            $params->getSite()
        );
    }

    // past archive
    return $permanentArchiveTimestamp;
}

protected static function determineIfArchivePermanent(Date $dateEnd)
{
$now = time();
Expand Down
2 changes: 1 addition & 1 deletion core/ArchiveProcessor/Rules.php
Expand Up @@ -112,7 +112,7 @@ public static function getDoneFlags(array $plugins, Segment $segment)
return $doneFlags;
}

public static function getMinTimeProcessedForTemporaryArchive(
public static function getMinTimeProcessedForInProgressArchive(
Date $dateStart, \Piwik\Period $period, Segment $segment, Site $site)
{
$todayArchiveTimeToLive = self::getPeriodArchiveTimeToLiveDefault($period->getLabel());
Expand Down
9 changes: 8 additions & 1 deletion core/Concurrency/Lock.php
Expand Up @@ -23,12 +23,19 @@ class Lock

private $lockKey = null;
private $lockValue = null;
private $defaultTtl = null;

public function __construct(LockBackend $backend, $lockKeyStart)
public function __construct(LockBackend $backend, $lockKeyStart, $defaultTtl = null)
{
$this->backend = $backend;
$this->lockKeyStart = $lockKeyStart;
$this->lockKey = $this->lockKeyStart;
$this->defaultTtl = $defaultTtl;
}

public function reexpireLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this maybe be named `reExpireLock`? Not too important though...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it looks weird to me too. But so did reExpireLock and I couldn't choose :)

{
$this->expireLock($this->defaultTtl);
}

public function getNumberOfAcquiredLocks()
Expand Down
12 changes: 6 additions & 6 deletions core/Concurrency/LockBackend.php
Expand Up @@ -12,15 +12,15 @@
interface LockBackend
{
/**
* TODO
* Returns lock keys matching a pattern.
*
* @param $pattern
* @return mixed
* @return string[]
*/
public function getKeysMatchingPattern($pattern);

/**
* TODO
* Set a key value if the key is not already set.
*
* @param $lockKey
* @param $lockValue
Expand All @@ -30,15 +30,15 @@ public function getKeysMatchingPattern($pattern);
public function setIfNotExists($lockKey, $lockValue, $ttlInSeconds);

/**
* TODO
* Get the lock value for a key if any.
*
* @param $lockKey
* @return mixed
*/
public function get($lockKey);

/**
* TODO
* Delete the lock with key = $lockKey if the lock has the given value.
*
* @param $lockKey
* @param $lockValue
Expand All @@ -47,7 +47,7 @@ public function get($lockKey);
public function deleteIfKeyHasValue($lockKey, $lockValue);

/**
* TODO
* Update expiration for a lock if the lock with the specified key has the given value.
*
* @param $lockKey
* @param $lockValue
Expand Down
7 changes: 6 additions & 1 deletion core/DataAccess/ArchiveSelector.php
Expand Up @@ -61,14 +61,19 @@ public static function getArchiveIdAndVisits(ArchiveProcessor\Parameters $params

$numericTable = ArchiveTableCreator::getNumericTable($dateStart);

$minDatetimeIsoArchiveProcessedUTC = null;
if ($minDatetimeArchiveProcessedUTC) {
$minDatetimeIsoArchiveProcessedUTC = Date::factory($minDatetimeArchiveProcessedUTC)->getDatetime();
}

$requestedPlugin = $params->getRequestedPlugin();
$segment = $params->getSegment();
$plugins = array("VisitsSummary", $requestedPlugin);

$doneFlags = Rules::getDoneFlags($plugins, $segment);
$doneFlagValues = Rules::getSelectableDoneFlagValues();

$results = self::getModel()->getArchiveIdAndVisits($numericTable, $idSite, $period, $dateStartIso, $dateEndIso, $doneFlags, $doneFlagValues);
$results = self::getModel()->getArchiveIdAndVisits($numericTable, $idSite, $period, $dateStartIso, $dateEndIso, $minDatetimeIsoArchiveProcessedUTC, $doneFlags, $doneFlagValues);

if (empty($results)) {
return false;
Expand Down