Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions src/Transport/Http.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ class Http
/**
* @var string
*/
private $username = null;
private $_username = null;

/**
* @var string
*/
private $password = null;
private $_password = null;

/**
* @var string
*/
private $host = '';
private $_host = '';

/**
* @var int
*/
private $port = 0;
private $_port = 0;

/**
* @var bool
Expand All @@ -43,7 +43,7 @@ class Http
/**
* @var CurlerRolling
*/
private $curler = false;
private $_curler = false;

/**
* @var Settings
Expand All @@ -62,12 +62,24 @@ public function __construct($host, $port, $username, $password)
{
$this->setHost($host, $port);

$this->username = $username;
$this->password = $password;
$this->_username = $username;
$this->_password = $password;
$this->_settings = new Settings($this);

$this->curler = new CurlerRolling();
$this->curler->setSimultaneousLimit(10);
$this->setCurler();
}


public function setCurler() {
$this->_curler = new CurlerRolling();
$this->_curler->setSimultaneousLimit(10);
}

/**
* @return CurlerRolling
*/
public function getCurler() {
return $this->_curler;
}

/**
Expand All @@ -77,18 +89,18 @@ public function __construct($host, $port, $username, $password)
public function setHost($host, $port = -1)
{
if ($port > 0) {
$this->port = $port;
$this->_port = $port;
}

$this->host = $host;
$this->_host = $host;
}

/**
* @return string
*/
public function getUri()
{
return 'http://' . $this->host . ':' . $this->port;
return 'http://' . $this->_host . ':' . $this->_port;
}

/**
Expand All @@ -101,16 +113,16 @@ public function checkServerReplicas($list_hosts, $time_out)
// @todo add WHERE database=XXXX

$query['query'] = 'SELECT * FROM system.replicas FORMAT JSON';
$query['user'] = $this->username;
$query['password'] = $this->password;
$query['user'] = $this->_username;
$query['password'] = $this->_password;

$resultGoodHost = [];
$resultBadHost = [];

$statements = [];
foreach ($list_hosts as $host) {
$request = new Request();
$url = 'http://' . $host . ":" . $this->port . '?' . http_build_query($query);
$url = 'http://' . $host . ":" . $this->_port . '?' . http_build_query($query);

$request->url($url)
->GET()
Expand All @@ -119,11 +131,11 @@ public function checkServerReplicas($list_hosts, $time_out)
->connectTimeOut($time_out)
->setDnsCache(0);

$this->curler->addQueLoop($request);
$this->_curler->addQueLoop($request);
$statements[$host] = new \ClickHouseDB\Statement($request);
}

$this->curler->execLoopWait();
$this->_curler->execLoopWait();

foreach ($statements as $host => $statement) {
if ($statement->isError()) {
Expand Down Expand Up @@ -162,7 +174,7 @@ public function checkServerReplicas($list_hosts, $time_out)
*/
public function getHostIPs()
{
return gethostbynamel($this->host);
return gethostbynamel($this->_host);
}

/**
Expand Down Expand Up @@ -205,7 +217,7 @@ private function getUrl($params = [])
private function newRequest($extendinfo)
{
$new = new \Curler\Request();
$new->auth($this->username, $this->password)
$new->auth($this->_username, $this->_password)
->POST()
->setRequestExtendedInfo($extendinfo);

Expand Down Expand Up @@ -281,7 +293,7 @@ public function writeAsyncCSV($sql, $file_name)
});

$request->setInfile($file_name);
$this->curler->addQueLoop($request);
$this->_curler->addQueLoop($request);

return new Statement($request);
}
Expand All @@ -291,7 +303,7 @@ public function writeAsyncCSV($sql, $file_name)
*/
public function getCountPendingQueue()
{
return $this->curler->countPending();
return $this->_curler->countPending();
}

/**
Expand Down Expand Up @@ -366,7 +378,7 @@ private function prepareWrite($sql, $bindings)
*/
public function executeAsync()
{
return $this->curler->execLoopWait();
return $this->_curler->execLoopWait();
}

/**
Expand All @@ -378,7 +390,7 @@ public function executeAsync()
public function select($sql, array $bindings = [], $whereInFile = null)
{
$request = $this->prepareSelect($sql, $bindings, $whereInFile);
$code = $this->curler->execOne($request);
$code = $this->_curler->execOne($request);

return new Statement($request);
}
Expand All @@ -392,7 +404,7 @@ public function select($sql, array $bindings = [], $whereInFile = null)
public function selectAsync($sql, array $bindings = [], $whereInFile = null)
{
$request = $this->prepareSelect($sql, $bindings, $whereInFile);
$this->curler->addQueLoop($request);
$this->_curler->addQueLoop($request);
return new Statement($request);
}

Expand All @@ -405,7 +417,7 @@ public function selectAsync($sql, array $bindings = [], $whereInFile = null)
public function write($sql, array $bindings = [], $exception = true)
{
$request = $this->prepareWrite($sql, $bindings);
$code = $this->curler->execOne($request);
$code = $this->_curler->execOne($request);
$response = new Statement($request);

if ($exception) {
Expand Down