Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

adds experimental support for SSL connections

  • Loading branch information...
commit 068ab91386118cd5fd0490b64c4cded8833dcf40 1 parent 284a578
Alvaro Videla videlalvaro authored

Showing 1 changed file with 108 additions and 66 deletions. Show diff stats Hide diff stats

  1. +108 66 amqp.inc
174 amqp.inc
@@ -22,7 +22,7 @@ function methodSig($a)
22 22 else
23 23 return sprintf("%d,%d",$a[0] ,$a[1]);
24 24 }
25   -
  25 +
26 26
27 27 class AMQPException extends Exception
28 28 {
@@ -71,7 +71,7 @@ class AbstractChannel
71 71 "60,60", // Basic.deliver
72 72 "60,71", // Basic.get_ok
73 73 );
74   -
  74 +
75 75 private static $CLOSE_METHODS = array(
76 76 "10,60", // Connection.close
77 77 "20,40", // Channel.close
@@ -135,7 +135,7 @@ class AbstractChannel
135 135 "90,30" => "Channel.tx_rollback",
136 136 "90,31" => "Channel.tx_rollback_ok"
137 137 );
138   -
  138 +
139 139 protected $debug;
140 140
141 141 public function __construct($connection, $channel_id)
@@ -148,7 +148,7 @@ class AbstractChannel
148 148 $this->auto_decode = false;
149 149 $this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false;
150 150 }
151   -
  151 +
152 152 public function getChannelId()
153 153 {
154 154 return $this->channel_id;
@@ -159,7 +159,7 @@ class AbstractChannel
159 159 {
160 160 if(!array_key_exists($method_sig, $this->method_map))
161 161 throw new Exception("Unknown AMQP method $method_sig");
162   -
  162 +
163 163 $amqp_method = $this->method_map[$method_sig];
164 164 if($content == NULL)
165 165 return call_user_func(array($this,$amqp_method), $args);
@@ -177,7 +177,7 @@ class AbstractChannel
177 177 return array_pop($this->frame_queue);
178 178 return $this->connection->wait_channel($this->channel_id);
179 179 }
180   -
  180 +
181 181 protected function send_method_frame($method_sig, $args="")
182 182 {
183 183 $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
@@ -226,10 +226,10 @@ class AbstractChannel
226 226 }
227 227 }
228 228 }
229   -
  229 +
230 230 return $msg;
231 231 }
232   -
  232 +
