1111use Interop \Amqp \AmqpMessage ;
1212use Interop \Queue \PsrConsumer ;
1313use Interop \Queue \PsrContext ;
14+ use Interop \Queue \PsrMessage ;
1415use Interop \Queue \PsrProcessor ;
1516use Interop \Queue \PsrQueue ;
17+ use Psr \Log \LoggerInterface ;
1618use Psr \Log \NullLogger ;
1719
1820final class QueueConsumer
@@ -23,9 +25,9 @@ final class QueueConsumer
2325 private $ psrContext ;
2426
2527 /**
26- * @var ExtensionInterface|ChainExtension|null
28+ * @var ExtensionInterface|ChainExtension
2729 */
28- private $ extension ;
30+ private $ staticExtension ;
2931
3032 /**
3133 * [
@@ -46,6 +48,16 @@ final class QueueConsumer
4648 */
4749 private $ receiveTimeout ;
4850
51+ /**
52+ * @var ExtensionInterface|ChainExtension
53+ */
54+ private $ extension ;
55+
56+ /**
57+ * @var LoggerInterface
58+ */
59+ private $ logger ;
60+
4961 /**
5062 * @param PsrContext $psrContext
5163 * @param ExtensionInterface|ChainExtension|null $extension
@@ -59,11 +71,12 @@ public function __construct(
5971 $ receiveTimeout = 10
6072 ) {
6173 $ this ->psrContext = $ psrContext ;
62- $ this ->extension = $ extension ;
74+ $ this ->staticExtension = $ extension ?: new ChainExtension ([]) ;
6375 $ this ->idleTimeout = $ idleTimeout ;
6476 $ this ->receiveTimeout = $ receiveTimeout ;
6577
6678 $ this ->boundProcessors = [];
79+ $ this ->logger = new NullLogger ();
6780 }
6881
6982 /**
@@ -157,19 +170,19 @@ public function consume(ExtensionInterface $runtimeExtension = null)
157170 $ consumers [$ queue ->getQueueName ()] = $ this ->psrContext ->createConsumer ($ queue );
158171 }
159172
160- $ extension = $ this ->extension ?: new ChainExtension ([]);
161- if ( $ runtimeExtension) {
162- $ extension = new ChainExtension ([ $ extension , $ runtimeExtension ]);
163- }
173+ $ this ->extension = $ runtimeExtension ?
174+ new ChainExtension ([ $ this -> staticExtension , $ runtimeExtension]) :
175+ $ this -> staticExtension
176+ ;
164177
165178 $ context = new Context ($ this ->psrContext );
166- $ extension ->onStart ($ context );
179+ $ this -> extension ->onStart ($ context );
167180
168- $ logger = $ context ->getLogger () ?: new NullLogger ();
169- $ logger ->info ('Start consuming ' );
181+ $ this -> logger = $ context ->getLogger () ?: new NullLogger ();
182+ $ this -> logger ->info ('Start consuming ' );
170183
171184 if ($ this ->psrContext instanceof AmqpContext) {
172- $ callback = function (AmqpMessage $ message , AmqpConsumer $ consumer ) use ($ extension , $ logger , &$ context ) {
185+ $ callback = function (AmqpMessage $ message , AmqpConsumer $ consumer ) use (&$ context ) {
173186 $ currentProcessor = null ;
174187
175188 /** @var PsrQueue $queue */
@@ -184,13 +197,13 @@ public function consume(ExtensionInterface $runtimeExtension = null)
184197 }
185198
186199 $ context = new Context ($ this ->psrContext );
187- $ context ->setLogger ($ logger );
200+ $ context ->setLogger ($ this -> logger );
188201 $ context ->setPsrQueue ($ consumer ->getQueue ());
189202 $ context ->setPsrConsumer ($ consumer );
190203 $ context ->setPsrProcessor ($ currentProcessor );
191204 $ context ->setPsrMessage ($ message );
192205
193- $ this ->doConsume ($ extension , $ context );
206+ $ this ->doConsume ($ this -> extension , $ context );
194207
195208 return true ;
196209 };
@@ -205,7 +218,7 @@ public function consume(ExtensionInterface $runtimeExtension = null)
205218 while (true ) {
206219 try {
207220 if ($ this ->psrContext instanceof AmqpContext) {
208- $ extension ->onBeforeReceive ($ context );
221+ $ this -> extension ->onBeforeReceive ($ context );
209222
210223 if ($ context ->isExecutionInterrupted ()) {
211224 throw new ConsumptionInterruptedException ();
@@ -214,23 +227,23 @@ public function consume(ExtensionInterface $runtimeExtension = null)
214227 $ this ->psrContext ->consume ($ this ->receiveTimeout );
215228
216229 usleep ($ this ->idleTimeout * 1000 );
217- $ extension ->onIdle ($ context );
230+ $ this -> extension ->onIdle ($ context );
218231 } else {
219232 /** @var PsrQueue $queue */
220233 foreach ($ this ->boundProcessors as list ($ queue , $ processor )) {
221234 $ consumer = $ consumers [$ queue ->getQueueName ()];
222235
223236 $ context = new Context ($ this ->psrContext );
224- $ context ->setLogger ($ logger );
237+ $ context ->setLogger ($ this -> logger );
225238 $ context ->setPsrQueue ($ queue );
226239 $ context ->setPsrConsumer ($ consumer );
227240 $ context ->setPsrProcessor ($ processor );
228241
229- $ this ->doConsume ($ extension , $ context );
242+ $ this ->doConsume ($ this -> extension , $ context );
230243 }
231244 }
232245 } catch (ConsumptionInterruptedException $ e ) {
233- $ logger ->info (sprintf ('Consuming interrupted ' ));
246+ $ this -> logger ->info (sprintf ('Consuming interrupted ' ));
234247
235248 if ($ this ->psrContext instanceof AmqpContext) {
236249 foreach ($ consumers as $ consumer ) {
@@ -242,15 +255,15 @@ public function consume(ExtensionInterface $runtimeExtension = null)
242255
243256 $ context ->setExecutionInterrupted (true );
244257
245- $ extension ->onInterrupted ($ context );
258+ $ this -> extension ->onInterrupted ($ context );
246259
247260 return ;
248261 } catch (\Exception $ exception ) {
249262 $ context ->setExecutionInterrupted (true );
250263 $ context ->setException ($ exception );
251264
252265 try {
253- $ this ->onInterruptionByException ($ extension , $ context );
266+ $ this ->onInterruptionByException ($ this -> extension , $ context );
254267 } catch (\Exception $ e ) {
255268 // for some reason finally does not work here on php5.5
256269
@@ -272,55 +285,26 @@ private function doConsume(ExtensionInterface $extension, Context $context)
272285 {
273286 $ processor = $ context ->getPsrProcessor ();
274287 $ consumer = $ context ->getPsrConsumer ();
275- $ logger = $ context ->getLogger ();
288+ $ this -> logger = $ context ->getLogger ();
276289
277290 if ($ context ->isExecutionInterrupted ()) {
278291 throw new ConsumptionInterruptedException ();
279292 }
280293
281294 $ message = $ context ->getPsrMessage ();
282295 if (false == $ message ) {
283- $ extension ->onBeforeReceive ($ context );
296+ $ this -> extension ->onBeforeReceive ($ context );
284297
285298 if ($ message = $ consumer ->receive ($ this ->receiveTimeout )) {
286299 $ context ->setPsrMessage ($ message );
287300 }
288301 }
289302
290303 if ($ message ) {
291- $ logger ->info ('Message received from the queue: ' .$ context ->getPsrQueue ()->getQueueName ());
292- $ logger ->debug ('Headers: {headers} ' , ['headers ' => new VarExport ($ message ->getHeaders ())]);
293- $ logger ->debug ('Properties: {properties} ' , ['properties ' => new VarExport ($ message ->getProperties ())]);
294- $ logger ->debug ('Payload: {payload} ' , ['payload ' => new VarExport ($ message ->getBody ())]);
295-
296- $ extension ->onPreReceived ($ context );
297- if (!$ context ->getResult ()) {
298- $ result = $ processor ->process ($ message , $ this ->psrContext );
299- $ context ->setResult ($ result );
300- }
301-
302- $ extension ->onResult ($ context );
303-
304- switch ($ context ->getResult ()) {
305- case Result::ACK :
306- $ consumer ->acknowledge ($ message );
307- break ;
308- case Result::REJECT :
309- $ consumer ->reject ($ message , false );
310- break ;
311- case Result::REQUEUE :
312- $ consumer ->reject ($ message , true );
313- break ;
314- default :
315- throw new \LogicException (sprintf ('Status is not supported: %s ' , $ context ->getResult ()));
316- }
317-
318- $ logger ->info (sprintf ('Message processed: %s ' , $ context ->getResult ()));
319-
320- $ extension ->onPostReceived ($ context );
304+ $ this ->processMessage ($ consumer , $ processor , $ message , $ context );
321305 } else {
322306 usleep ($ this ->idleTimeout * 1000 );
323- $ extension ->onIdle ($ context );
307+ $ this -> extension ->onIdle ($ context );
324308 }
325309
326310 if ($ context ->isExecutionInterrupted ()) {
@@ -336,16 +320,16 @@ private function doConsume(ExtensionInterface $extension, Context $context)
336320 */
337321 private function onInterruptionByException (ExtensionInterface $ extension , Context $ context )
338322 {
339- $ logger = $ context ->getLogger ();
340- $ logger ->error (sprintf ('Consuming interrupted by exception ' ));
323+ $ this -> logger = $ context ->getLogger ();
324+ $ this -> logger ->error (sprintf ('Consuming interrupted by exception ' ));
341325
342326 $ exception = $ context ->getException ();
343327
344328 try {
345- $ extension ->onInterrupted ($ context );
329+ $ this -> extension ->onInterrupted ($ context );
346330 } catch (\Exception $ e ) {
347331 // logic is similar to one in Symfony's ExceptionListener::onKernelException
348- $ logger ->error (sprintf (
332+ $ this -> logger ->error (sprintf (
349333 'Exception thrown when handling an exception (%s: %s at %s line %s) ' ,
350334 get_class ($ e ),
351335 $ e ->getMessage (),
@@ -369,4 +353,38 @@ private function onInterruptionByException(ExtensionInterface $extension, Contex
369353
370354 throw $ exception ;
371355 }
356+
357+ private function processMessage (PsrConsumer $ consumer , PsrProcessor $ processor , PsrMessage $ message , Context $ context )
358+ {
359+ $ this ->logger ->info ('Message received from the queue: ' .$ context ->getPsrQueue ()->getQueueName ());
360+ $ this ->logger ->debug ('Headers: {headers} ' , ['headers ' => new VarExport ($ message ->getHeaders ())]);
361+ $ this ->logger ->debug ('Properties: {properties} ' , ['properties ' => new VarExport ($ message ->getProperties ())]);
362+ $ this ->logger ->debug ('Payload: {payload} ' , ['payload ' => new VarExport ($ message ->getBody ())]);
363+
364+ $ this ->extension ->onPreReceived ($ context );
365+ if (!$ context ->getResult ()) {
366+ $ result = $ processor ->process ($ message , $ this ->psrContext );
367+ $ context ->setResult ($ result );
368+ }
369+
370+ $ this ->extension ->onResult ($ context );
371+
372+ switch ($ context ->getResult ()) {
373+ case Result::ACK :
374+ $ consumer ->acknowledge ($ message );
375+ break ;
376+ case Result::REJECT :
377+ $ consumer ->reject ($ message , false );
378+ break ;
379+ case Result::REQUEUE :
380+ $ consumer ->reject ($ message , true );
381+ break ;
382+ default :
383+ throw new \LogicException (sprintf ('Status is not supported: %s ' , $ context ->getResult ()));
384+ }
385+
386+ $ this ->logger ->info (sprintf ('Message processed: %s ' , $ context ->getResult ()));
387+
388+ $ this ->extension ->onPostReceived ($ context );
389+ }
372390}
0 commit comments