Skip to content

Bad state: Cannot add event after closing when reading large table #398

@davidmartos96

Description

@davidmartos96

When doing a SELECT * of a large table the library throws with:

Unhandled exception:
Bad state: Cannot add event after closing
#0      _MultiStreamController.addErrorSync (dart:async/stream_impl.dart:1131:24)
#1      _readMessages.<anonymous closure>.<anonymous closure>.handleChunk (package:postgres/src/v3/protocol.dart:77:20)
<asynchronous suspension>

It seems to happen from version 3.4.0 onwards.
One interesting thing I noticed while testing different versions is that version 3.4.0 is almost twice as slow than 3.3.0 when batching inserting many rows, necessary to trigger this issue.

Here is the reproduction code:

import 'package:postgres/postgres.dart';

Future<void> createTableAndPopulate(Connection conn) async {
  final sw = Stopwatch()..start();

  await conn.execute('''
    CREATE TABLE IF NOT EXISTS large_table (
      id SERIAL PRIMARY KEY,
      c1 INTEGER NOT NULL,
      c2 INTEGER NOT NULL,
      c3 TEXT NOT NULL,
      c4 TEXT NOT NULL,
      c5 TEXT NOT NULL,
      c6 TEXT NOT NULL,
      c7 TEXT NOT NULL,
      c8 TEXT NOT NULL,
      c9 TEXT NOT NULL,
      c10 TEXT NOT NULL
    )
  ''');

  final numBatches = 20;
  final batchSize = 5000;

  for (var i = 0; i < numBatches; i++) {
    print("Batch $i of $numBatches");
    final values = List.generate(
        batchSize,
        (i) => [
              i,
              i * 2,
              'value $i',
              'value $i',
              'value $i',

              'value $i',
              'value $i',
              'value $i',
              'value $i',
              'value $i',
            ]);

    final allArgs = values.expand((e) => e).toList();
    final valuesPart = List.generate(
            batchSize,
            (i) =>
                '(${List.generate(10, (j) => '\$${i * 10 + j + 1}').join(', ')})')
        .join(', ');

    final stmt =
        'INSERT INTO large_table (c1, c2, c3, c4, c5, c6, c7, c8, c9, c10) VALUES $valuesPart';
    await conn.execute(
      stmt,
      parameters: allArgs,
    );
  }

  print('Inserted ${numBatches * batchSize} rows in ${sw.elapsed}');
}

void main() async {
  final conn = await Connection.open(
    Endpoint(
      host: 'localhost',
      database: 'large',
      username: 'postgres',
      password: 'changeme',
      port: 5434,
    ),
    settings: ConnectionSettings(sslMode: SslMode.disable),
  );
  print('has connection!');

  await conn.execute('DROP SCHEMA IF EXISTS public CASCADE');
  await conn.execute('CREATE SCHEMA public');
  await createTableAndPopulate(conn);

  final rows = await conn.execute('SELECT * FROM large_table');
  print("SELECTED ROWS ${rows.length}");

  await conn.close();
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions