Skip to content

Commit

Permalink
Add visibility tasks table for SQL (#1055)
Browse files Browse the repository at this point in the history
* Add SQL visibility_tasks table for Kafka deprecation
  • Loading branch information
wxing1292 committed Dec 9, 2020
1 parent 97c7451 commit 32924ef
Show file tree
Hide file tree
Showing 18 changed files with 645 additions and 37 deletions.
68 changes: 68 additions & 0 deletions common/persistence/sql/sqlplugin/history_visibility_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sqlplugin

import (
"context"
"database/sql"
)

type (
// VisibilityTasksRow represents a row in visibility_tasks table
VisibilityTasksRow struct {
ShardID int32
TaskID int64
Data []byte
DataEncoding string
}

// VisibilityTasksFilter contains the column names within visibility_tasks table that
// can be used to filter results through a WHERE clause
VisibilityTasksFilter struct {
ShardID int32
TaskID int64
}

// VisibilityTasksRangeFilter contains the column names within visibility_tasks table that
// can be used to filter results through a WHERE clause
VisibilityTasksRangeFilter struct {
ShardID int32
MinTaskID int64
MaxTaskID int64
}

// HistoryVisibilityTask is the SQL persistence interface for history visibility tasks
HistoryVisibilityTask interface {
InsertIntoVisibilityTasks(ctx context.Context, rows []VisibilityTasksRow) (sql.Result, error)
// SelectFromVisibilityTasks returns rows that match filter criteria from visibility_tasks table.
SelectFromVisibilityTasks(ctx context.Context, filter VisibilityTasksFilter) ([]VisibilityTasksRow, error)
// RangeSelectFromVisibilityTasks returns rows that match filter criteria from visibility_tasks table.
RangeSelectFromVisibilityTasks(ctx context.Context, filter VisibilityTasksRangeFilter) ([]VisibilityTasksRow, error)
// DeleteFromVisibilityTasks deletes one rows from visibility_tasks table.
DeleteFromVisibilityTasks(ctx context.Context, filter VisibilityTasksFilter) (sql.Result, error)
// RangeDeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table.
RangeDeleteFromVisibilityTasks(ctx context.Context, filter VisibilityTasksRangeFilter) (sql.Result, error)
}
)
1 change: 1 addition & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type (
HistoryTimerTask
HistoryReplicationTask
HistoryReplicationDLQTask
HistoryVisibilityTask
}

// AdminCRUD defines admin operations for CLI and test suites
Expand Down
84 changes: 83 additions & 1 deletion common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ task_id > ? AND
task_id <= ?
ORDER BY task_id LIMIT ?`

createVisibilityTasksQuery = `INSERT INTO visibility_tasks(shard_id, task_id, data, data_encoding)
VALUES(:shard_id, :task_id, :data, :data_encoding)`

getVisibilityTaskQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = ? AND task_id = ?`
getVisibilityTasksQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY shard_id, task_id`

deleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = ? AND task_id = ?`
rangeDeleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ?`

bufferedEventsColumns = `shard_id, namespace_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :namespace_id, :workflow_id, :run_id, :data, :data_encoding)`
Expand Down Expand Up @@ -446,7 +457,7 @@ func (mdb *db) SelectFromTimerTasks(
for i := range rows {
rows[i].VisibilityTimestamp = mdb.converter.FromMySQLDateTime(rows[i].VisibilityTimestamp)
}
return rows, err
return rows, nil
}

// RangeSelectFromTimerTasks reads one or more rows from timer_tasks table
Expand Down Expand Up @@ -693,3 +704,74 @@ func (mdb *db) RangeDeleteFromReplicationDLQTasks(
filter.MaxTaskID,
)
}

// InsertIntoVisibilityTasks inserts one or more rows into visibility_tasks table
func (mdb *db) InsertIntoVisibilityTasks(
ctx context.Context,
rows []sqlplugin.VisibilityTasksRow,
) (sql.Result, error) {
return mdb.conn.NamedExecContext(ctx,
createVisibilityTasksQuery,
rows,
)
}

// SelectFromVisibilityTasks reads one or more rows from visibility_tasks table
func (mdb *db) SelectFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksFilter,
) ([]sqlplugin.VisibilityTasksRow, error) {
var rows []sqlplugin.VisibilityTasksRow
if err := mdb.conn.SelectContext(ctx,
&rows,
getVisibilityTaskQuery,
filter.ShardID,
filter.TaskID,
); err != nil {
return nil, err
}
return rows, nil
}

// RangeSelectFromVisibilityTasks reads one or more rows from visibility_tasks table
func (mdb *db) RangeSelectFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksRangeFilter,
) ([]sqlplugin.VisibilityTasksRow, error) {
var rows []sqlplugin.VisibilityTasksRow
if err := mdb.conn.SelectContext(ctx,
&rows,
getVisibilityTasksQuery,
filter.ShardID,
filter.MinTaskID,
filter.MaxTaskID,
); err != nil {
return nil, err
}
return rows, nil
}

// DeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table
func (mdb *db) DeleteFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksFilter,
) (sql.Result, error) {
return mdb.conn.ExecContext(ctx,
deleteVisibilityTaskQuery,
filter.ShardID,
filter.TaskID,
)
}

// RangeDeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table
func (mdb *db) RangeDeleteFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksRangeFilter,
) (sql.Result, error) {
return mdb.conn.ExecContext(ctx,
rangeDeleteVisibilityTaskQuery,
filter.ShardID,
filter.MinTaskID,
filter.MaxTaskID,
)
}
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (mdb *db) SelectFromTasks(
if err != nil {
return nil, err
}
return rows, err
return rows, nil
}

// DeleteFromTasks deletes one or more rows from tasks table
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,5 @@ func (mdb *db) SelectFromVisibility(
rows[i].CloseTime = &closeTime
}
}
return rows, err
return rows, nil
}
5 changes: 4 additions & 1 deletion common/persistence/sql/sqlplugin/postgresql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ func (pdb *db) SelectFromHistoryNode(
filter.MaxNodeID,
filter.PageSize,
)
if err != nil {
return nil, err
}
// NOTE: since we let txn_id multiple by -1 when inserting, we have to revert it back here
for index := range rows {
rows[index].TxnID = -rows[index].TxnID
}
return rows, err
return rows, nil
}

// DeleteFromHistoryNode deletes one or more rows from history_node table
Expand Down
90 changes: 87 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ task_id > $3 AND
task_id <= $4
ORDER BY task_id LIMIT $5`

