From cab4945dc6816724481f0ce350339672b128adaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 14 Aug 2020 11:29:20 +0200 Subject: [PATCH] fix(spanner): retry session not found for read (#2724) * fix(spanner): retry session not found for read `Session not found` errors were not retried for read operations on a single-use read-only transaction. Fixes #2718 * fix: rename method to satisfy lint check * fix: remove unused variable --- spanner/client_test.go | 47 +++++++++++++++++++ .../internal/testutil/inmem_spanner_server.go | 27 ++++++++--- spanner/transaction.go | 9 ++-- 3 files changed, 72 insertions(+), 11 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index c410c9513853..c30f39e8139c 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -162,6 +162,53 @@ func TestClient_Single_SessionNotFound(t *testing.T) { } } +func TestClient_Single_Read_SessionNotFound(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodStreamingRead, + SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, + ) + ctx := context.Background() + iter := client.Single().Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}) + defer iter.Stop() + rowCount := int64(0) + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Fatal(err) + } + rowCount++ + } + if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount { + t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount) + } +} + +func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodStreamingRead, + SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, + ) + ctx := context.Background() + row, err := client.Single().ReadRow(ctx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"}) + if err != nil { + t.Fatalf("Unexpected error for read row: %v", err) + } + if row == nil { + t.Fatal("ReadRow did not return a row") + } +} + func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) { t.Parallel() server, client, teardown := setupMockedTestServer(t) diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index de2867c6e33f..0735e095ee7d 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -84,6 +84,7 @@ const ( MethodExecuteSql string = "EXECUTE_SQL" MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL" MethodExecuteBatchDml string = "EXECUTE_BATCH_DML" + MethodStreamingRead string = "EXECUTE_STREAMING_READ" ) // StatementResult represents a mocked result on the test server. The result is @@ -794,6 +795,10 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques if err := s.simulateExecutionTime(MethodExecuteStreamingSql, req); err != nil { return err } + return s.executeStreamingSQL(req, stream) +} + +func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error { if req.Session == "" { return gstatus.Error(codes.InvalidArgument, "Missing session name") } @@ -917,14 +922,22 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques } func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error { - s.mu.Lock() - if s.stopped { - s.mu.Unlock() - return gstatus.Error(codes.Unavailable, "server has been stopped") + if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil { + return err } - s.receivedRequests <- req - s.mu.Unlock() - return gstatus.Error(codes.Unimplemented, "Method not yet implemented") + sqlReq := &spannerpb.ExecuteSqlRequest{ + Session: req.Session, + Transaction: req.Transaction, + PartitionToken: req.PartitionToken, + ResumeToken: req.ResumeToken, + // KeySet is currently ignored. + Sql: fmt.Sprintf( + "SELECT %s FROM %s", + strings.Join(req.Columns, ", "), + req.Table, + ), + } + return s.executeStreamingSQL(sqlReq, stream) } func (s *inMemSpannerServer) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) { diff --git a/spanner/transaction.go b/spanner/transaction.go index c3e4773e2944..c12863bc8952 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -120,8 +120,8 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key return &RowIterator{err: err} } // Cloud Spanner will return "Session not found" on bad sessions. - sid, client := sh.getID(), sh.getClient() - if sid == "" || client == nil { + client := sh.getClient() + if client == nil { // Might happen if transaction is closed in the middle of a API call. return &RowIterator{err: errSessionClosed(sh)} } @@ -133,13 +133,13 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key limit = opts.Limit } } - return stream( + return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.session.logger, func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { return client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: sid, + Session: t.sh.getID(), Transaction: ts, Table: table, Index: index, @@ -149,6 +149,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key Limit: int64(limit), }) }, + t.replaceSessionFunc, t.setTimestamp, t.release, )