ReactorNettyClient requestProcessor can retain data from queries #492

Closed
simonbasle opened this issue Feb 11, 2022 · 8 comments
Labels: type: bug (A general bug)
Milestone: 0.9.1.RELEASE

simonbasle commented Feb 11, 2022

Bug Report

Versions

  • Driver: 0.8.12.BUILD-SNAPSHOT
  • Database: (brought by testcontainers 1.16.3)
  • Java: 1.8.0
  • OS: OSX 10.15.7

Current Behavior

Following a report from a user, first in Spring Framework and then in Reactor Core, I investigated a memory leak where reducing a large dataset into a single array via Flux#reduce led to an OutOfMemoryError.

In addition to the reproducer in the above issues from the original author (in Kotlin, using both Spring and Spring Data R2DBC), I've managed to create a self-contained test class that can run from the r2dbc-postgresql test suite with minimal effort (see below).

In a nutshell, the reduced byte[] arrays from previous loop iterations are retained, reachable through Netty channels/selectors, which prevents their garbage collection. One major component in that retention appears to be the ReactorNettyClient requestProcessor: this EmitterProcessor instance keeps a single subscriber around even after the query has completed. This is congruent with what could be observed in the OP's original repro.

This could point to a pooling issue in the OP's reproducer, somewhat simulated here by the fact that the Connection is not closed.
I mention this because at first I didn't close the Connection in my repro either, and I saw very similar paths to GC roots for the retained byte[]...

It also appears that setting Statement#fetchSize higher than the number of returned rows makes the issue go away.
edit: fetchSize doesn't help if the Connection is not closed.
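
For reference, a minimal sketch of the fetchSize variant described above, assuming the standard io.r2dbc.spi.Statement API (the exact value is illustrative):

// Sketch: a fetchSize larger than the result set (LIMIT rows) appeared to
// avoid the retention, but per the edit above, only when the Connection
// was also closed.
connection
    .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT " + LIMIT)
    .fetchSize(LIMIT + 1) // illustrative: any value above the row count
    .execute();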

Table schema

CREATE SCHEMA IF NOT EXISTS master_data AUTHORIZATION postgres;

CREATE TABLE master_data.csv (
  id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
  column1 text NOT NULL,
  column2 text NOT NULL,
  column3 text NOT NULL,
  CONSTRAINT csv_pkey PRIMARY KEY (id)
);

INSERT INTO master_data.csv (id, column1, column2, column3)
  SELECT g.id, 'value11', 'value21', 'value31'
  FROM generate_series(1, 1000000) AS g(id);

Steps to reproduce

In addition to the OP's own Kotlin reproducer here, my own self-contained reproducer is below.

At LIMIT 30000, the OOM doesn't occur. The 20s pause at the end can be leveraged to trigger a heap dump from e.g. VisualVM, which can then be inspected for retained ErasableByteArrayOutputStreams (or their internal byte[]). At that limit, the byte[] arrays should be the top objects by size.

Typically: [heap dump screenshot: byte[] among the top objects by size]
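
As an aside, the dump can also be captured programmatically during the sleep at the end of the test; a minimal sketch assuming a HotSpot JVM (the file name is illustrative):

import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.sun.management.HotSpotDiagnosticMXBean;

class HeapDumpSketch {

    // Sketch: dump the heap to a file; the "true" argument restricts the
    // dump to live (reachable) objects, which is what we want when looking
    // for retained byte[] arrays.
    static void dumpHeap() throws IOException {
        HotSpotDiagnosticMXBean diagnostic = ManagementFactory.newPlatformMXBeanProxy(
            ManagementFactory.getPlatformMBeanServer(),
            "com.sun.management:type=HotSpotDiagnostic",
            HotSpotDiagnosticMXBean.class);
        diagnostic.dumpHeap("leak-repro.hprof", true);
    }
}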

Self-contained reproducer
package io.r2dbc.postgresql;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Testcontainers
public class MemoryLeakIntegrationTests {

    Logger logger = LoggerFactory.getLogger(MemoryLeakIntegrationTests.class);

    @Container
    PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:13-alpine")
        .withDatabaseName("tpp")
        .withUsername("postgres")
        .withPassword("postgres")
        .withInitScript("leak-init.sql")
        .withExposedPorts(5432);


    static class ErasableByteArrayOutputStream extends ByteArrayOutputStream {

        @Override
        public synchronized void reset() {
            super.reset();
            buf = new byte[32];
        }
    }

    @Test
    void test() throws InterruptedException {
        logger.info("Will connect to port {}", postgres.getMappedPort(5432));

        final int LIMIT = 30_000; //originally, no LIMIT => 1_000_000 elements
        //the retaining is observable via a heap dump even with a LIMIT 30_000 (although the OOM doesn't occur)

        ConnectionFactory connectionFactory = ConnectionFactories.get(PostgreSQLR2DBCDatabaseContainer.getOptions(postgres));

        for (int i = 0; i < 10; i++) {
            final int round = i;

            Mono.usingWhen(connectionFactory.create(),
                    connection -> {
                        logger.info("Connection acquired {}", connection);

                        return Flux.from(connection
                                .createStatement("SELECT master_data.csv.* FROM master_data.csv ORDER BY id LIMIT " + LIMIT)
                                .execute()
                            )
                            .publishOn(Schedulers.boundedElastic())
                            .flatMap(r -> r.map((row, rowmetadata) -> {
                                int nbColumns = rowmetadata.getColumnNames().size();
                                List<String> rowContent = new ArrayList<>(nbColumns);
                                for (int colId = 0; colId < nbColumns; colId++) {
                                    if (colId == 0) {
                                        long id = row.get(colId, Long.class);
                                        if (id % (LIMIT / 3) == 0) {
                                            System.out.println("passed id " + id);
                                        }
                                        rowContent.add("" + id);
                                    }
                                    else {
                                        rowContent.add(row.get(colId, String.class));
                                    }
                                }
                                return rowContent;
                            }))
                            .flatMapIterable(Function.identity())
                            // having a ErasableByteArrayOutputStream class is useful to spot
                            // the relevant objects in profilers, plus it allows dirty workaround with reset().
                            .reduce(new ErasableByteArrayOutputStream(), (output, el) -> {
                                try {
                                    output.write(el.getBytes(StandardCharsets.UTF_8));
                                    output.write(" ".getBytes(StandardCharsets.UTF_8));
                                }
                                catch (IOException e) {
                                    e.printStackTrace();
                                }
                                return output;
                            })
                            .map(it -> {
                                byte[] result = it.toByteArray();
                                // custom reset is a dirty workaround to avoid retaining.
                                //it.reset();
                                return result;
                            });
                    },
                
                    // connection not closed: the byte[] will all stay in memory
                    connection -> Mono.empty()
                    // on the other hand, closing the connection ensures they get GCed
//                    connection -> Mono.from(connection.close())
//                        .doFinally(sig -> {
//                            logger.info("Connection closed {}", connection);
//                        })
                )
                .doOnNext(it -> logger.info("ByteArray with size {} was created in round {}", it.length, round))
                .block(Duration.ofMinutes(5));
        }

        System.gc();
        System.out.println("Will sleep for 20s");
        // capture a Heap Dump here, or even after 2-3 rounds, to observe the issue
        Thread.sleep(20_000);
    }
}
@simonbasle (Author)

cc @mp911de, @typik89

mp911de self-assigned this Feb 16, 2022
mp911de added the type: bug label and removed the status: waiting-for-triage label Feb 16, 2022
mp911de added this to the 0.9.1.RELEASE milestone Feb 16, 2022
mp911de added a commit that referenced this issue Feb 16, 2022
Reuse connection-closed exception factory method.

[#492]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
mp911de added a commit that referenced this issue Feb 16, 2022
… while emitting requests.

Once the conversation is accepted, we no longer need to check on a new backend message whether the connection is closed as a channelInactive()/connection.close() signal terminates conversations anyway.

[#492]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

mp911de commented Feb 16, 2022

@typik89 we applied a change to the driver that seems to resolve the issue. Since we weren't able to fully confirm that the fix works, can you retest against the latest snapshot (r2dbc-postgresql-0.8.12.BUILD-20220216.140312-3.jar) and let us know the outcome?


typik89 commented Feb 16, 2022

I ran 5 requests sequentially and created a heap dump afterwards. It seems that only one object is retained. It's better than it was, but it still looks strange.
[heap dump screenshot]
I also have errors in the log:

2022-02-16 23:02:31.078 ERROR 17928 --- [actor-tcp-nio-1] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:401)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)

2022-02-16 23:02:46.589 ERROR 17928 --- [ctor-http-nio-3] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:142)
io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:132)


mp911de commented Feb 17, 2022

Regarding the referenced byte array, I think I need additional insights from the Reactor team.

The other issue, related to the lingering ByteBuf, has been with us for quite some time already, and we haven't really been able to pinpoint it.

mp911de added a commit that referenced this issue Feb 17, 2022
Windowed fluxes now properly discard ref-counted objects avoiding memory leaks upon cancellation.

[#492]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

mp911de commented Feb 17, 2022

The leaking DataRow is a consequence of missing or wrong context propagation in the windowUntil(…) operator, which reacts differently depending on where the dropped-items hook is registered. The leak only showed up in cancellation scenarios and is fixed now.
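
For context (this is not the actual driver patch), Reactor's discard support is the mechanism such a fix hooks into: operators pass dropped items to a doOnDiscard hook, and where that hook sits in the chain determines which operators it covers. A minimal sketch of releasing ref-counted Netty objects on discard:

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Flux;

class DiscardHookSketch {

    // Sketch: items dropped by upstream operators (e.g. rows in a window
    // cancelled mid-stream) are handed to the discard hook, which releases
    // their Netty ref-count instead of letting the ByteBuf leak. The hook
    // only applies to operators upstream of where it is registered.
    static <T> Flux<T> releaseDiscardedOnCancel(Flux<T> source) {
        return source.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::safeRelease);
    }
}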

@simonbasle (Author)

I've been running the sample with 500_000 elements in the db and 5 loops, with a 20s sleep at the end, as that is enough to see the issue in heap dumps. With the snapshots, I indeed see a big improvement.

with 0.8.11:
[heap dump screenshot]

with 0.8.12:
[heap dump screenshot]

There was a lingering array retained by MonoReduceSeed, but that is because reduce is used and thus the ByteArrayOutputStream is retained by the publisher's initialSupplier lambda.

[heap dump screenshot]

If we use reduceWith to make the construction of the ByteArrayOutputStream explicitly lazy, no byte[] arrays are retained anymore:

.reduceWith({ ByteArrayOutputStream() }) { output, el ->
    output.write(el.toString().toByteArray())
    output.write(" ".toByteArray())
    output
}

which leads to this state:
[heap dump screenshot]
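
In the Java reproducer above, the equivalent change swaps reduce(seed, …) for reduceWith(supplier, …), so the seed is created lazily per subscription instead of being captured at assembly time:

.reduceWith(ErasableByteArrayOutputStream::new, (output, el) -> {
    try {
        output.write(el.getBytes(StandardCharsets.UTF_8));
        output.write(" ".getBytes(StandardCharsets.UTF_8));
    }
    catch (IOException e) {
        e.printStackTrace();
    }
    return output;
})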


mp911de commented Feb 17, 2022

Sounds as if we could close this ticket. Any objections?


typik89 commented Feb 17, 2022

Yes, reduceWith helps.