233 233 /**
234 234 * Wait for some expected AMQP methods and dispatch to them.
235 235 * Unexpected methods are queued up for later calls to this Python
@@ -259,36 +259,36 @@ class AbstractChannel
259 259 {
260 260 debug_msg("checking queue method " . $qk);
261 261 }
262   -
  262 +
263 263 $method_sig = $queued_method[0];
264 264 if($allowed_methods==NULL || in_array($method_sig, $allowed_methods))
265 265 {
266 266 unset($this->method_queue[$qk]);
267   -
  267 +
268 268 if($this->debug)
269 269 {
270 270 debug_msg("Executing queued method: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
271 271 }
272   -
  272 +
273 273 return $this->dispatch($queued_method[0],
274 274 $queued_method[1],
275 275 $queued_method[2]);
276 276 }
277 277 }
278   -
  278 +
279 279 // No deferred methods? wait for new ones
280 280 while(true)
281 281 {
282 282 $frm = $this->next_frame();
283 283 $frame_type = $frm[0];
284 284 $payload = $frm[1];
285   -
  285 +
286 286 if($frame_type != 1)
287 287 throw new Exception("Expecting AMQP method, received frame type: $frame_type");
288 288
289 289 if(strlen($payload) < 4)
290 290 throw new Exception("Method frame too short");
291   -
  291 +
292 292 $method_sig_array = unpack("n2", substr($payload,0,4));
293 293 $method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2];
294 294 $args = new AMQPReader(substr($payload,4));
@@ -297,20 +297,20 @@ class AbstractChannel
297 297 {
298 298 debug_msg("> $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
299 299 }
300   -
301   -
  300 +
  301 +
302 302 if(in_array($method_sig, AbstractChannel::$CONTENT_METHODS))
303 303 $content = $this->wait_content();
304 304 else
305 305 $content = NULL;
306   -
  306 +
307 307 if($allowed_methods==NULL ||
308 308 in_array($method_sig,$allowed_methods) ||
309 309 in_array($method_sig,AbstractChannel::$CLOSE_METHODS))
310 310 {
311 311 return $this->dispatch($method_sig, $args, $content);
312 312 }
313   -
  313 +
314 314 // Wasn't what we were looking for? save it for later
315 315 if($this->debug)
316 316 {
@@ -319,13 +319,41 @@ class AbstractChannel
319 319 array_push($this->method_queue,array($method_sig, $args, $content));
320 320 }
321 321 }
322   -
  322 +
  323 +}
  324 +
  325 +class AMQPSSLConnection extends AMQPConnection
  326 +{
  327 + public function __construct($host, $port,
  328 + $user, $password,
  329 + $vhost="/", $ssl_options = array(), $options = array())
  330 + {
  331 + $ssl_context = empty($ssl_options) ? null : $this->create_ssl_context($ssl_options);
  332 +
  333 + parent::__construct($host, $port, $user, $password, $vhost="/",
  334 + isset($options['insist']) ? $options['insist'] : false,
  335 + isset($options['login_method']) ? $options['login_method'] : "AMQPLAIN",
  336 + isset($options['login_response']) ? $options['login_response'] : null,
  337 + isset($options['locale']) ? $options['locale'] : "en_US",
  338 + isset($options['connection_timeout']) ? $options['connection_timeout'] : 3,
  339 + isset($options['read_write_timeout']) ? $options['read_write_timeout'] : 3,
  340 + $ssl_context);
  341 + }
  342 +
  343 + private function create_ssl_context($options)
  344 + {
  345 + $ssl_context = stream_context_create();
  346 + foreach ($options as $k => $v) {
  347 + stream_context_set_option($ssl_context, 'ssl', $k, $v);
  348 + }
  349 + return $ssl_context;
  350 + }
323 351 }
324 352
325 353 class AMQPConnection extends AbstractChannel
326 354 {
327 355 public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01";
328   -
  356 +
329 357 public static $LIBRARY_PROPERTIES = array(
330 358 "library" => array('S', "PHP Simple AMQP lib"),
331 359 "library_version" => array('S', "0.1")
@@ -340,7 +368,7 @@ class AMQPConnection extends AbstractChannel
340 368 "10,60" => "_close",
341 369 "10,61" => "close_ok"
342 370 );
343   -
  371 +
344 372 public function __construct($host, $port,
345 373 $user, $password,
346 374 $vhost="/",$insist=false,
@@ -348,7 +376,8 @@ class AMQPConnection extends AbstractChannel
348 376 $login_response=NULL,
349 377 $locale="en_US",
350 378 $connection_timeout = 3,
351   - $read_write_timeout = 3)
  379 + $read_write_timeout = 3,
  380 + $context = null)
352 381 {
353 382
354 383 if($user && $password)
@@ -359,7 +388,7 @@ class AMQPConnection extends AbstractChannel
359 388 $login_response = substr($login_response->getvalue(),4); //Skip the length
360 389 } else
361 390 $login_response = NULL;
362   -
  391 +
363 392
364 393 $d = AMQPConnection::$LIBRARY_PROPERTIES;
365 394 while(true)
@@ -367,17 +396,30 @@ class AMQPConnection extends AbstractChannel
367 396 $this->channels = array();
368 397 // The connection object itself is treated as channel 0
369 398 parent::__construct($this, 0);
370   -
  399 +
371 400 $this->channel_max = 65535;
372 401 $this->frame_max = 131072;
373 402
374 403 $errstr = $errno = NULL;
375 404 $this->sock = NULL;
376   - if (!($this->sock = fsockopen($host,$port,$errno,$errstr,$connection_timeout)))
  405 +
  406 + //TODO clean up
  407 + if($context)
  408 + {
  409 + $remote = sprintf('ssl://%s:%s', $host, $port);
  410 + $this->sock = stream_socket_client($remote, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $context);
  411 + }
  412 + else
  413 + {
  414 + $remote = sprintf('tcp://%s:%s', $host, $port);
  415 + $this->sock = stream_socket_client($remote, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
  416 + }
  417 +
  418 + if (!$this->sock)
377 419 {
378 420 throw new Exception ("Error Connecting to server($errno): $errstr ");
379 421 }
380   -
  422 +
381 423 stream_set_timeout($this->sock, $read_write_timeout);
382 424 stream_set_blocking($this->sock, 1);
383 425 $this->input = new AMQPReader(null, $this->sock);
@@ -385,7 +427,7 @@ class AMQPConnection extends AbstractChannel
385 427 $this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER);
386 428 $this->wait(array("10,10"));
387 429 $this->x_start_ok($d, $login_method, $login_response, $locale);
388   -
  430 +
389 431 $this->wait_tune_ok = true;
390 432 while($this->wait_tune_ok)
391 433 {
@@ -404,11 +446,11 @@ class AMQPConnection extends AbstractChannel
404 446 {
405 447 debug_msg("closing socket");
406 448 }
407   -
  449 +
408 450 @fclose($this->sock); $this->sock=NULL;
409 451 }
410 452 }
411   -
  453 +
412 454 public function __destruct()
413 455 {
414 456 if(isset($this->input))
@@ -421,7 +463,7 @@ class AMQPConnection extends AbstractChannel
421 463 {
422 464 debug_msg("closing socket");
423 465 }
424   -
  466 +
425 467 @fclose($this->sock);
426 468 }
427 469 }
@@ -432,7 +474,7 @@ class AMQPConnection extends AbstractChannel
432 474 {
433 475 debug_msg("< [hex]:\n" . hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
434 476 }
435   -
  477 +
436 478 $len = strlen($data);
437 479 while(true)
438 480 {
@@ -447,7 +489,7 @@ class AMQPConnection extends AbstractChannel
447 489 break;
448 490 }
449 491 }
450   -
  492 +
451 493 protected function do_close()
452 494 {
453 495 if(isset($this->input))
@@ -456,14 +498,14 @@ class AMQPConnection extends AbstractChannel
456 498 $this->input->close();
457 499 $this->input = NULL;
458 500 }
459   -
  501 +
460 502 if($this->sock)
461 503 {
462 504 if($this->debug)
463 505 {
464 506 debug_msg("closing socket");
465 507 }
466   -
  508 +
467 509 @fclose($this->sock);
468 510 $this->sock = NULL;
469 511 }
@@ -494,7 +536,7 @@ class AMQPConnection extends AbstractChannel
494 536 $pkt->write_octet(0xCE);
495 537 $pkt = $pkt->getvalue();
496 538 $this->write($pkt);
497   -
  539 +
498 540 while($body)
499 541 {
500 542 $payload = substr($body,0, $this->frame_max-8);
@@ -504,9 +546,9 @@ class AMQPConnection extends AbstractChannel
504 546 $pkt->write_octet(3);
505 547 $pkt->write_short($channel);
506 548 $pkt->write_long(strlen($payload));
507   -
  549 +
508 550 $pkt->write($payload);
509   -
  551 +
510 552 $pkt->write_octet(0xCE);
511 553 $pkt = $pkt->getvalue();
512 554 $this->write($pkt);
@@ -537,7 +579,7 @@ class AMQPConnection extends AbstractChannel
537 579 {
538 580 debug_msg("< " . methodSig($method_sig) . ": " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
539 581 }
540   -
  582 +
541 583 }
542 584
543 585 /**
@@ -549,11 +591,11 @@ class AMQPConnection extends AbstractChannel
549 591 $channel = $this->input->read_short();
550 592 $size = $this->input->read_long();
551 593 $payload = $this->input->read($size);
552   -
  594 +
553 595 $ch = $this->input->read_octet();
554 596 if($ch != 0xCE)
555 597 throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch));
556   -
  598 +
557 599 return array($frame_type, $channel, $payload);
558 600 }
559 601
@@ -573,7 +615,7 @@ class AMQPConnection extends AbstractChannel
573 615 //for later, when the other channel is looking for frames.
574 616 array_push($this->channels[$frame_channel]->frame_queue,
575 617 array($frame_type, $payload));
576   -
  618 +
577 619 // If we just queued up a method for channel 0 (the Connection
578 620 // itself) it's probably a close method in reaction to some
579 621 // error, so deal with it right away.
@@ -590,7 +632,7 @@ class AMQPConnection extends AbstractChannel
590 632 {
591 633 if(array_key_exists($channel_id,$this->channels))
592 634 return $this->channels[$channel_id];
593   -
  635 +
594 636 return new AMQPChannel($this->connection, $channel_id);
595 637 }
596 638
@@ -643,7 +685,7 @@ class AMQPConnection extends AbstractChannel
643 685 $method_id = $args->read_short();
644 686
645 687 $this->x_close_ok();
646   -
  688 +
647 689 throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id));
648 690 }
649 691
@@ -689,7 +731,7 @@ class AMQPConnection extends AbstractChannel
689 731 {
690 732 debug_msg("Open OK! known_hosts: " . $this->known_hosts);
691 733 }
692   -
  734 +
693 735 return NULL;
694 736 }
695 737
@@ -746,10 +788,10 @@ class AMQPConnection extends AbstractChannel
746 788 implode(', ', $this->mechanisms),
747 789 implode(', ', $this->locales)));
748 790 }
749   -
  791 +
750 792 }
751   -
752   -
  793 +
  794 +
753 795 protected function x_start_ok($client_properties, $mechanism, $response, $locale)
754 796 {
755 797 $args = new AMQPWriter();
@@ -819,7 +861,7 @@ class AMQPChannel extends AbstractChannel
819 861 "90,21" => "tx_commit_ok",
820 862 "90,31" => "tx_rollback_ok"
821 863 );
822   -
  864 +
823 865 public function __construct($connection,
824 866 $channel_id=NULL,
825 867 $auto_decode=true)
@@ -829,12 +871,12 @@ class AMQPChannel extends AbstractChannel
829 871 $channel_id = $connection->get_free_channel_id();
830 872
831 873 parent::__construct($connection, $channel_id);
832   -
  874 +
833 875 if($this->debug)
834 876 {
835 877 debug_msg("using channel_id: " . $channel_id);
836 878 }
837   -
  879 +
838 880 $this->default_ticket = 0;
839 881 $this->is_open = false;
840 882 $this->active = true; // Flow control
@@ -906,11 +948,11 @@ class AMQPChannel extends AbstractChannel
906 948
907 949 $this->send_method_frame(array(20, 41));
908 950 $this->do_close();
909   -
  951 +
910 952 throw new AMQPChannelException($reply_code, $reply_text,
911 953 array($class_id, $method_id));
912 954 }
913   -
  955 +
914 956 /**
915 957 * confirm a channel close
916 958 */
@@ -949,12 +991,12 @@ class AMQPChannel extends AbstractChannel
949 991 {
950 992 return $args->read_bit();
951 993 }
952   -
  994 +
953 995 protected function x_open($out_of_band="")
954 996 {
955 997 if($this->is_open)
956 998 return;
957   -
  999 +
958 1000 $args = new AMQPWriter();
959 1001 $args->write_shortstr($out_of_band);
960 1002 $this->send_method_frame(array(20, 10), $args);
@@ -962,7 +1004,7 @@ class AMQPChannel extends AbstractChannel
962 1004 "20,11" //Channel.open_ok
963 1005 ));
964 1006 }
965   -
  1007 +
966 1008 protected function open_ok($args)
967 1009 {
968 1010 $this->is_open = true;
@@ -999,7 +1041,7 @@ class AMQPChannel extends AbstractChannel
999 1041 $this->default_ticket = $args->read_short();
1000 1042 return $this->default_ticket;
1001 1043 }
1002   -
  1044 +
1003 1045
1004 1046 /**
1005 1047 * declare exchange, create if needed
@@ -1016,7 +1058,7 @@ class AMQPChannel extends AbstractChannel
1016 1058 {
1017 1059 if($arguments==NULL)
1018 1060 $arguments = array();
1019   -
  1061 +
1020 1062 $args = new AMQPWriter();
1021 1063 if($ticket != NULL)
1022 1064 $args->write_short($ticket);
@@ -1184,7 +1226,7 @@ class AMQPChannel extends AbstractChannel
1184 1226 $queue = $args->read_shortstr();
1185 1227 $message_count = $args->read_long();
1186 1228 $consumer_count = $args->read_long();
1187   -
  1229 +
1188 1230 return array($queue, $message_count, $consumer_count);
1189 1231 }
1190 1232
@@ -1306,7 +1348,7 @@ class AMQPChannel extends AbstractChannel
1306 1348 $consumer_tag = $this->wait(array(
1307 1349 "60,21" //Channel.basic_consume_ok
1308 1350 ));
1309   -
  1351 +
1310 1352 $this->callbacks[$consumer_tag] = $callback;
1311 1353 return $consumer_tag;
1312 1354 }
@@ -1329,7 +1371,7 @@ class AMQPChannel extends AbstractChannel
1329 1371 $redelivered = $args->read_bit();
1330 1372 $exchange = $args->read_shortstr();
1331 1373 $routing_key = $args->read_shortstr();
1332   -
  1374 +
1333 1375 $msg->delivery_info = array(
1334 1376 "channel" => $this,
1335 1377 "consumer_tag" => $consumer_tag,
@@ -1343,7 +1385,7 @@ class AMQPChannel extends AbstractChannel
1343 1385 $func = $this->callbacks[$consumer_tag];
1344 1386 else
1345 1387 $func = NULL;
1346   -
  1388 +
1347 1389 if($func!=NULL)
1348 1390 call_user_func($func, $msg);
1349 1391 }
@@ -1413,13 +1455,13 @@ class AMQPChannel extends AbstractChannel
1413 1455 $args->write_bit($mandatory);
1414 1456 $args->write_bit($immediate);
1415 1457 $this->send_method_frame(array(60, 40), $args);
1416   -
  1458 +
1417 1459 $this->connection->send_content($this->channel_id, 60, 0,
1418 1460 strlen($msg->body),
1419 1461 $msg->serialize_properties(),
1420 1462 $msg->body);
1421 1463 }
1422   -
  1464 +
1423 1465
1424 1466 /**
1425 1467 * specify quality of service
@@ -1485,15 +1527,15 @@ class AMQPChannel extends AbstractChannel
1485 1527 "90,21" //Channel.tx_commit_ok
1486 1528 ));
1487 1529 }
1488   -
  1530 +
1489 1531 /**
1490 1532 * confirm a successful commit
1491 1533 */
1492 1534 protected function tx_commit_ok($args)
1493 1535 {
1494 1536 }
1495   -
1496   -
  1537 +
  1538 +
1497 1539 /**
1498 1540 * abandon the current transaction
1499 1541 */

0 comments on commit 068ab91

Please sign in to comment.
Something went wrong with that request. Please try again.