Skip to content

Commit

Permalink
fixes for issue #98 - make microtime usage super consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
mmucklo committed Jul 12, 2019
1 parent e3298a7 commit 32f3de9
Show file tree
Hide file tree
Showing 18 changed files with 102 additions and 53 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,5 @@
### 5.1.0
* Issue #98 fix microtime issues in locales that do not use '.' for the decimal point.
### 5.0.1
* Pulling in a Query Builder fix from a fork https://github.com/quitoque/QueueBundle/commit/00f788e84765df2dcb81a19907463dd8eace566d
### 5.0.0
Expand Down
2 changes: 1 addition & 1 deletion DependencyInjection/Compiler/WorkerCompilerPass.php
Expand Up @@ -168,7 +168,7 @@ protected function addLiveJobs(ContainerBuilder $container)
/**
* @param $managerType
*
* @return null|string
* @return string|null
*/
protected function getDirectory($managerType)
{
Expand Down
2 changes: 1 addition & 1 deletion DependencyInjection/Configuration.php
Expand Up @@ -233,7 +233,7 @@ protected function addPriority()
// BC layer for symfony/config 4.1 and older
$rootNode = $treeBuilder->root('priority');
}
$rootNode
$rootNode
->addDefaultsIfNotSet()
->children()
->integerNode('max')
Expand Down
2 changes: 1 addition & 1 deletion Doctrine/DoctrineRunManager.php
Expand Up @@ -39,7 +39,7 @@ public function getRepository()
}

/**
* @return null|string
* @return string|null
*/
public function getRunArchiveClass()
{
Expand Down
15 changes: 5 additions & 10 deletions Manager/RunManager.php
Expand Up @@ -68,9 +68,10 @@ public function recordHeartbeat(Run $run, $start, Job $job = null)
$jobId = $job->getId();
}

$run->setLastHeartbeatAt(Util::getMicrotimeDateTime());
$heartbeat = microtime(true);
$run->setLastHeartbeatAt(Util::getMicrotimeFloatDateTime($heartbeat));
$run->setCurrentJobId($jobId);
$run->setElapsed(microtime(true) - $start);
$run->setElapsed($heartbeat - $start);
$this->persistRun($run);
}

