Permalink
Browse files

Fix SqlMessageExchanger : make $channel property static as it is inst…

…anciated many times in the many registries. Filter message array to reduce duplication.
  • Loading branch information...
1 parent d333730 commit b8449cbd772e08339c435a6b252f733dcf9746be @cdujeu cdujeu committed Sep 30, 2016
Showing with 66 additions and 34 deletions.
  1. +66 −34 core/src/plugins/mq.sql/SqlMessageExchanger.php
@@ -46,9 +46,9 @@ class SqlMessageExchanger extends Plugin implements IMessageExchanger
*/
public function init(ContextInterface $ctx, $options = [])
{
- parent::init($ctx, $options);
- $this->sqlDriver = $this->sqlDriver = OptionsHelper::cleanDibiDriverParameters($options["SQL_DRIVER"]);
- }
+ parent::init($ctx, $options);
+ $this->sqlDriver = OptionsHelper::cleanDibiDriverParameters($options["SQL_DRIVER"]);
+ }
public function performChecks()
{
@@ -63,7 +63,7 @@ public function performChecks()
/**
* @var array
*/
- private $channels;
+ private static $channels;
private $clientsGCTime = 10;
private $sqlDriver;
@@ -74,30 +74,30 @@ public function performChecks()
*/
public function loadChannel($channelName, $create = false)
{
- if (isSet($this->channels) && is_array($this->channels[$channelName])) {
+ if (isSet(self::$channels) && is_array(self::$channels[$channelName])) {
return;
}
if (is_file($this->getPluginWorkDir()."/queues/channel-$channelName")) {
- if(!isset($this->channels)) $this->channels = array();
+ if(!isset(self::$channels)) self::$channels = array();
$data = FileHelper::loadSerialFile($this->getPluginWorkDir() . "/queues/channel-$channelName");
if (is_array($data)) {
if(!is_array($data["MESSAGES"])) $data["MESSAGES"] = array();
if(!is_array($data["CLIENTS"])) $data["CLIENTS"] = array();
- $this->channels[$channelName] = $data;
+ self::$channels[$channelName] = $data;
return;
}
}
if ($create) {
- if(!isSet($this->channels)) $this->channels = array();
- $this->channels[$channelName] = array("CLIENTS" => array(),
+ if(!isSet(self::$channels)) self::$channels = array();
+ self::$channels[$channelName] = array("CLIENTS" => array(),
"MESSAGES" => array());
}
}
public function __destruct()
{
- if (isSet($this->channels) && is_array($this->channels)) {
- foreach ($this->channels as $channelName => $data) {
+ if (isSet(self::$channels) && is_array(self::$channels)) {
+ foreach (self::$channels as $channelName => $data) {
if (is_array($data)) {
FileHelper::saveSerialFile($this->getPluginWorkDir() . "/queues/channel-$channelName", $data);
}
@@ -128,13 +128,16 @@ public function suscribeToChannel(ContextInterface $ctx, $channelName, $clientId
$USER_ID = "shared";
}
if($GROUP_PATH == null) $GROUP_PATH = false;
- $this->channels[$channelName]["CLIENTS"][$clientId] = array(
+ self::$channels[$channelName]["CLIENTS"][$clientId] = array(
"ALIVE" => time(),
"USER_ID" => $USER_ID,
"GROUP_PATH" => $GROUP_PATH
);
+ if(strpos($channelName, "nodes:") === 0 && $channelName !== "nodes:*"){
+ $this->suscribeToChannel($ctx, "nodes:*", $clientId);
+ }
/*
- foreach ($this->channels[$channelName]["MESSAGES"] as &$object) {
+ foreach (self::$channels[$channelName]["MESSAGES"] as &$object) {
$object->messageRC[$clientId] = $clientId;
}
*/
@@ -149,13 +152,13 @@ public function suscribeToChannel(ContextInterface $ctx, $channelName, $clientId
public function unsuscribeFromChannel(ContextInterface $ctx, $channelName, $clientId)
{
$this->loadChannel($channelName);
- if(!isSet($this->channels) || !isSet($this->channels[$channelName])) return;
- if(!array_key_exists($clientId, $this->channels[$channelName]["CLIENTS"])) return;
- unset($this->channels[$channelName]["CLIENTS"][$clientId]);
- foreach ($this->channels[$channelName]["MESSAGES"] as $index => &$object) {
+ if(!isSet(self::$channels) || !isSet(self::$channels[$channelName])) return;
+ if(!array_key_exists($clientId, self::$channels[$channelName]["CLIENTS"])) return;
+ unset(self::$channels[$channelName]["CLIENTS"][$clientId]);
+ foreach (self::$channels[$channelName]["MESSAGES"] as $index => &$object) {
unset($object->messageRC[$clientId]);
if (count($object->messageRC)== 0) {
- unset($this->channels[$channelName]["MESSAGES"][$index]);
+ unset(self::$channels[$channelName]["MESSAGES"][$index]);
}
}
}
@@ -168,12 +171,12 @@ public function unsuscribeFromChannel(ContextInterface $ctx, $channelName, $clie
public function publishToChannel(ContextInterface $ctx, $channelName, $messageObject)
{
$this->loadChannel($channelName);
- if(!isSet($this->channels) || !isSet($this->channels[$channelName])) return;
- if(!count($this->channels[$channelName]["CLIENTS"])) return;
- $clientIds = array_keys($this->channels[$channelName]["CLIENTS"]);
+ if(!isSet(self::$channels) || !isSet(self::$channels[$channelName])) return;
+ if(!count(self::$channels[$channelName]["CLIENTS"])) return;
+ $clientIds = array_keys(self::$channels[$channelName]["CLIENTS"]);
$messageObject->messageRC = array_combine($clientIds, $clientIds);
$messageObject->messageTS = microtime();
- $this->channels[$channelName]["MESSAGES"][] = $messageObject;
+ self::$channels[$channelName]["MESSAGES"][] = $messageObject;
}
/**
@@ -188,13 +191,13 @@ public function publishToChannel(ContextInterface $ctx, $channelName, $messageOb
public function consumeInstantChannel(ContextInterface $ctx, $channelName, $clientId, $userId, $userGroup)
{
$this->loadChannel($channelName);
- if(!isSet($this->channels) || !isSet($this->channels[$channelName])) {
+ if(!isSet(self::$channels) || !isSet(self::$channels[$channelName])) {
return null;
}
// Check dead clients
- if (is_array($this->channels[$channelName]["CLIENTS"])) {
+ if (is_array(self::$channels[$channelName]["CLIENTS"])) {
$toRemove = array();
- foreach ($this->channels[$channelName]["CLIENTS"] as $cId => $cData) {
+ foreach (self::$channels[$channelName]["CLIENTS"] as $cId => $cData) {
$cAlive = $cData["ALIVE"];
if( $cId != $clientId && time() - $cAlive > $this->clientsGCTime * 60) $toRemove[] = $cId;
}
@@ -204,14 +207,14 @@ public function consumeInstantChannel(ContextInterface $ctx, $channelName, $clie
}
}
}
- if (!array_key_exists($clientId, $this->channels[$channelName]["CLIENTS"])) {
+ if (!array_key_exists($clientId, self::$channels[$channelName]["CLIENTS"])) {
// Auto Suscribe
$this->suscribeToChannel($ctx, $channelName, $clientId);
}
- $this->channels[$channelName]["CLIENTS"][$clientId]["ALIVE"] = time();
+ self::$channels[$channelName]["CLIENTS"][$clientId]["ALIVE"] = time();
$result = array();
- foreach ($this->channels[$channelName]["MESSAGES"] as $index => $object) {
+ foreach (self::$channels[$channelName]["MESSAGES"] as $index => $object) {
if (!isSet($object->messageRC[$clientId])) {
continue;
}
@@ -226,11 +229,14 @@ public function consumeInstantChannel(ContextInterface $ctx, $channelName, $clie
$result[] = $object;
unset($object->messageRC[$clientId]);
if (count($object->messageRC) <= 0) {
- unset($this->channels[$channelName]["MESSAGES"][$index]);
+ unset(self::$channels[$channelName]["MESSAGES"][$index]);
} else {
- $this->channels[$channelName]["MESSAGES"][$index] = $object;
+ self::$channels[$channelName]["MESSAGES"][$index] = $object;
}
}
+ if(strpos($channelName, "nodes:")===0 && $channelName !== "nodes:*"){
+ $result = $result + $this->consumeInstantChannel($ctx, "nodes:*", $clientId, $userId, $userGroup);
+ }
return $result;
}
@@ -297,12 +303,38 @@ public function publishWorkerMessage(ContextInterface $ctx, $channel, $message)
public function publishInstantMessage(ContextInterface $ctx, $channel, $message)
{
$this->loadChannel($channel);
- if(!isSet($this->channels) || !isSet($this->channels[$channel])) return;
- if(!count($this->channels[$channel]["CLIENTS"])) return;
- $clientIds = array_keys($this->channels[$channel]["CLIENTS"]);
+ if(!isSet(self::$channels) || !isSet(self::$channels[$channel])) return;
+ if(!count(self::$channels[$channel]["CLIENTS"])) return;
+ $clientIds = array_keys(self::$channels[$channel]["CLIENTS"]);
+
+ $compareStringNoRC = json_encode($message);
$message->messageRC = array_combine($clientIds, $clientIds);
+ // Remove similar messages
+ $compareString = json_encode($message);
+
+ $clean = array_values(array_filter(self::$channels[$channel]["MESSAGES"], function($stdClass) use ($compareString, $compareStringNoRC, $message){
+ $classCopy = new \stdClass();
+ $classCopyNoRC = new \stdClass();
+ $vars = get_object_vars($stdClass);
+ foreach($vars as $key => $var){
+ if($key === "messageTS") continue;
+ $classCopy->$key = $var;
+ if($key !== "messageRC") {
+ $classCopyNoRC->$key = $var;
+ }
+ }
+ if (json_encode($classCopy) === $compareString){
+ return false;
+ }
+ if(json_encode($classCopyNoRC) === $compareStringNoRC && empty(array_diff($stdClass->messageRC, $message->messageRC))){
+ // Existing copy has same content, and less RC than new one.
+ return false;
+ }
+ return true;
+ }));
$message->messageTS = microtime();
- $this->channels[$channel]["MESSAGES"][] = $message;
+ $clean[] = $message;
+ self::$channels[$channel]["MESSAGES"] = $clean;
}
}

0 comments on commit b8449cb

Please sign in to comment.