Lock when archiving and avoid invalidating sites that have archiving in progress #15272

Merged
merged 9 commits into from Dec 24, 2019
2 changes: 2 additions & 0 deletions config/global.php
@@ -216,4 +216,6 @@
'archiving.performance.logger' => null,

\Piwik\CronArchive\Performance\Logger::class => DI\object()->constructorParameter('logger', DI\get('archiving.performance.logger')),

\Piwik\Concurrency\LockBackend::class => \DI\get(\Piwik\Concurrency\LockBackend\MySqlLockBackend::class),
);
17 changes: 16 additions & 1 deletion core/Archive/ArchiveInvalidator.php
@@ -10,6 +10,7 @@
namespace Piwik\Archive;

use Piwik\Archive\ArchiveInvalidator\InvalidationResult;
use Piwik\ArchiveProcessor\ArchivingStatus;
use Piwik\CronArchive\SitesToReprocessDistributedList;
use Piwik\DataAccess\ArchiveTableCreator;
use Piwik\DataAccess\Model;
@@ -54,9 +55,15 @@ class ArchiveInvalidator
*/
private $model;

public function __construct(Model $model)
/**
* @var ArchivingStatus
*/
private $archivingStatus;

public function __construct(Model $model, ArchivingStatus $archivingStatus)
{
$this->model = $model;
$this->archivingStatus = $archivingStatus;
}

public function rememberToInvalidateArchivedReportsLater($idSite, Date $date)
@@ -207,6 +214,8 @@ public function markArchivesAsInvalidated(array $idSites, array $dates, $period,
}

$periodDates = $this->getUniqueDates($periodDates);

$idSites = $this->removeSitesWithInProgressArchiving($idSites);
Member

@diosmosis I'm not sure I understand why we filter the idSites here instead of just invalidating all of them. Is it because we assume they were already invalidated or will be invalidated? Would there be any harm in still invalidating them, or is the issue that this would cause the remember flag to be removed?

Member

I wonder whether we could still invalidate archives for the other idSites as well, and just not remove the remember flag?

Member Author

We remove them because we know they are currently being archived. We also keep the idSites in the remember flags so they will eventually be invalidated once they are no longer being archived.
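
A minimal sketch of the filtering described above, assuming hypothetical site IDs and a hypothetical helper name (`removeSitesBeingArchived`); it is an illustration, not the PR's actual method:

```php
<?php
// Hypothetical helper mirroring the filtering step; the name and the
// sample IDs below are assumptions for illustration only.
function removeSitesBeingArchived(array $idSites, array $idSitesArchiving): array
{
    // array_diff() keeps the original keys, so re-index with array_values()
    // to get a plain list back.
    return array_values(array_diff($idSites, $idSitesArchiving));
}

// Sites 2 and 4 stand in for what getSitesCurrentlyArchiving() might return.
$toInvalidate = removeSitesBeingArchived([1, 2, 3, 4], [2, 4]);

// The skipped sites would stay in the remember flags and be picked up later.
$deferred = array_values(array_intersect([1, 2, 3, 4], [2, 4]));
```

Note that the method in this PR returns the result of `array_diff()` directly, so its result keeps the original array keys; the re-indexing here is only a convenience of the sketch.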

Member

Just wondering whether, in the meantime, there could be other previously finished archives for those idSites that need to be invalidated?

Member Author

There could be, but because of how the function parameters are structured (idSites + list of dates + period type), it's a bit harder to control exactly what to invalidate. We'd have to loop over idSites & dates, check each individually, and invalidate each archive individually instead of with one query (which is what we do now, I think).

Member

@diosmosis it is very important we respect these configured values. We basically need to check if the last archive was done more than the configured time ago even if the archive is invalid (unless the archive is invalid and it is a new day I think then we don't respect these values but not sure? @mattab )

and keep the locks but don't do anything different in ArchiveInvalidator?
I don't quite fully understand this part, but it would be good in general to maybe still invalidate an archive in ArchiveInvalidator? Not sure of consequences of not using Locks?

Member Author

Not sure of consequences of not using Locks?

It's just an easy way to check if an archive is on-going no matter what machine/process it's on. It's not really necessary, but it might be useful in the future.
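
The lock keys in this PR are built as a prefix, the site ID, and an md5 hash joined with dots, which is what lets the site ID be read back from any acquired lock key regardless of which machine or process took the lock. A minimal sketch of that layout, where the helper names (`buildArchivingLockKey`, `siteIdFromLockKey`) and the sample input string are assumptions for illustration:

```php
<?php
// Hypothetical helpers; they only mirror the key layout used in the PR
// (prefix . idSite . md5 of the period and done-flag details).
function buildArchivingLockKey(int $idSite, string $periodDetails): string
{
    // md5() always yields 32 hex characters, which keeps the key short.
    return implode('.', ['Archiving', $idSite, md5($periodDetails)]);
}

function siteIdFromLockKey(string $lockKey): ?int
{
    $parts = explode('.', $lockKey);

    // A key without a second segment carries no site ID.
    return isset($parts[1]) ? (int) $parts[1] : null;
}

// The input string is a made-up stand-in for period ID, range and done flag.
$key = buildArchivingLockKey(5, '1' . '2019-12-24,2019-12-24' . 'done');
$idSite = siteIdFromLockKey($key);
```

Hashing the variable-length part keeps the whole key well under the 70 character limit mentioned in the code comment, while leaving the site ID in plain text for lookups.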

Member

I reckon it would be good to keep it 👍

Member

It's just an easy way to check if an archive is on-going no matter what machine/process

@diosmosis it would be of great value if this helped with issue #8444; maybe it would even be fixed?

unless the archive is invalid and it is a new day I think then we don't respect these values but not sure? @mattab

correct, these INI settings are only used when the date range includes Today.

Member

just wanting to make sure we're not going to work on #8444 as part of this issue :)

$this->markArchivesInvalidated($idSites, $periodDates, $segment);

$yearMonths = array_keys($periodDates);
@@ -427,4 +436,10 @@ private function markInvalidatedArchivesForReprocessAndPurge(array $idSites, $ye
$archivesToPurge = new ArchivesToPurgeDistributedList();
$archivesToPurge->add($yearMonths);
}

private function removeSitesWithInProgressArchiving($idSites)
{
$idSitesArchiving = $this->archivingStatus->getSitesCurrentlyArchiving();
return array_diff($idSites, $idSitesArchiving);
}
}
107 changes: 107 additions & 0 deletions core/ArchiveProcessor/ArchivingStatus.php
@@ -0,0 +1,107 @@
<?php
/**
* Piwik - free/libre analytics platform
*
* @link https://matomo.org
* @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
*
*/

namespace Piwik\ArchiveProcessor;

use Piwik\Concurrency\Lock;
use Piwik\Concurrency\LockBackend;
use Piwik\Container\StaticContainer;
use Piwik\SettingsPiwik;

class ArchivingStatus
{
const LOCK_KEY_PREFIX = 'Archiving';
const DEFAULT_ARCHIVING_TTL = 7200; // 2 hours

/**
* @var LockBackend
*/
private $lockBackend;

/**
* @var int
*/
private $archivingTTLSecs;

/**
* @var Lock[]
*/
private $lockStack = [];

public function __construct(LockBackend $lockBackend, $archivingTTLSecs = self::DEFAULT_ARCHIVING_TTL)
{
$this->lockBackend = $lockBackend;
$this->archivingTTLSecs = $archivingTTLSecs;
}

public function archiveStarted(Parameters $params)
{
$lock = $this->makeArchivingLock($params);
$lock->acquireLock($this->getInstanceProcessId(), $this->archivingTTLSecs);
array_push($this->lockStack, $lock);
}

public function archiveFinished()
{
$lock = array_pop($this->lockStack);
$lock->unlock();
}

public function getCurrentArchivingLock()
{
if (empty($this->lockStack)) {
return null;
}
return end($this->lockStack);
}

public function getSitesCurrentlyArchiving()
{
$lockMeta = new Lock($this->lockBackend, self::LOCK_KEY_PREFIX . '.');
$acquiredLocks = $lockMeta->getAllAcquiredLockKeys();

$sitesCurrentlyArchiving = [];
foreach ($acquiredLocks as $lockKey) {
$parts = explode('.', $lockKey);
if (!isset($parts[1])) {
continue;
}
$sitesCurrentlyArchiving[] = (int) $parts[1];
}
$sitesCurrentlyArchiving = array_unique($sitesCurrentlyArchiving);
$sitesCurrentlyArchiving = array_values($sitesCurrentlyArchiving);

return $sitesCurrentlyArchiving;
}

/**
* @return Lock
*/
private function makeArchivingLock(Parameters $params)
{
$doneFlag = Rules::getDoneStringFlagFor([$params->getSite()->getId()], $params->getSegment(),
$params->getPeriod()->getLabel(), $params->getRequestedPlugin());

$lockKeyParts = [
self::LOCK_KEY_PREFIX,
$params->getSite()->getId(),

// md5 to keep it within the 70 char limit in the table
md5($params->getPeriod()->getId() . $params->getPeriod()->getRangeString() . $doneFlag),
];

$lockKeyPrefix = implode('.', $lockKeyParts);
return new Lock(StaticContainer::get(LockBackend::class), $lockKeyPrefix, $this->archivingTTLSecs);
}

private function getInstanceProcessId()
{
return SettingsPiwik::getPiwikInstanceId() . '.' . getmypid();
}
}
21 changes: 17 additions & 4 deletions core/ArchiveProcessor/Loader.php
@@ -12,12 +12,16 @@
use Piwik\Cache;
use Piwik\CacheId;
use Piwik\Common;
use Piwik\Concurrency\Lock;
use Piwik\Concurrency\LockBackend;
use Piwik\Config;
use Piwik\Container\StaticContainer;
use Piwik\Context;
use Piwik\DataAccess\ArchiveSelector;
use Piwik\Date;
use Piwik\Period;
use Piwik\Piwik;
use Piwik\SettingsPiwik;

/**
* This class uses PluginsArchiver class to trigger data aggregation and create archives.
@@ -73,12 +77,21 @@ private function prepareArchiveImpl($pluginName)
return $idArchive;
}

list($visits, $visitsConverted) = $this->prepareCoreMetricsArchive($visits, $visitsConverted);
list($idArchive, $visits) = $this->prepareAllPluginsArchive($visits, $visitsConverted);
/** @var ArchivingStatus $archivingStatus */
$archivingStatus = StaticContainer::get(ArchivingStatus::class);
$archivingStatus->archiveStarted($this->params);

if ($this->isThereSomeVisits($visits) || PluginsArchiver::doesAnyPluginArchiveWithoutVisits()) {
return $idArchive;
try {
list($visits, $visitsConverted) = $this->prepareCoreMetricsArchive($visits, $visitsConverted);
list($idArchive, $visits) = $this->prepareAllPluginsArchive($visits, $visitsConverted);

if ($this->isThereSomeVisits($visits) || PluginsArchiver::doesAnyPluginArchiveWithoutVisits()) {
return $idArchive;
}
} finally {
$archivingStatus->archiveFinished();
}

return false;
}

9 changes: 8 additions & 1 deletion core/Concurrency/Lock.php
@@ -21,12 +21,19 @@ class Lock

private $lockKey = null;
private $lockValue = null;
private $defaultTtl = null;

public function __construct(LockBackend $backend, $lockKeyStart)
public function __construct(LockBackend $backend, $lockKeyStart, $defaultTtl = null)
{
$this->backend = $backend;
$this->lockKeyStart = $lockKeyStart;
$this->lockKey = $this->lockKeyStart;
$this->defaultTtl = $defaultTtl;
}

public function reexpireLock()
Member

Should it maybe be reExpireLock? Not too important though...

Member Author

Yeah, it looks weird to me too. But so did reExpireLock and I couldn't choose :)

{
$this->expireLock($this->defaultTtl);
}

public function getNumberOfAcquiredLocks()
12 changes: 6 additions & 6 deletions core/Concurrency/LockBackend.php
@@ -12,15 +12,15 @@
interface LockBackend
{
/**
* TODO
* Returns lock keys matching a pattern.
*
* @param $pattern
* @return mixed
* @return string[]
*/
public function getKeysMatchingPattern($pattern);

/**
* TODO
* Set a key value if the key is not already set.
*
* @param $lockKey
* @param $lockValue
@@ -30,15 +30,15 @@ public function getKeysMatchingPattern($pattern);
public function setIfNotExists($lockKey, $lockValue, $ttlInSeconds);

/**
* TODO
* Get the lock value for a key if any.
*
* @param $lockKey
* @return mixed
*/
public function get($lockKey);

/**
* TODO
* Delete the lock with key = $lockKey if the lock has the given value.
*
* @param $lockKey
* @param $lockValue
@@ -47,7 +47,7 @@ public function get($lockKey);
public function deleteIfKeyHasValue($lockKey, $lockValue);

/**
* TODO
* Update expiration for a lock if the lock with the specified key has the given value.
*
* @param $lockKey
* @param $lockValue
107 changes: 107 additions & 0 deletions core/DataAccess/ArchivingDbAdapter.php
@@ -0,0 +1,107 @@
<?php
/**
* Piwik - free/libre analytics platform
*
* @link https://matomo.org
* @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
*
*/

namespace Piwik\DataAccess;

use Piwik\Concurrency\Lock;
use Piwik\Db\AdapterInterface;
use Psr\Log\LoggerInterface;

class ArchivingDbAdapter
{
/**
* @var AdapterInterface|\Zend_Db_Adapter_Abstract
*/
private $wrapped;

/**
* @var Lock
*/
private $archivingLock;

/**
* @var LoggerInterface
*/
private $logger;

public function __construct($wrapped, Lock $archivingLock = null, LoggerInterface $logger = null)
{
$this->wrapped = $wrapped;
$this->archivingLock = $archivingLock;
$this->logger = $logger;
}

public function __call($name, $arguments)
{
return call_user_func_array([$this->wrapped, $name], $arguments);
}

public function exec($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

public function query($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

public function fetchAll($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

public function fetchRow($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

public function fetchOne($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

public function fetchAssoc($sql)
{
$this->reexpireLock();
$this->logSql($sql);

return call_user_func_array([$this->wrapped, __FUNCTION__], func_get_args());
}

private function logSql($sql)
{
// Log on DEBUG level all SQL archiving queries
if ($this->logger) {
$this->logger->debug($sql);
}
}

private function reexpireLock()
{
if ($this->archivingLock) {
$this->archivingLock->reexpireLock();
}
}
}
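
The adapter above calls `reexpireLock()` before every query, so a lock with a fixed TTL stays alive for as long as the archiving process keeps issuing queries and lapses on its own if the process dies. A minimal sketch of that idea, using a hypothetical in-memory `SketchLock` class with an injected clock value (it is not Matomo's `Lock` class and does not talk to any backend):

```php
<?php
// Hypothetical in-memory stand-in for a TTL lock; every name here is an
// assumption for illustration. Time is passed in explicitly so the
// behaviour is deterministic.
final class SketchLock
{
    /** @var int|null Timestamp at which the lock lapses, null if never acquired. */
    private $expiresAt = null;

    /** @var int */
    private $ttlSeconds;

    public function __construct(int $ttlSeconds)
    {
        $this->ttlSeconds = $ttlSeconds;
    }

    public function acquire(int $now): bool
    {
        if ($this->isHeld($now)) {
            return false; // someone still holds an unexpired lock
        }
        $this->expiresAt = $now + $this->ttlSeconds;
        return true;
    }

    // Push the expiry forward by a full TTL, but only while the lock is held.
    public function reexpire(int $now): bool
    {
        if (!$this->isHeld($now)) {
            return false;
        }
        $this->expiresAt = $now + $this->ttlSeconds;
        return true;
    }

    public function isHeld(int $now): bool
    {
        return $this->expiresAt !== null && $now < $this->expiresAt;
    }
}

$lock = new SketchLock(10);
$acquired = $lock->acquire(0);        // lock would lapse at t=10
$refreshed = $lock->reexpire(9);      // a query at t=9 moves the expiry to t=19
$heldAfterRefresh = $lock->isHeld(15);
$heldAfterIdle = $lock->isHeld(19);
```

With a design like this, the TTL only needs to exceed the longest gap between two queries rather than the duration of the whole archiving run.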