Expand Down Expand Up @@ -107,10 +108,7 @@ public function runStart($start, $maxCount = null, $duration = null, $processTim
$runClass = $this->getRunClass();
/** @var Run $run */
$run = new $runClass();
$startDate = \DateTime::createFromFormat('U.u', $formattedStart = number_format($start, 6, '.', ''), new \DateTimeZone(date_default_timezone_get()));
if (false === $startDate) {
throw new \RuntimeException("Could not create date from $start formatted as $formattedStart");
}
$startDate = Util::getMicrotimeFloatDateTime($start);
$run->setLastHeartbeatAt($startDate);
$run->setStartedAt($startDate);
if (null !== $maxCount) {
Expand All @@ -135,10 +133,7 @@ public function runStart($start, $maxCount = null, $duration = null, $processTim
public function runStop(Run $run, $start)
{
$end = microtime(true);
$endedTime = \DateTime::createFromFormat('U.u', $end, new \DateTimeZone(date_default_timezone_get()));
if ($endedTime) {
$run->setEndedAt($endedTime);
}
$run->setEndedAt(Util::getMicrotimeFloatDateTime($end));
$run->setElapsed($end - $start);
$this->persistRun($run, 'remove');
}
Expand Down
4 changes: 2 additions & 2 deletions Manager/WorkerManager.php
Expand Up @@ -91,7 +91,7 @@ public function log($level, $msg, array $context = [])
* @param null $methodName
* @param bool $prioritize
*
* @return null|Job
* @return Job|null
*/
public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
Expand Down Expand Up @@ -146,7 +146,7 @@ public function runJob(Job $job)
/** @var Worker $worker */
$worker = $this->getWorker($job->getWorkerName());
$this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
$job->setStartedAt(Util::getMicrotimeDateTime());
$job->setStartedAt(Util::getMicrotimeFloatDateTime($start));
$job->setMessage(null);
$worker->setCurrentJob($job);
$result = call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
Expand Down
2 changes: 1 addition & 1 deletion Model/MicrotimeTrait.php
Expand Up @@ -10,7 +10,7 @@ public function setWhenAt(\DateTime $whenAt)
{
parent::setWhenAt($whenAt);

return $this->setWhenUs(Util::getMicrotimeDecimalFormat($whenAt));
return $this->setWhenUs(Util::getMicrotimeIntegerFormat($whenAt));
}

/**
Expand Down
8 changes: 5 additions & 3 deletions Model/Worker.php
Expand Up @@ -48,13 +48,15 @@ public function getJobManager()
public function at($time = null, $batch = false, $priority = null)
{
$timeU = $time;
$localeInfo = localeconv();
$decimalPoint = isset($localeInfo['decimal_point']) ? $localeInfo['decimal_point'] : '.';
if (null === $time) {
$timeU = Util::getMicrotimeStr();
} elseif (false === strpos(strval($time), '.')) {
$timeU = strval($time).'.000000';
} elseif (false === strpos(strval($time), $decimalPoint)) {
$timeU = strval($time).$decimalPoint.'000000';
}

$dateTime = \DateTime::createFromFormat('U.u', (string) $timeU, new \DateTimeZone(date_default_timezone_get()));
$dateTime = \DateTime::createFromFormat('U'.$decimalPoint.'u', (string) $timeU, new \DateTimeZone(date_default_timezone_get()));
if (!$dateTime) {
throw new \InvalidArgumentException("Invalid time: $time".($timeU != $time ? " - (micro: $timeU)" : ''));
}
Expand Down
10 changes: 5 additions & 5 deletions ORM/JobManager.php
Expand Up @@ -115,8 +115,8 @@ public function pruneArchivedJobs(\DateTime $olderThan)
{
return $this->removeOlderThan(
$this->getJobArchiveClass(),
'updatedAt',
$olderThan
'updatedAt',
$olderThan
);
}

Expand Down Expand Up @@ -255,7 +255,7 @@ public function getJobQueryBuilder($workerName = null, $methodName = null, $prio
protected function addStandardPredicates(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
{
$dateTime = Util::getMicrotimeDateTime();
$decimal = Util::getMicrotimeDecimalFormat($dateTime);
$microtimeInteger = Util::getMicrotimeIntegerFormat($dateTime);

$queryBuilder
->andWhere('j.status = :status')->setParameter('status', $status)
Expand All @@ -267,7 +267,7 @@ protected function addStandardPredicates(QueryBuilder $queryBuilder, $status = B
$queryBuilder->expr()->isNull('j.expiresAt'),
$queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
))
->setParameter('whenUs', $decimal)
->setParameter('whenUs', $microtimeInteger)
->setParameter('expiresAt', $dateTime);
}

Expand Down Expand Up @@ -321,7 +321,7 @@ protected function findRefresh($id)
*
* @param \Dtc\QueueBundle\Model\Job $job
*
* @return null|Job
* @return Job|null
*/
public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
{
Expand Down
10 changes: 5 additions & 5 deletions Redis/JobManager.php
Expand Up @@ -26,7 +26,7 @@ protected function transferQueues()
// Drains from WhenAt queue into Prioirty Queue
$whenQueue = $this->getWhenQueueCacheKey();
$priorityQueue = $this->getPriorityQueueCacheKey();
$microtime = Util::getMicrotimeDecimal();
$microtime = Util::getMicrotimeInteger();
while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
$jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
if (is_string($jobMessage)) {
Expand Down Expand Up @@ -85,7 +85,7 @@ protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
$foundWhen = $foundJob->getWhenUs();

// Fix this using bcmath
$curtimeU = Util::getMicrotimeDecimal();
$curtimeU = Util::getMicrotimeInteger();
$newFoundWhen = null;
if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) {
$newFoundWhen = $when;
Expand Down Expand Up @@ -183,7 +183,7 @@ public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
// Add to whenAt or priority queue? /// optimizaiton...
$whenUs = $job->getWhenUs();
if (!$whenUs) {
$whenUs = Util::getMicrotimeDecimal();
$whenUs = Util::getMicrotimeInteger();
$job->setWhenUs($whenUs);
}

Expand Down Expand Up @@ -287,7 +287,7 @@ public function getJob($workerName = null, $methodName = null, $prioritize = tru
$jobId = $this->redis->zPop($queue);
} else {
$queue = $this->getWhenQueueCacheKey();
$microtime = Util::getMicrotimeDecimal();
$microtime = Util::getMicrotimeInteger();
$jobId = $this->redis->zPopByMaxScore($queue, $microtime);
}

Expand Down Expand Up @@ -315,7 +315,7 @@ protected function retrieveJob($jobId)

public function getWaitingJobCount($workerName = null, $methodName = null)
{
$microtime = Util::getMicrotimeDecimal();
$microtime = Util::getMicrotimeInteger();
$count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime);

if (null !== $this->maxPriority) {
Expand Down
21 changes: 21 additions & 0 deletions Resources/docker/php/ubuntu/Dockerfile
@@ -0,0 +1,21 @@
# For building PHP from source for debugging purposes (specifically used in troubleshooting Issue #98)
FROM ubuntu:19.10
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update
RUN apt install -y apt-utils
RUN apt install -y perl-modules
RUN apt upgrade -y
RUN apt install -y libfreetype6-dev
RUN apt install -y git
RUN apt install -y vim
RUN apt install -y libzip-dev libldap2-dev libxml2-dev libpng-dev libicu-dev libbz2-dev libtidy-dev
RUN apt install -y libmemcached-dev
RUN apt install -y libssl-dev
RUN apt install -y build-essential
RUN apt install -y autoconf automake re2c libtool bison
RUN apt install -y curl wget
RUN apt install -y netcat
RUN git clone https://git.php.net/repository/php-src.git
RUN apt install -y sendmail gawk
RUN apt install -y libcurl4-openssl-dev zlibc libgd-dev libfreetype6 libjpeg9 libgdbm-dev libsodium-dev mysql-common mysql-client postgresql-11 libreadline5 libreadline-dev
RUN apt install -y libsqlite3-dev
10 changes: 5 additions & 5 deletions Run/Loop.php
Expand Up @@ -114,8 +114,8 @@ public function runJobById($start, $jobId)
/**
* @param float $start
* @param int $nanoSleep
* @param null|int $maxCount
* @param null|int $duration
* @param int|null $maxCount
* @param int|null $duration
*/
public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
{
Expand Down Expand Up @@ -146,8 +146,8 @@ public function runLoop($start, $workerName, $methodName, $maxCount, $duration =

/**
* @param int $nanoSleep
* @param null|int $maxCount
* @param null|int $duration
* @param int|null $maxCount
* @param int|null $duration
*
* @throws \InvalidArgumentException
*/
Expand Down Expand Up @@ -192,7 +192,7 @@ protected function validateNanoSleep($nanoSleep)
/**
* @param int|null $duration
*
* @return null|\DateTime
* @return \DateTime|null
*/
protected function getEndTime(Run $run, $duration)
{
Expand Down
6 changes: 3 additions & 3 deletions Tests/Doctrine/DoctrineJobManagerTest.php
Expand Up @@ -873,7 +873,7 @@ public function testBatchJobs()

$time1 = new \DateTime('@'.time());
$job2 = $worker->batchLater(0)->fibonacci(1);
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());

self::assertEquals($job1, $job2);
self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
Expand All @@ -890,9 +890,9 @@ public function testBatchJobs()

$job1 = $worker->later(100)->setPriority(3)->fibonacci(1);
$priority1 = $job1->getPriority();
$time1 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time1 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());
$job2 = $worker->batchLater(0)->setPriority(1)->fibonacci(1);
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());
self::assertEquals($job1, $job2);
self::assertNotEquals($priority1, $job2->getPriority());

Expand Down
8 changes: 4 additions & 4 deletions Tests/Manager/WorkerManagerTest.php
Expand Up @@ -63,7 +63,7 @@ public function testRun()
self::assertEquals(
BaseJob::STATUS_SUCCESS,
$job->getStatus(),
'Worker run should be successful'
'Worker run should be successful'
);
}

Expand All @@ -80,7 +80,7 @@ public function testErrorRun()
self::assertEquals(
BaseJob::STATUS_EXCEPTION,
$job->getStatus(),
'Worker run should be not successful'
'Worker run should be not successful'
);
self::assertNotEmpty($job->getMessage(), 'Error message should not be empty');
}
Expand All @@ -96,13 +96,13 @@ public function testRunJob()
self::assertEquals(
BaseJob::STATUS_SUCCESS,
$job->getStatus(),
'Worker run should be successful'
'Worker run should be successful'
);

self::assertEquals(
'20: 6765',
file_get_contents($this->worker->getFilename()),
'Result of fibonacciFile() must match'
'Result of fibonacciFile() must match'
);
}
}
8 changes: 4 additions & 4 deletions Tests/Model/WorkerTest.php
Expand Up @@ -140,15 +140,15 @@ protected function assertJob(Job $job, $time, $method, $priority = null)
self::assertEquals(
$time,
$job->getWhenAt()->getTimestamp(),
'Job start time should equals'
'Job start time should equals'
);
}

if (null !== $priority) {
self::assertEquals(
$priority,
$job->getPriority(),
'Priority should be the same.'
'Priority should be the same.'
);
} else {
self::assertNull($job->getPriority(), 'Priority should be null');
Expand All @@ -157,12 +157,12 @@ protected function assertJob(Job $job, $time, $method, $priority = null)
self::assertEquals(
$this->worker->getName(),
$job->getWorkerName(),
'Worker should be the same'
'Worker should be the same'
);
self::assertEquals(
$method,
$job->getMethod(),
'Worker method should be the same'
'Worker method should be the same'
);

// Make sure param gets saved
Expand Down
6 changes: 3 additions & 3 deletions Tests/Redis/JobManagerTest.php
Expand Up @@ -164,7 +164,7 @@ public function testBatchJobs()
$job1 = $worker->later(100)->fibonacci(1);
$time1 = new \DateTime('@'.time());
$job2 = $worker->batchLater(0)->fibonacci(1);
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());

self::assertEquals($job1->getId(), $job2->getId());
self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
Expand All @@ -184,9 +184,9 @@ public function testBatchJobs()
if (null !== self::$jobManager->getMaxPriority()) {
$job1 = $worker->later(100)->setPriority(3)->fibonacci(1);
$priority1 = $job1->getPriority();
$time1 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time1 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());
$job2 = $worker->batchLater(0)->setPriority(1)->fibonacci(1);
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
$time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeInteger());
self::assertEquals($job1->getId(), $job2->getId());
self::assertNotEquals($priority1, $job2->getPriority());

Expand Down
1 change: 0 additions & 1 deletion Tests/Run/LoopTest.php
Expand Up @@ -2,7 +2,6 @@

namespace Dtc\QueueBundle\Tests\Run;

use Dtc\QueueBundle\Beanstalkd\Job;
use Dtc\QueueBundle\Manager\WorkerManager;
use Dtc\QueueBundle\ODM\JobManager;
use Dtc\QueueBundle\Run\Loop;
Expand Down

0 comments on commit 32f3de9

Please sign in to comment.