createVisibilityTasksQuery = `INSERT INTO visibility_tasks(shard_id, task_id, data, data_encoding)
VALUES(:shard_id, :task_id, :data, :data_encoding)`

getVisibilityTaskQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = $1 AND task_id = $2`
getVisibilityTasksQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3 ORDER BY shard_id, task_id`

deleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = $1 AND task_id = $2`
rangeDeleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3`

bufferedEventsColumns = `shard_id, namespace_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :namespace_id, :workflow_id, :run_id, :data, :data_encoding)`
Expand Down Expand Up @@ -385,7 +396,7 @@ func (pdb *db) RangeSelectFromTransferTasks(
if err != nil {
return nil, err
}
return rows, err
return rows, nil
}

// DeleteFromTransferTasks deletes one or more rows from transfer_tasks table
Expand Down Expand Up @@ -446,7 +457,7 @@ func (pdb *db) SelectFromTimerTasks(
for i := range rows {
rows[i].VisibilityTimestamp = pdb.converter.FromPostgreSQLDateTime(rows[i].VisibilityTimestamp)
}
return rows, err
return rows, nil
}

// RangeSelectFromTimerTasks reads one or more rows from timer_tasks table
Expand All @@ -473,7 +484,7 @@ func (pdb *db) RangeSelectFromTimerTasks(
for i := range rows {
rows[i].VisibilityTimestamp = pdb.converter.FromPostgreSQLDateTime(rows[i].VisibilityTimestamp)
}
return rows, err
return rows, nil
}

// DeleteFromTimerTasks deletes one or more rows from timer_tasks table
Expand Down Expand Up @@ -694,3 +705,76 @@ func (pdb *db) RangeDeleteFromReplicationDLQTasks(
filter.MaxTaskID,
)
}

// InsertIntoVisibilityTasks inserts one or more rows into visibility_tasks table
func (pdb *db) InsertIntoVisibilityTasks(
ctx context.Context,
rows []sqlplugin.VisibilityTasksRow,
) (sql.Result, error) {
return pdb.conn.NamedExecContext(ctx,
createVisibilityTasksQuery,
rows,
)
}

// SelectFromVisibilityTasks reads one or more rows from visibility_tasks table
func (pdb *db) SelectFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksFilter,
) ([]sqlplugin.VisibilityTasksRow, error) {
var rows []sqlplugin.VisibilityTasksRow
err := pdb.conn.SelectContext(ctx,
&rows,
getVisibilityTaskQuery,
filter.ShardID,
filter.TaskID,
)
if err != nil {
return nil, err
}
return rows, nil
}

// RangeSelectFromVisibilityTasks reads one or more rows from visibility_tasks table
func (pdb *db) RangeSelectFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksRangeFilter,
) ([]sqlplugin.VisibilityTasksRow, error) {
var rows []sqlplugin.VisibilityTasksRow
err := pdb.conn.SelectContext(ctx,
&rows,
getVisibilityTasksQuery,
filter.ShardID,
filter.MinTaskID,
filter.MaxTaskID,
)
if err != nil {
return nil, err
}
return rows, nil
}

// DeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table
func (pdb *db) DeleteFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksFilter,
) (sql.Result, error) {
return pdb.conn.ExecContext(ctx,
deleteVisibilityTaskQuery,
filter.ShardID,
filter.TaskID,
)
}

// RangeDeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table
func (pdb *db) RangeDeleteFromVisibilityTasks(
ctx context.Context,
filter sqlplugin.VisibilityTasksRangeFilter,
) (sql.Result, error) {
return pdb.conn.ExecContext(ctx,
rangeDeleteVisibilityTaskQuery,
filter.ShardID,
filter.MinTaskID,
filter.MaxTaskID,
)
}
3 changes: 0 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ func (pdb *db) SelectFromTasks(
*filter.PageSize,
)
}
if err != nil {
return nil, err
}
return rows, err
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/postgresql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,5 @@ func (pdb *db) SelectFromVisibility(
// come with lots of trailing spaces, probably due to the CHAR(64) type
rows[i].RunID = strings.TrimSpace(rows[i].RunID)
}
return rows, err
return rows, nil
}
Loading

0 comments on commit 32924ef

Please sign in to comment.