-
Notifications
You must be signed in to change notification settings - Fork 25
/
Cassandra.php
executable file
·3765 lines (3298 loc) · 94.2 KB
/
Cassandra.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<?php
/**
* Cassandra-PHP-Client Library (CPCL).
*
* Cassandra PHP-based client library for managing and querying your Cassandra
* cluster. It's a high-level library performing all the rather complex
* low-level lifting and providing a simple to learn and use interface.
*
* Includes ideas and code snippets from PHPCassa project.
*
* Copyright (C) 2011 by Priit Kallas <kallaspriit@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* @author Priit Kallas <kallaspriit@gmail.com>
* @package Cassandra
* @version 1.0
*/
// set the globals that the thrift library uses
$GLOBALS['THRIFT_ROOT'] = dirname(__FILE__) . '/thrift';
define('THRIFT_PATH', $GLOBALS['THRIFT_ROOT']);
// require thrift packages
require_once $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
/**
* Represents a connection to a single Cassandra node.
*
* Provides direct access to the low-level Cassandra client.
*/
class CassandraConnection {
/**
* Hostname or IP of the node.
*
* @var string
*/
protected $host;
/**
* Port of the instance, defaults to 9160.
*
* @var integer
*/
protected $port;
/**
* Should framed or buffered transport be used.
*
* @var boolean
*/
protected $useFramedTransport;
/**
* Timeout of send operations in milliseconds.
*
* @var integer
*/
protected $sendTimeoutMs;
/**
* Timeout of receive operations in milliseconds.
*
* @var integer
*/
protected $receiveTimeoutMs;
/**
* Socket to the node.
*
* @var TSocket
*/
protected $socket;
/**
* Transport method.
*
* @var TTransport
*/
protected $transport;
/**
* Used communication protocol.
*
* @var TBinaryProtocolAccelerated
*/
protected $protocol;
/**
* The low-level cassandra client.
*
* @var cassandra_CassandraClient
*/
protected $client;
/**
* Is the connection currently open.
*
* @var boolean
*/
protected $isOpen;
/**
* Constructs the connection, setting access parameters.
*
* @param string $host Hostname or IP of the node
* @param integer $port Port of the instance
* @param boolean $useFramedTransport Use framed or buffered transport
* @param integer $sendTimeoutMs Timeout of send operations in milliseconds
* @param integer $receiveTimeoutMs Timeout of receive operations
*/
public function __construct(
$host = '127.0.0.1',
$port = 9160,
$useFramedTransport = true,
$sendTimeoutMs = null,
$receiveTimeoutMs = null
) {
$this->host = $host;
$this->port = $port;
$this->useFramedTransport = $useFramedTransport;
$this->sendTimeoutMs = $sendTimeoutMs;
$this->receiveTimeoutMs = $receiveTimeoutMs;
$this->isOpen = false;
$this->socket = $this->createSocket(
$host,
$port,
$sendTimeoutMs,
$receiveTimeoutMs
);
if ($useFramedTransport) {
$this->transport = $this->createFramedTransport($this->socket);
} else {
$this->transport = $this->createBufferedTransport($this->socket);
}
$this->transport->open();
$this->isOpen = true;
$this->protocol = new TBinaryProtocolAccelerated($this->transport);
$this->client = new cassandra_CassandraClient($this->protocol);
}
/**
* Closes the connection on destruction.
*/
public function __destruct() {
$this->close();
}
/**
* Forces the connection to close.
*
* Generally there's no need to call it yourself as it will be closed on
* termination.
*/
public function close() {
if ($this->isOpen) {
$this->transport->flush();
$this->transport->close();
$this->isOpen = false;
}
}
/**
* Is the connection open.
*
* @return boolean
*/
public function isOpen() {
return $this->isOpen;
}
/**
* Returns the low-level Cassandra client used by the wrapper.
*
* @return cassandra_CassandraClient
*/
public function getClient() {
if (!$this->isOpen) {
throw new CassandraConnectionClosedException(
'The connection has been closed'
);
}
return $this->client;
}
/**
* Returns the used transport method.
*
* @return TTransport
*/
public function getTransport() {
return $this->transport;
}
/**
* Returns the used transportation protocol.
*
* @return TBinaryProtocolAccelerated
*/
public function getProtocol() {
return $this->transport;
}
/**
* Attempts to start using given keyspace.
*
* Using the keyspace is attempted three times before giving up.
*
* @param string $keyspace Name of the keyspace
* @param string $username Optional username in case authentication is used
* @param string $password Optional password
*/
public function useKeyspace(
$keyspace,
$username = null,
$password = null
) {
$attempts = 3;
$success = false;
while($attempts-- > 0 && !$success) {
try {
$this->client->set_keyspace($keyspace);
$success = true;
} catch (cassandra_InvalidRequestException $e) {
continue;
}
}
if (!$success) {
throw new CassandraSettingKeyspaceFailedException(
'Using keyspace "'.$keyspace.'" failed after several attempts'
);
}
if ($username !== null) {
$request = new cassandra_AuthenticationRequest(
array('credentials' => array($username, $password))
);
$this->client->login($request);
}
}
/**
* Creates the socket to use.
*
* @param string $host Hostname/IP
* @param integer $port Port number
* @param integer $sendTimeoutMs Send operations timeout
* @param integer $receiveTimeoutMs Receive operations timeout
* @return TSocket Initiated socket connection
*/
protected function createSocket(
$host,
$port,
$sendTimeoutMs,
$receiveTimeoutMs
) {
$socket = new TSocket($host, $port);
if ($sendTimeoutMs !== null) {
$socket->setSendTimeout($sendTimeoutMs);
}
if ($receiveTimeoutMs !== null) {
$socket->setRecvTimeout($receiveTimeoutMs);
}
return $socket;
}
/**
* Creates framed transport.
*
* @param TSocket $socket Socket to base the transport on
* @return TFramedTransport Instance of the transport
*/
protected function createFramedTransport(TSocket $socket) {
require_once THRIFT_PATH.'/transport/TFramedTransport.php';
return new TFramedTransport($socket, true, true);
}
/**
* Creates buffered transport.
*
* @param TSocket $socket Socket to base the transport on
* @return TBufferedTransport Instance of the transport
*/
protected function createBufferedTransport(TSocket $socket) {
require_once THRIFT_PATH.'/transport/TBufferedTransport.php';
return new TBufferedTransport($socket, 512, 512);
}
}
/**
* A cluster is a collection of servers and connections to them.
*
* Provides handling the pool of connections.
*/
class CassandraCluster {
/**
* Currently used keyspace name.
*
* @var string
*/
protected $keyspace;
/**
* Currently used username if using authentication.
*
* @var string
*/
protected $username;
/**
* Currently used password if using authentication.
*
* @var string
*/
protected $password;
/**
* Array of server connection information used to connect to them.
*
* @var array
*/
protected $servers = array();
/**
* Array of open connections to servers.
*
* The connections are reused if already opened.
*
* @var array
*/
protected $connections = array();
/**
* Sets the list of servers to use.
*
* You could add the servers one-by-one using
* {@see CassandraCluster::registerServer()}.
*
* @param array $servers Servers that can be connected to.
*/
public function __construct(array $servers = array()) {
foreach ($servers as $server) {
$this->registerServer(
isset($server['host']) ? $server['host'] : '127.0.0.1',
isset($server['port']) ? $server['port'] : 9160,
isset($server['use-framed-transport']) ? $server['use-framed-transport'] : true,
isset($server['send-timeout-ms']) ? $server['send-timeout-ms'] : null,
isset($server['receive-timeout-ms']) ? $server['receive-timeout-ms'] : null
);
}
}
/**
* Closes all connections on destruction.
*/
public function __destruct() {
$this->closeConnections();
}
/**
* Registers a new server in the cluster pool.
*
* This does not mean that it is connected to at once but it may be used in
* any of the requests.
*
* @param string $host Hostname or IP of the node
* @param integer $port Port of the instance
* @param boolean $useFramedTransport Use framed or buffered transport
* @param integer $sendTimeoutMs Timeout of send operations in milliseconds
* @param integer $receiveTimeoutMs Timeout of receive operations
* @return CassandraCluster Self for chaining calls
*/
public function registerServer(
$host = '127.0.0.1',
$port = 9160,
$useFramedTransport = true,
$sendTimeoutMs = null,
$receiveTimeoutMs = null
) {
$this->servers[] = array(
'host' => $host,
'port' => $port,
'use-framed-transport' => $useFramedTransport,
'send-timeout-ms' => $sendTimeoutMs,
'receive-timeout-ms' => $receiveTimeoutMs
);
return $this;
}
/**
* Starts using given keyspace for all active and future connections.
*
* @param string $keyspace Keyspace to use
* @param string $username Optional username
* @param string $password Password
* @return CassandraCluster Self for chaining calls
*/
public function useKeyspace($keyspace, $username = null, $password = null) {
$this->keyspace = $keyspace;
$this->username = $username;
$this->password = $password;
$this->getConnection();
foreach ($this->connections as $connection) {
$connection->useKeyspace(
$keyspace,
$username,
$password
);
}
return $this;
}
/**
* Returns the name of currently used keyspace.
*
* @return string
*/
public function getCurrentKeyspace() {
return $this->keyspace;
}
/**
* Returns the list of servers connection info in the pool.
*
* @return array
*/
public function getServers() {
return $this->servers;
}
/**
* Returns a connection to one of the servers.
*
* The connections are created from the server list at random and if a
* server is chosen that already has an active connection, it is reused.
*
* If a closed connection is found in the pool, it is removed and may be
* reconnected to later.
*
* It will try to connect to the servers the number of servers times two
* times before giving up.
*
* @return CassandraConnection Connection to one of the nodes
* @throws CassandraConnectionFailedException If all connections failed
*/
public function getConnection() {
if (empty($this->servers)) {
throw new CassandraConnectionFailedException(
'Unable to create connection, the cluster server pool is empty'
);
}
$serverCount = count($this->servers);
$attemptsLeft = $serverCount * 2;
while ($attemptsLeft-- > 0) {
$randomServerIndex = mt_rand(0, $serverCount - 1);
if (isset($this->connections[$randomServerIndex])) {
if (!$this->connections[$randomServerIndex]->isOpen()) {
unset($this->connections[$randomServerIndex]);
continue;
}
return $this->connections[$randomServerIndex];
} else {
$server = $this->servers[$randomServerIndex];
try {
$this->connections[$randomServerIndex] = new CassandraConnection(
$server['host'],
$server['port'],
$server['use-framed-transport'],
$server['send-timeout-ms'],
$server['receive-timeout-ms']
);
$this->connections[$randomServerIndex]->useKeyspace(
$this->keyspace,
$this->username,
$this->password
);
return $this->connections[$randomServerIndex];
} catch (TException $e) {
continue;
}
}
}
throw new CassandraConnectionFailedException(
'Connecting to any of the '.$serverCount.' nodes failed'
);
}
/**
* Closes all open connections.
*
* @return CassandraCluster Self for chaining calls
*/
public function closeConnections() {
foreach ($this->connections as $connection) {
$connection->close();
}
$this->connections = array();
return $this;
}
}
/**
* The main Cassandra client class providing means to manage keyspaces and
* column families, get info about schema, fetch and store data.
*/
class Cassandra {
/**
* Array of named singleton instances.
*
* @var array
*/
protected static $instances = array();
/**
* Array of Cassandra low-level method names that require a keyspace to
* be selected. Populated as needed.
*
* @var array
*/
protected static $keyspaceRequiredMethods;
/**
* Key tokens in get() method requiring escaping.
*
* @var array
*/
protected static $requestKeyTokens = array('.', ':', ',', '-', '|');
/**
* The Cassandra cluster to use.
*
* @var CassandraCluster
*/
protected $cluster;
/**
* Maximum number of times to retry failed calls to Cassandra.
*
* Use {@see Cassandra::setMaxCallRetries()} to change.
*
* @var integer
*/
protected $maxCallRetries = 5;
/**
* Default maximum number of columns to fetch on range queries.
*
* @var integer
*/
protected $defaultColumnCount = 100;
/**
* Array of column families.
*
* If a single column family is requested more than once during a single
* request, the CassandraColumnFamily object is created only once.
*
* @var array
*/
protected $columnFamilies = array();
/**
* Authentication details per keyspace.
*
* @var array
*/
protected $keyspaceAuthentication = array();
/**
* Should key names and values be automatically packed to correct format
* based on column metadata.
*
* @var boolean
*/
protected $autopack = true;
/**
* It is enough if a single node replies.
*
* This makes reads and writes fast, but it also means that depending on
* what else is reading and writing, it's possible that they could briefly
* give conflicting answers.
*/
const CONSISTENCY_ONE = ConsistencyLevel::ONE;
/**
* Majority of the nodes holding the data must reply.
*
* If you have replication factor of 3 then it's enough if two of the
* nodes holding the data are up and reply. You need to have a replication
* factor of atleast three for this to work differently from all and should
* use odd number for replication factor.
*/
const CONSISTENCY_QUORUM = ConsistencyLevel::QUORUM;
/**
* Only meaningful for writes and means as soon as a write is received by
* any node, the call returns success.
*
* This occurs when your client might be connecting to node 5 but the nodes
* responsible for it are 6-8. The difference between ONE and ANY
* is that with ANY, as soon as node 5 receives the write, it returns
* success (but nodes 6-8 could be down or whatever). CL::ONE means that if
* you write to node 5, either 6, 7, or 8 have to return success before
* node 5 returns success.
*/
const CONSISTENCY_ANY = ConsistencyLevel::ANY;
/**
* Returns success only if all the nodes holding the data respond.
*
* This makes sure that all the nodes get the same data, but includes a
* performance penalty and also if a single node of the replication group
* is down, it's not possible to read or write the data as the requirement
* can not be fulfilled.
*/
const CONSISTENCY_ALL = ConsistencyLevel::ALL;
/**
* Standard column type.
*/
const COLUMN_STANDARD = 'Standard';
/**
* Super column type.
*/
const COLUMN_SUPER = 'Super';
/**
* ASCII text type.
*/
const TYPE_ASCII = 'AsciiType';
/**
* Simplest binary type
*/
const TYPE_BYTES = 'BytesType';
/**
* Used for a non-time based comparison. It is compared lexically, by byte
* value.
*
* (UUID) are a standardized unique indentifier in the form of a 128 bit
* number. In it's canonical form UUIDs are represented by a 32 digit
* hexadecimal number in the form of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
*/
const TYPE_LEXICAL_UUID = 'LexicalUUIDType';
/**
* Used for a time based comparison. It uses a version 1 UUID.
*
* (UUID) are a standardized unique indentifier in the form of a 128 bit
* number. In it's canonical form UUIDs are represented by a 32 digit
* hexadecimal number in the form of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
*/
const TYPE_TIME_UUID = 'TimeUUIDType';
/**
* Long data type.
*/
const TYPE_LONG = 'LongType';
/**
* Simple integer data type.
*/
const TYPE_INTEGER = 'IntegerType';
/**
* UTF8 international text data type.
*/
const TYPE_UTF8 = 'UTF8Type';
/**
* Equality comparator used in where queries.
*/
const OP_EQ = IndexOperator::EQ;
/**
* Strict less-than comparator.
*/
const OP_LT = IndexOperator::LT;
/**
* Strict greater-than comparator.
*/
const OP_GT = IndexOperator::GT;
/**
* Less-than-equals comparator.
*/
const OP_LTE = IndexOperator::LTE;
/**
* Greater-than-equals comparator.
*/
const OP_GTE = IndexOperator::GTE;
/**
* Returns the nodes that are next to each other on the ring.
*/
const PLACEMENT_LOCAL = 'org.apache.cassandra.locator.LocalStrategy';
/**
* Simple placement strategy not taking network topology into
* account.
*/
const PLACEMENT_SIMPLE = 'org.apache.cassandra.locator.SimpleStrategy';
/**
* Network topology aware placement strategy.
*
* Allows you to configure the number of replicas per data center as
* specified in the strategy_options. Replicas are placed on different racks
* within each data center, if possible.
*/
const PLACEMENT_NETWORK
= 'org.apache.cassandra.locator.NetworkTopologyStrategy';
/**
* Keys index type, currently only one supported.
*/
const INDEX_KEYS = 0;
/**
* Sets the list of servers to use and whether keys and values should be
* automatically packed to the correct format as defined by column families
* column metadata.
*
* @param array $servers Array of server connection details.
* @param type $autopack Should keys and data be autopacked.
*/
private function __construct(array $servers = array(), $autopack = true) {
$this->cluster = new CassandraCluster($servers);
$this->autopack = $autopack;
}
/**
* Prevent users cloning the instance.
*/
public function __clone() {
trigger_error('Clone is not allowed.', E_USER_ERROR);
}
/**
* Creates a new named cassandra instance.
*
* The name can be used in {@see Cassandra::getInstance()} to fetch the
* named singleton anywhere in the project.
*
* @param array $servers List of seed servers to connect to
* @param type $name Name of the instance
* @return Cassandra New cassandra instance
*/
public static function createInstance(array $servers, $name = 'main') {
self::$instances[$name] = new self($servers);
return self::$instances[$name];
}
/**
* Returns named singleton instance.
*
* Name defaults to "main" the same as createInstance() so when using a
* single connection pool, the name needs not to be set on neither.
*
* @param string $name Name of the instance to fetch
* @return Cassandra The instance
* @throws CassandraInvalidRequestException If no such instance exists
*/
public static function getInstance($name = 'main') {
if (!isset(self::$instances[$name])) {
throw new CassandraInvalidRequestException(
'Instance called "'.$name.'" does not exist'
);
}
return self::$instances[$name];
}
/**
* Registers a keyspace with authentication info.
*
* @param string $keyspace Keyspace name
* @param string $username The username
* @param string $password Password
*/
protected function registerKeyspace(
$keyspace,
$username = null,
$password = null
) {
$this->keyspaceAuthentication[$keyspace] = array(
'username' => $username,
'password' => $password
);
}
/**
* Start using a new keyspace.
*
* If the keyspace requires authentication, the username and password of it
* should be provided the first time this method is called. The username
* and password are stored so on next calls, they are used automatically if
* exist.
*
* @param string $keyspace Keyspace name
* @param string $username The username
* @param string $password Password
* @return Cassandra Self for call chaining
*/
public function useKeyspace($keyspace, $username = null, $password = null) {
if (!empty($username)) {
$this->registerKeyspace($keyspace, $username, $password);
} else if (isset($this->keyspaceAuthentication[$keyspace])) {
$username = $this->keyspaceAuthentication[$keyspace]['username'];
$password = $this->keyspaceAuthentication[$keyspace]['password'];
}
$this->cluster->useKeyspace($keyspace, $username, $password);
return $this;
}
/**
* Returns the Cassandra cluster of servers.
*
* @return CassandraCluster
*/
public function getCluster() {
return $this->cluster;
}
/**
* Returns random connection to a node.
*
* @return CassandraConnection
*/
public function getConnection() {
return $this->cluster->getConnection();
}
/**
* Closes all open connections to nodes.
*
* Proxies the call to cluster.
*/
public function closeConnections() {
$this->cluster->closeConnections();
}
/**
* Return the low-level thrift client.
*
* @return cassandra_CassandraClient
*/
public function getClient() {
return $this->cluster->getConnection()->getClient();
}
/**
* Sets the maximum number of times a call to Cassandra will be retried
* should it fail for any reason.
*
* @param integer $retryCount Number of times to retry, defaults to 5.
* @return Cassandra Self for call chaining
*/
public function setMaxCallRetries($retryCount) {
$this->maxCallRetries = $retryCount;
return $this;
}
/**
* Sets the default number of columns to fetch at maximum.
*
* @param integer $columnCountLimit The limit
*/
public function setDefaultColumnCount($columnCountLimit) {
$this->defaultColumnCount = $columnCountLimit;
}
/**
* Makes a call to a Cassandra node.
*
* This method accepts a variable number of parameters where the first one
* is the Cassandra client method name and the rest the parameters to pass
* to it.
*
* If a call fails, it will be retried for {@see Cassandra::$maxCallRetries}
* times, backing off (waiting) a bit more each time to prevent flooding.
*
* @return mixed The returned value
* @throws CassandraInvalidRequestException If the request is invalid
* @throws CassandraMaxRetriesException If The call failed all retries
*/
public function call(/*$methodName, $arg1, $arg2 */) {
$args = func_get_args();
$methodName = array_shift($args);
$tries = $this->maxCallRetries;
$lastException = null;
$keyspaceRequiredMethods = self::getKeyspaceRequiredMethods();
if (
in_array($methodName, $keyspaceRequiredMethods)
&& $this->cluster->getCurrentKeyspace() === null
) {
throw new CassandraInvalidRequestException(
'Unable to call "'.$methodName.'", no keyspace has been set'
);
}
$try = 0;
while($tries-- > 0) {
$client = $this->getClient();
$try++;
try {
return call_user_func_array(array($client, $methodName), $args);
} catch (Exception $e) {
$lastException = $e;
usleep(0.1 * pow(2, $try) * 1000000);
}
}
throw new CassandraMaxRetriesException(
'Failed calling "'.$methodName.'" the maximum of '.
$this->maxCallRetries.' times',
$lastException->getCode(),
$lastException
);
}
/**
* Returns ow-level keyspace description as returned by Cassandra.
*
* Returns the result of "describe_keyspace" call without any processing and
* does not use cache. Generally you want to use the more friendly version
* {@see Cassandra::getKeyspaceSchema()}.
*
* If no keyspace name is defined, the currently active keyspace is used.
*
* @param string $keyspace Optional keyspace name.
* @return array Keyspace description as given by Cassandra
*/
public function describeKeyspace($keyspace = null) {
if ($keyspace === null) {