Skip to content

Commit

Permalink
Added directive max-connections-per-server to MongoClient and Memcach…
Browse files Browse the repository at this point in the history
…eClient.
  • Loading branch information
kakserpom committed Feb 5, 2011
1 parent acf533a commit 44f54dc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 125 deletions.
27 changes: 16 additions & 11 deletions app-clients/MemcacheClient.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ protected function getConfigDefaults() {
// default port
'port' => 11211,
// @todo add description
'prefix' => ''
'prefix' => '',
'maxconnectionsperserver' => 32,
);
}

Expand Down Expand Up @@ -98,6 +99,7 @@ public function set($key, $value, $exp = 0, $onResponse = NULL) {

if ($onResponse !== NULL) {
$sess->onResponse[] = $onResponse;
unset($this->servConnFree[$sess->addr][$connId]);
}

$flags = 0;
Expand Down Expand Up @@ -254,22 +256,19 @@ public function stats($onResponse, $server = NULL) {
*/
public function getConnection($addr) {
if (isset($this->servConn[$addr])) {
foreach ($this->servConn[$addr] as $k => &$c) {
if (!isset($this->sessions[$c])) {
unset($this->servConn[$addr][$k]);
continue;
}

if (
(!$this->sessions[$c]->finished)
&& (!sizeof($this->sessions[$c]->onResponse))
) {
while (($c = array_pop($this->servConnFree[$addr])) !== null) {
if (isset($this->sessions[$c])) {
return $c;
}
}
if (sizeof($this->servConn[$addr]) >= $this->config->maxconnectionsperserver->value) {
return $this->servConn[$addr][array_rand($this->servConn[$addr])];
}
} else {
$this->servConn[$addr] = array();
$this->servConnFree[$addr] = array();
}


$e = explode(':', $addr);

Expand All @@ -278,6 +277,7 @@ public function getConnection($addr) {
$this->sessions[$connId] = new MemcacheSession($connId, $this);
$this->sessions[$connId]->addr = $addr;
$this->servConn[$addr][$connId] = $connId;
$this->servConnFree[$addr][$connId] = $connId;

return $connId;
}
Expand Down Expand Up @@ -376,6 +376,7 @@ class MemcacheSession extends SocketSession {
public $valueSize = 0; // size of received part of the value
public $error; // error message
public $key; // current incoming key
public $addr; // address of server

/**
* Called when new data received
Expand Down Expand Up @@ -420,6 +421,9 @@ public function stdin($buf) {
}

$f = array_shift($this->onResponse);
if ((!$this->finished) && (!sizeof($this->onResponse))) {
$this->appInstance->servConnFree[$this->addr][$this->connId] = $this->connId;
}

if ($f) {
call_user_func($f, $this);
Expand Down Expand Up @@ -463,6 +467,7 @@ public function onFinish() {
$this->finished = TRUE;

unset($this->appInstance->servConn[$this->addr][$this->connId]);
unset($this->appInstance->servConnFree[$this->addr][$this->connId]);
unset($this->appInstance->sessions[$this->connId]);
}

Expand Down
30 changes: 14 additions & 16 deletions app-clients/MongoClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class MongoClient extends AsyncServer {
public $sessions = array(); // Active sessions
public $servers = array(); // Array of servers
public $servConn = array(); // Active connections
public $servConnFree = array(); // Free connections
public $requests = array(); // Pending requests
public $cursors = array(); // Active cursors
public $lastReqId = 0; // ID of the last request
Expand All @@ -29,7 +30,6 @@ class MongoClient extends AsyncServer {
const OP_KILL_CURSORS = 2007;

/**/
public $dtags_enabled = false; // enables tags for distibution
public $cache; // object of MemcacheClient

/**
Expand All @@ -42,7 +42,9 @@ protected function getConfigDefaults() {
// default server list
'servers' => 'mongo://127.0.0.1',
// default port
'port' => 27017
'port' => 27017,

'maxconnectionsperserver' => 32,
);
}

Expand Down Expand Up @@ -120,6 +122,7 @@ public function request($key, $opcode, $data, $reply = false) {

if ($reply) {
$sess->busy = true;
unset($this->servConnFree[$sess->url][$connId]);
}

return $this->lastReqId;
Expand Down Expand Up @@ -931,16 +934,17 @@ public function __get($name) {
*/
public function getConnection($url) {
if (isset($this->servConn[$url])) {
foreach ($this->servConn[$url] as &$c) {
if (
isset($this->sessions[$c])
&& !$this->sessions[$c]->busy
) {
while (($c = array_pop($this->servConnFree[$url])) !== null) {
if (isset($this->sessions[$c])) {
return $c;
}
}
if (sizeof($this->servConn[$url]) >= $this->config->maxconnectionsperserver->value) {
return $this->servConn[$url][array_rand($this->servConn[$url])];
}
} else {
$this->servConn[$url] = array();
$this->servConnFree[$url] = array();
}

$u = parse_url($url);
Expand All @@ -966,6 +970,7 @@ public function getConnection($url) {
}

$this->servConn[$url][$connId] = $connId;
$this->servConnFree[$url][$connId] = $connId;

if ($session->user !== NULL) {
$this->getNonce(array(
Expand Down Expand Up @@ -998,15 +1003,6 @@ function($result) use ($session) {
* @return integer Connection's ID.
*/
public function getConnectionByKey($key) {
if (
($this->dtags_enabled)
&& (($sp = strpos($name, '[')) !== false)
&& (($ep = strpos($name, ']')) !== false)
&& ($ep > $sp)
) {
$key = substr($key, $sp+1, $ep-$sp-1);
}

srand(crc32($key));
$url = array_rand($this->servers);
srand();
Expand Down Expand Up @@ -1109,6 +1105,7 @@ public function stdin($buf) {
}

$this->busy = false;
$this->servConnFree[$this->url][$this->connId] = $this->connId;

if (
isset($this->appInstance->requests[$id][2])
Expand Down Expand Up @@ -1147,6 +1144,7 @@ public function onFinish() {
$this->finished = true;

unset($this->servConn[$this->url][$this->connId]);
unset($this->servConnFree[$this->url][$this->connId]);
unset($this->appInstance->sessions[$this->connId]);
}

Expand Down
98 changes: 0 additions & 98 deletions app-examples/ExampleWithMongoDB.php

This file was deleted.

0 comments on commit 44f54dc

Please sign in to comment.