Issue with ManyToManyConcurrentArrayQueue #234

Closed
danjmarques opened this issue Mar 18, 2021 · 1 comment

@danjmarques

I'm going to assume this is due to my misunderstanding of the semantics of ManyToManyConcurrentArrayQueue, but the note at the top of the javadoc for that class doesn't seem to describe what I'm seeing.

The following example fails about 20 percent of the time when numOfProducers is greater than 1. It fails because the consumer thread throws "java.lang.IllegalStateException: Queue is full" when adding an element to queue1. As should be apparent from the code, only capacity objects exist, so logically if queue1 is full then queue2 must be empty, and there can be no object for the consumer thread to add.

public void test2() throws InterruptedException {
       final AtomicBoolean failed = new AtomicBoolean(false);
       final int numOfProducers = 2;
       final int itemsPerProducer = 10_000;
       final int capacity = 16;

       // Total "space" in the system is 2 * capacity
       var queue1 = new ManyToManyConcurrentArrayQueue<Foo>(capacity);
       var queue2 = new ManyToManyConcurrentArrayQueue<Foo>(capacity);

       // There are only capacity objects in existence.
       for(int i = 0; i < capacity; i++){
           queue1.add(new Foo());
       }

       // Start the specified number of producers.
       // They busy spin waiting to poll an object
       // from queue1. Once they grab one, they immediately
       // add it to queue2.
       for(int i = 0; i < numOfProducers; i++){
           new Thread(()->{
               int p = 0;
               while(p < itemsPerProducer){
                   Foo o = queue1.poll();
                   if (o != null) {
                       p++;
                       queue2.add(o);
                   }
               }
           }).start();
       }

       // Start the single consumer thread, it busy spins
       // waiting to poll an object from queue2.
       // Once it grabs one, it immediately adds it on queue 1.
       Thread ct = new Thread( () -> {
           try {
               int c = 0;
               while (c < numOfProducers * itemsPerProducer) {
                   Foo o = queue2.poll();
                   if (o != null) {
                       c++;
                       queue1.add(o);
                   }
               }
           }
           catch (Throwable t){
               failed.set(true);
               throw t;
           }
       });
       ct.start();
       ct.join();
       assertFalse(failed.get());
   }
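
If the "Queue is full" condition is only transient, one possible workaround is to retry offer() instead of calling add(), which throws on the first failed attempt. The sketch below is not taken from the library; SpinOffer, offerSpinning and maxSpins are hypothetical names, and the premise that a failed offer can succeed shortly afterwards is an assumption this thread does not confirm (issue #216 is the place to check). It relies only on java.util.Queue, which the queue class in the test above is assumed to implement.

```java
import java.util.Queue;

/** Hypothetical helper, not part of any library: retries offer() instead of add(). */
public final class SpinOffer {
    private SpinOffer() {
    }

    /**
     * Tries queue.offer(item) up to maxSpins times.
     * Assumption: a "full" result may be transient while another thread is
     * part-way through an operation on the queue, so a retry can succeed.
     *
     * @return true if the item was enqueued, false if every attempt failed
     */
    public static <E> boolean offerSpinning(final Queue<E> queue, final E item, final long maxSpins) {
        for (long i = 0; i < maxSpins; i++) {
            if (queue.offer(item)) {
                return true;
            }
            // Hint to the runtime that this is a busy-wait loop (Java 9+).
            Thread.onSpinWait();
        }
        return false;
    }
}
```

In the test above, the consumer could call SpinOffer.offerSpinning(queue1, o, 1_000_000) in place of queue1.add(o) and treat a false result as the failure case; the spin limit is an arbitrary illustrative value.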
@mjpt777
Contributor

mjpt777 commented Mar 21, 2021

Read this issue to get a better understanding of how the queues work. #216

mjpt777 closed this as completed Mar 21, 2021