Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export cassandra test setup definitions for reuse #2581

Merged
merged 2 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 35 additions & 53 deletions common/persistence/tests/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,48 +31,30 @@ import (

"github.com/stretchr/testify/suite"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/cassandra"
"go.temporal.io/server/common/persistence/serialization"
_ "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/shuffle"
"go.temporal.io/server/environment"
)

// TODO merge the initialization with existing persistence setup
const (
testCassandraClusterName = "temporal_cassandra_cluster"
func setUpCassandraTest(t *testing.T) (CassandraTestData, func()) {
var testData CassandraTestData
testData.Cfg = NewCassandraConfig()
testData.Logger = log.NewZapLogger(zaptest.NewLogger(t))
SetUpCassandraDatabase(testData.Cfg, testData.Logger)
SetUpCassandraSchema(testData.Cfg, testData.Logger)

testCassandraUser = "temporal"
testCassandraPassword = "temporal"
testCassandraDatabaseNamePrefix = "test_"
testCassandraDatabaseNameSuffix = "temporal_persistence"
)

type cassandraTestData struct {
cfg *config.Cassandra
factory *cassandra.Factory
logger log.Logger
}

func setUpCassandraTest(t *testing.T) (cassandraTestData, func()) {
var testData cassandraTestData
testData.cfg = newCassandraConfig()
testData.logger = log.NewZapLogger(zaptest.NewLogger(t))
SetUpCassandraDatabase(testData.cfg, testData.logger)
SetUpCassandraSchema(testData.cfg, testData.logger)

testData.factory = cassandra.NewFactory(
*testData.cfg,
testData.Factory = cassandra.NewFactory(
*testData.Cfg,
resolver.NewNoopResolver(),
testCassandraClusterName,
testData.logger,
testData.Logger,
)

tearDown := func() {
testData.factory.Close()
TearDownCassandraKeyspace(testData.cfg)
testData.Factory.Close()
TearDownCassandraKeyspace(testData.Cfg)
}

return testData, tearDown
Expand All @@ -82,82 +64,82 @@ func TestCassandraExecutionMutableStateStoreSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

shardStore, err := testData.factory.NewShardStore()
shardStore, err := testData.Factory.NewShardStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}
executionStore, err := testData.factory.NewExecutionStore()
executionStore, err := testData.Factory.NewExecutionStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewExecutionMutableStateSuite(t, shardStore, executionStore, testData.logger)
s := NewExecutionMutableStateSuite(
t,
shardStore,
executionStore,
serialization.NewSerializer(),
testData.Logger)
suite.Run(t, s)
}

func TestCassandraExecutionMutableStateTaskStoreSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

shardStore, err := testData.factory.NewShardStore()
shardStore, err := testData.Factory.NewShardStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}
executionStore, err := testData.factory.NewExecutionStore()
executionStore, err := testData.Factory.NewExecutionStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewExecutionMutableStateTaskSuite(t, shardStore, executionStore, testData.logger)
s := NewExecutionMutableStateTaskSuite(
t,
shardStore,
executionStore,
serialization.NewSerializer(),
testData.Logger,
)
suite.Run(t, s)
}

func TestCassandraHistoryStoreSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

store, err := testData.factory.NewExecutionStore()
store, err := testData.Factory.NewExecutionStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewHistoryEventsSuite(t, store, testData.logger)
s := NewHistoryEventsSuite(t, store, testData.Logger)
suite.Run(t, s)
}

func TestCassandraTaskQueueSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

taskQueueStore, err := testData.factory.NewTaskStore()
taskQueueStore, err := testData.Factory.NewTaskStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewTaskQueueSuite(t, taskQueueStore, testData.logger)
s := NewTaskQueueSuite(t, taskQueueStore, testData.Logger)
suite.Run(t, s)
}

func TestCassandraTaskQueueTaskSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

