Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Observable.concat() with param Observable.groupBy, only items in first group emitted correct, other items do not emited group by group #7562

Closed
mrz2012 opened this issue May 15, 2023 · 2 comments
Labels

Comments

@mrz2012
Copy link

mrz2012 commented May 15, 2023

Flowable has the same problem too.
vesion i use: 1.3.6
here is the code sample


public class ConcatGroupTest {

    static class Employee {

        public Integer departmentId;

        public String name;

        public Employee(Integer departmentId, String name) {
            this.departmentId = departmentId;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Employee{" + "departmentId=" + departmentId + ", name='" + name + '\'' + '}';
        }

    }

    public static void main(String[] args) {
        List<Employee> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Employee(i % 5, String.valueOf(i)));
        }
        Observable<Employee> values = Observable.fromIterable(list);
        Observable.concat(values.groupBy(e -> e.departmentId))
                .subscribe(System.out::println);
    }

}

the result on console print

Employee{departmentId=0, name='0'}
Employee{departmentId=0, name='5'}
Employee{departmentId=1, name='1'}
Employee{departmentId=2, name='2'}
Employee{departmentId=3, name='3'}
Employee{departmentId=4, name='4'}
Employee{departmentId=1, name='6'}
Employee{departmentId=2, name='7'}
Employee{departmentId=3, name='8'}
Employee{departmentId=4, name='9'}

@akarnokd
Copy link
Member

You are experiencing the so-called group abandonment. Groups produced by groupBy require an immediate subscription otherwise it can lead to hangs (with Flowable) or excess memory usage (with Observable). concat delays these subscriptions to one at a time.

From the javadoc

Note also that ignoring groups or subscribing later (i.e., on another thread) will result in so-called group abandonment where a group will only contain one element and the group will be re-created over and over as new upstream items trigger a new group. The behavior is a trade-off between no-dataloss, upstream cancellation and excessive group creation.

So you'll have to merge the groups or collect each group then re-emit them later

values.groupBy(e -> e.departmentId)
        .flatMapSingle(v -> v.toList())
        .concatMapIterable(v -> v)
        .subscribe(System.out::println);

@mrz2012
Copy link
Author

mrz2012 commented May 15, 2023

i get it, thanks for your reply!

@akarnokd akarnokd closed this as completed Feb 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants