Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ElasticPopulate: run elastic index call inside current DC synchronic … #8748

Merged
merged 2 commits into from Sep 5, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -170,7 +170,7 @@ max_file_size_downloadable_from_cdn_in_KB = 1887436
exec_sphinx = true

; Should be set to false in multiple data centers environments
exec_elastic = false
exec_elastic = true

; 24 hours - 60 X 60 X 24
user_login_set_password_hash_key_validity = 86400
@@ -7,16 +7,19 @@
require_once(ROOT_DIR . '/alpha/config/kConf.php');
// ------------------------------------------------------
class OldLogRecordsFilter {
private $logId;
function __construct($logId) {
$this->logId = $logId;
}
function filter($i) {
return $i > $this->logId;
}
/**
 * Callback holder used with array_filter() to discard log-record ids that
 * have already been handled: only ids strictly greater than the threshold
 * survive the filter (see the $handledRecords pruning in the main loop).
 */
class OldLogRecordsFilter
{
	/** @var int|string threshold log id (may arrive as a numeric string from the DB layer) */
	private $logId;

	/**
	 * @param int|string $logId ids less than or equal to this are considered old
	 */
	public function __construct($logId)
	{
		$this->logId = $logId;
	}

	/**
	 * array_filter() callback.
	 *
	 * @param int|string $i candidate log record id
	 * @return bool true when $i is newer (greater) than the threshold
	 */
	public function filter($i)
	{
		return $i > $this->logId;
	}
}
KAutoloader::addClassPath(KAutoloader::buildPath(KALTURA_ROOT_PATH, "vendor", "propel", "*"));
@@ -30,27 +33,31 @@ function filter($i) {
$hostname = (isset($_SERVER["HOSTNAME"]) ? $_SERVER["HOSTNAME"] : gethostname());
$configFile = ROOT_DIR . "/configurations/elastic/populate/$hostname.ini";
if(!file_exists($configFile))
if (!file_exists($configFile))
{
KalturaLog::err("Configuration file [$configFile] not found.");
exit(-1);
KalturaLog::err("Configuration file [$configFile] not found.");
exit(-1);
}
// Load the per-host populate configuration (path built from $hostname above).
$config = parse_ini_file($configFile);
// Logical cluster name used to look up per-DC last-log rows.
$elasticCluster = $config['elasticCluster'];
// Elastic endpoint; port falls back to the default 9200 when not configured.
$elasticServer = $config['elasticServer'];
$elasticPort = (isset($config['elasticPort']) ? $config['elasticPort'] : 9200);
// When false, script-style UPDATE commands on the entry index are skipped
// (see the action filter inside the main loop).
$processScriptUpdates = (isset($config['processScriptUpdates']) ? $config['processScriptUpdates'] : false);
// 'system' map supplies LOG_DIR for the pid file written below.
$systemSettings = kConf::getMap('system');
if(!$systemSettings || !$systemSettings['LOG_DIR'])
if (!$systemSettings || !$systemSettings['LOG_DIR'])
{
KalturaLog::err("LOG_DIR not found in system configuration.");
exit(-1);
KalturaLog::err("LOG_DIR not found in system configuration.");
exit(-1);
}
$pid = $systemSettings['LOG_DIR'] . '/populate_elastic.pid';
if(file_exists($pid))
if (file_exists($pid))
{
KalturaLog::err("Scheduler already running - pid[" . file_get_contents($pid) . "]");
exit(1);
KalturaLog::err("Scheduler already running - pid[" . file_get_contents($pid) . "]");
exit(1);
}
file_put_contents($pid, getmypid());
@@ -60,6 +67,7 @@ function filter($i) {
// Max number of log records fetched per retrieveByLastId() call.
$limit = 1000;
// Look-back window: handled ids older than (lastLogId - gap) are pruned.
$gap = 500;
$maxIndexHistory = 2000; //The maximum array size to save unique object ids update and their elastic log id
// Read-only connection to the sphinx_log DB used for all polling queries.
$sphinxLogReadConn = myDbHelper::getConnection(myDbHelper::DB_HELPER_CONN_SPHINX_LOG_READ);
@@ -68,117 +76,138 @@ function filter($i) {
// Per-DC SphinxLogServer rows tracking the last processed log id.
$lastLogs = array();
// Per-DC list of log ids already handled inside the current gap window.
$handledRecords = array();
// Recent objectId => elasticLogId map used to skip stale updates for the
// same object (bounded by $maxIndexHistory).
$objectIdElasticLog = array();
foreach($serverLastLogs as $serverLastLog) {
$lastLogs[$serverLastLog->getDc()] = $serverLastLog;
$handledRecords[$serverLastLog->getDc()] = array();
// Seed the per-DC tracking state from the rows loaded for this cluster.
foreach ($serverLastLogs as $serverLastLog)
{
$lastLogs[$serverLastLog->getDc()] = $serverLastLog;
$handledRecords[$serverLastLog->getDc()] = array();
}
$elasticClient = new elasticClient($elasticServer, $elasticPort); //take the server and port from config - $elasticServer , $elasticPort
while(true)
while (true)
{
if(!elasticSearchUtils::isMaster($elasticClient, $hostname))
{
KalturaLog::log('elastic server ['.$hostname.'] is not the master , sleeping for 30 seconds');
sleep(30);
//update the last log ids
$serverLastLogs = SphinxLogServerPeer::retrieveByServer($elasticCluster, $sphinxLogReadConn);
foreach($serverLastLogs as $serverLastLog)
{
$lastLogs[$serverLastLog->getDc()] = $serverLastLog;
$handledRecords[$serverLastLog->getDc()] = array();
}
SphinxLogServerPeer::clearInstancePool();
continue;
}
$elasticLogs = SphinxLogPeer::retrieveByLastId($lastLogs, $gap, $limit, $handledRecords, $sphinxLogReadConn, SphinxLogType::ELASTIC);
while(!count($elasticLogs))
{
sleep(1);
$elasticLogs = SphinxLogPeer::retrieveByLastId($lastLogs, $gap, $limit, $handledRecords, $sphinxLogReadConn, SphinxLogType::ELASTIC);
}
$ping = $elasticClient->ping();
if(!$ping)
{
KalturaLog::err('cannot connect to elastic cluster with client['.print_r($elasticClient, true).']');
sleep(5);
continue;
}
foreach($elasticLogs as $elasticLog)
{
/* @var $elasticLog SphinxLog */
$dc = $elasticLog->getDc();
$executedServerId = $elasticLog->getExecutedServerId();
$elasticLogId = $elasticLog->getId();
$serverLastLog = null;
if(isset($lastLogs[$dc])) {
$serverLastLog = $lastLogs[$dc];
} else {
$serverLastLog = new SphinxLogServer();
$serverLastLog->setServer($elasticCluster);
$serverLastLog->setDc($dc);
$lastLogs[$dc] = $serverLastLog;
}
$handledRecords[$dc][] = $elasticLogId;
KalturaLog::log("Elastic log id $elasticLogId dc [$dc] executed server id [$executedServerId] Memory: [" . memory_get_usage() . "]");
try
{
if ($skipExecutedUpdates && $executedServerId == $serverLastLog->getId())
{
KalturaLog::log ("Elastic server is initiated and the command already ran synchronously on this machine. Skipping");
}
else
{
//we save the elastic command as serialized object in the sql field
$command = $elasticLog->getSql();
$command = unserialize($command);
$index = $command['index'];
$action = $command['action'];
if ($action && ($processScriptUpdates || !($index == ElasticIndexMap::ELASTIC_ENTRY_INDEX && $action == ElasticMethodType::UPDATE)))
{
$response = $elasticClient->$action($command);
}
}
// If the record is an historical record, don't take back the last log id
if($serverLastLog->getLastLogId() < $elasticLogId) {
$serverLastLog->setLastLogId($elasticLogId);
// Clear $handledRecords from before last - gap.
foreach($serverLastLogs as $serverLastLog) {
$dc = $serverLastLog->getDc();
$threshold = $serverLastLog->getLastLogId() - $gap;
$handledRecords[$dc] = array_filter($handledRecords[$dc], array(new OldLogRecordsFilter($threshold), 'filter'));
}
}
}
catch(Exception $e)
{
KalturaLog::err($e->getMessage());
}
}
foreach ($lastLogs as $serverLastLog)
{
$serverLastLog->save(myDbHelper::getConnection(myDbHelper::DB_HELPER_CONN_SPHINX_LOG));
}
SphinxLogPeer::clearInstancePool();
kMemoryManager::clearMemory();
// Only the elected elastic master consumes the log; non-masters wait,
// refresh their per-DC state from the DB, and retry.
if (!elasticSearchUtils::isMaster($elasticClient, $hostname))
{
KalturaLog::log('elastic server [' . $hostname . '] is not the master , sleeping for 30 seconds');
sleep(30);
//update the last log ids
$serverLastLogs = SphinxLogServerPeer::retrieveByServer($elasticCluster, $sphinxLogReadConn);
// Rebuild tracking state in case another master advanced the log ids
// while this host was passive; handled-id history is reset with it.
foreach ($serverLastLogs as $serverLastLog)
{
$lastLogs[$serverLastLog->getDc()] = $serverLastLog;
$handledRecords[$serverLastLog->getDc()] = array();
}
// Drop Propel's instance cache so the next retrieve re-reads the DB.
SphinxLogServerPeer::clearInstancePool();
continue;
}
// Fetch the next batch of elastic log records after each DC's last id.
$elasticLogs = SphinxLogPeer::retrieveByLastId($lastLogs, $gap, $limit, $handledRecords, $sphinxLogReadConn, SphinxLogType::ELASTIC);
// Busy-wait (1s) until records appear.
while (!count($elasticLogs))
{
// Once the backlog has drained, subsequent records are assumed to have
// already been applied synchronously by this server (see the skip check
// in the processing loop). NOTE(review): $skipExecutedUpdates is only
// defined here, so it is unset on the very first pass if records already
// exist — presumably relying on a falsy-undefined read; confirm intended.
$skipExecutedUpdates = true;
sleep(1);
$elasticLogs = SphinxLogPeer::retrieveByLastId($lastLogs, $gap, $limit, $handledRecords, $sphinxLogReadConn, SphinxLogType::ELASTIC);
}
// Verify the elastic cluster is reachable before replaying commands;
// back off briefly and re-poll on failure (batch is retried next pass).
$ping = $elasticClient->ping();
if (!$ping)
{
KalturaLog::err('cannot connect to elastic cluster with client[' . print_r($elasticClient, true) . ']');
sleep(5);
continue;
}
// Replay each pending log record against the elastic cluster, tracking
// the highest applied id per DC and deduplicating by object id.
foreach ($elasticLogs as $elasticLog)
{
/* @var $elasticLog SphinxLog */
$dc = $elasticLog->getDc();
$executedServerId = $elasticLog->getExecutedServerId();
$elasticLogId = $elasticLog->getId();
// Get (or lazily create) the per-DC last-log tracker row.
$serverLastLog = null;
if (isset($lastLogs[$dc]))
{
$serverLastLog = $lastLogs[$dc];
}
else
{
$serverLastLog = new SphinxLogServer();
$serverLastLog->setServer($elasticCluster);
$serverLastLog->setDc($dc);
$lastLogs[$dc] = $serverLastLog;
}
// Remember this id so retrieveByLastId() won't hand it back within the gap window.
$handledRecords[$dc][] = $elasticLogId;
KalturaLog::log("Elastic log id $elasticLogId dc [$dc] executed server id [$executedServerId] Memory: [" . memory_get_usage() . "]");
try
{
$objectId = $elasticLog->getObjectId();
if ($skipExecutedUpdates && $executedServerId == $serverLastLog->getId())
{
// The originating server already ran this command synchronously.
KalturaLog::log("Elastic server is initiated and the command already ran synchronously on this machine. Skipping");
}
elseif (isset($objectIdElasticLog[$objectId]) && $objectIdElasticLog[$objectId] > $elasticLogId) {
// A newer command for the same object was already applied; this one is stale.
KalturaLog::log("Found newer update for the same object id, skipping [$objectId] [$elasticLogId] [{$objectIdElasticLog[$objectId]}]");
}
else
{
//we save the elastic command as serialized object in the sql field
// NOTE(review): unserialize() on DB-stored data — safe only while the
// sql column is written exclusively by trusted application code.
$command = $elasticLog->getSql();
$command = unserialize($command);
$index = $command['index'];
$action = $command['action'];
// Entry-index UPDATE actions are script updates; skip them unless
// $processScriptUpdates is enabled for this host.
if ($action && ($processScriptUpdates || !($index == ElasticIndexMap::ELASTIC_ENTRY_INDEX && $action == ElasticMethodType::UPDATE)))
{
$response = $elasticClient->$action($command);
}
// Refresh this object's position in the bounded dedup history:
// re-insert at the tail, evicting the oldest entry when over capacity.
unset($objectIdElasticLog[$objectId]);
if (count($objectIdElasticLog) > $maxIndexHistory)
{
reset($objectIdElasticLog);
$oldestElementKey = key($objectIdElasticLog);
unset($objectIdElasticLog[$oldestElementKey]);
}
$objectIdElasticLog[$objectId] = $elasticLogId;
}
// If the record is an historical record, don't take back the last log id
if ($serverLastLog->getLastLogId() < $elasticLogId)
{
$serverLastLog->setLastLogId($elasticLogId);
// Clear $handledRecords from before last - gap.
// NOTE(review): this inner foreach reuses $serverLastLog and $dc,
// clobbering the outer-iteration variables for the rest of this
// record's processing — harmless today (nothing reads them after),
// but worth confirming before future edits.
foreach ($serverLastLogs as $serverLastLog)
{
$dc = $serverLastLog->getDc();
$threshold = $serverLastLog->getLastLogId() - $gap;
$handledRecords[$dc] = array_filter($handledRecords[$dc], array(new OldLogRecordsFilter($threshold), 'filter'));
}
}
}
catch (Exception $e)
{
// Best-effort: log and continue with the next record; the last-log id
// may already have advanced past the failed record.
KalturaLog::err($e->getMessage());
}
}
// Persist each DC's advanced last-log id on the writable connection.
foreach ($lastLogs as $serverLastLog)
{
$serverLastLog->save(myDbHelper::getConnection(myDbHelper::DB_HELPER_CONN_SPHINX_LOG));
}
// Release Propel caches between passes to keep the long-running
// process's memory bounded.
SphinxLogPeer::clearInstancePool();
kMemoryManager::clearMemory();
}
KalturaLog::log('Done');
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.