taskQueueStore, err := testData.factory.NewTaskStore()
taskQueueStore, err := testData.Factory.NewTaskStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewTaskQueueTaskSuite(t, taskQueueStore, testData.logger)
s := NewTaskQueueTaskSuite(t, taskQueueStore, testData.Logger)
suite.Run(t, s)
}

// newCassandraConfig returns a new Cassandra config for test
func newCassandraConfig() *config.Cassandra {
return &config.Cassandra{
User: testCassandraUser,
Password: testCassandraPassword,
Hosts: environment.GetCassandraAddress(),
Port: environment.GetCassandraPort(),
Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix),
}
}
29 changes: 29 additions & 0 deletions common/persistence/tests/cassandra_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"go.temporal.io/server/common/persistence/cassandra"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/shuffle"
"go.temporal.io/server/environment"
)

const (
Expand All @@ -48,6 +50,22 @@ const (
testCassandraVisibilitySchema = "../../../schema/cassandra/visibility/schema.cql"
)

// TODO merge the initialization with existing persistence setup
const (
testCassandraClusterName = "temporal_cassandra_cluster"

testCassandraUser = "temporal"
testCassandraPassword = "temporal"
testCassandraDatabaseNamePrefix = "test_"
testCassandraDatabaseNameSuffix = "temporal_persistence"
)

type CassandraTestData struct {
Cfg *config.Cassandra
Factory *cassandra.Factory
Logger log.Logger
}

func SetUpCassandraDatabase(cfg *config.Cassandra, logger log.Logger) {
adminCfg := *cfg
// NOTE need to connect with empty name to create new database
Expand Down Expand Up @@ -176,3 +194,14 @@ func GetSchemaFiles(schemaDir string, logger log.Logger) []string {

return retVal
}

// NewCassandraConfig returns a new Cassandra config for test
func NewCassandraConfig() *config.Cassandra {
return &config.Cassandra{
User: testCassandraUser,
Password: testCassandraPassword,
Hosts: environment.GetCassandraAddress(),
Port: environment.GetCassandraPort(),
Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix),
}
}
5 changes: 3 additions & 2 deletions common/persistence/tests/execution_mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@ func NewExecutionMutableStateSuite(
t *testing.T,
shardStore p.ShardStore,
executionStore p.ExecutionStore,
serializer serialization.Serializer,
logger log.Logger,
) *ExecutionMutableStateSuite {
return &ExecutionMutableStateSuite{
Assertions: require.New(t),
ShardManager: p.NewShardManager(
shardStore,
serialization.NewSerializer(),
serializer,
),
ExecutionManager: p.NewExecutionManager(
executionStore,
serialization.NewSerializer(),
serializer,
logger,
dynamicconfig.GetIntPropertyFn(4*1024*1024),
),
Expand Down
35 changes: 18 additions & 17 deletions common/persistence/tests/execution_mutable_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ func NewExecutionMutableStateTaskSuite(
t *testing.T,
shardStore p.ShardStore,
executionStore p.ExecutionStore,
serializer serialization.Serializer,
logger log.Logger,
) *ExecutionMutableStateTaskSuite {
return &ExecutionMutableStateTaskSuite{
Assertions: require.New(t),
ShardManager: p.NewShardManager(
shardStore,
serialization.NewSerializer(),
serializer,
),
ExecutionManager: p.NewExecutionManager(
executionStore,
serialization.NewSerializer(),
serializer,
logger,
dynamicconfig.GetIntPropertyFn(4*1024*1024),
),
Expand Down Expand Up @@ -130,7 +131,7 @@ func (s *ExecutionMutableStateTaskSuite) TearDownTest() {

func (s *ExecutionMutableStateTaskSuite) TestAddGetTransferTasks_Multiple() {
numTasks := 20
transferTasks := s.addRandomTasks(
transferTasks := s.AddRandomTasks(
tasks.CategoryTransfer,
numTasks,
func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task {
Expand All @@ -142,8 +143,8 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetTransferTasks_Multiple() {
},
)

transferTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.randomPaginateRange(transferTasks)
loadedTasks := s.paginateTasks(
transferTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(transferTasks)
loadedTasks := s.PaginateTasks(
tasks.CategoryTransfer,
inclusiveMinTaskKey,
exclusiveMaxTaskKey,
Expand All @@ -154,7 +155,7 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetTransferTasks_Multiple() {

func (s *ExecutionMutableStateTaskSuite) TestAddGetTimerTasks_Multiple() {
numTasks := 20
timerTasks := s.addRandomTasks(
timerTasks := s.AddRandomTasks(
tasks.CategoryTimer,
numTasks,
func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task {
Expand All @@ -166,8 +167,8 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetTimerTasks_Multiple() {
},
)

timerTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.randomPaginateRange(timerTasks)
loadedTasks := s.paginateTasks(
timerTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(timerTasks)
loadedTasks := s.PaginateTasks(
tasks.CategoryTimer,
inclusiveMinTaskKey,
exclusiveMaxTaskKey,
Expand All @@ -178,7 +179,7 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetTimerTasks_Multiple() {

func (s *ExecutionMutableStateTaskSuite) TestAddGetReplicationTasks_Multiple() {
numTasks := 20
replicationTasks := s.addRandomTasks(
replicationTasks := s.AddRandomTasks(
tasks.CategoryReplication,
numTasks,
func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task {
Expand All @@ -190,8 +191,8 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetReplicationTasks_Multiple() {
},
)

replicationTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.randomPaginateRange(replicationTasks)
loadedTasks := s.paginateTasks(
replicationTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(replicationTasks)
loadedTasks := s.PaginateTasks(
tasks.CategoryReplication,
inclusiveMinTaskKey,
exclusiveMaxTaskKey,
Expand All @@ -202,7 +203,7 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetReplicationTasks_Multiple() {

func (s *ExecutionMutableStateTaskSuite) TestAddGetVisibilityTasks_Multiple() {
numTasks := 20
visibilityTasks := s.addRandomTasks(
visibilityTasks := s.AddRandomTasks(
tasks.CategoryVisibility,
numTasks,
func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task {
Expand All @@ -214,8 +215,8 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetVisibilityTasks_Multiple() {
},
)

visibilityTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.randomPaginateRange(visibilityTasks)
loadedTasks := s.paginateTasks(
visibilityTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(visibilityTasks)
loadedTasks := s.PaginateTasks(
tasks.CategoryVisibility,
inclusiveMinTaskKey,
exclusiveMaxTaskKey,
Expand All @@ -224,7 +225,7 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetVisibilityTasks_Multiple() {
s.Equal(visibilityTasks, loadedTasks)
}

func (s *ExecutionMutableStateTaskSuite) addRandomTasks(
func (s *ExecutionMutableStateTaskSuite) AddRandomTasks(
category tasks.Category,
numTasks int,
newTaskFn func(definition.WorkflowKey, int64, time.Time) tasks.Task,
Expand Down Expand Up @@ -253,7 +254,7 @@ func (s *ExecutionMutableStateTaskSuite) addRandomTasks(
return randomTasks
}

func (s *ExecutionMutableStateTaskSuite) paginateTasks(
func (s *ExecutionMutableStateTaskSuite) PaginateTasks(
category tasks.Category,
inclusiveMinTaskKey tasks.Key,
exclusiveMaxTaskKey tasks.Key,
Expand All @@ -280,7 +281,7 @@ func (s *ExecutionMutableStateTaskSuite) paginateTasks(
return loadedTasks
}

func (s *ExecutionMutableStateTaskSuite) randomPaginateRange(
func (s *ExecutionMutableStateTaskSuite) RandomPaginateRange(
createdTasks []tasks.Task,
) ([]tasks.Task, tasks.Key, tasks.Key) {
numTasks := len(createdTasks)
Expand Down
Loading