Skip to content

Commit

Permalink
Remove unnecessary Cassandra iter nil check (#1440)
Browse files Browse the repository at this point in the history
* Remove unnecessary Cassandra iter nil check since gocql will panic if iter is nil
  • Loading branch information
wxing1292 committed Apr 7, 2021
1 parent 0ff2418 commit a1a4de2
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 98 deletions.
4 changes: 0 additions & 4 deletions common/persistence/cassandra/cassandraClusterMetadata.go
Expand Up @@ -175,10 +175,6 @@ func (m *cassandraClusterMetadata) GetClusterMembers(request *p.GetClusterMember

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

if iter == nil {
return nil, serviceerror.NewInternal("GetClusterMembers operation failed. Not able to create query iterator.")
}

var clusterMembers []*p.ClusterMember

cqlHostID := make([]byte, 0, 16)
Expand Down
10 changes: 1 addition & 9 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Expand Up @@ -178,9 +178,6 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
query := h.session.Query(v2templateReadData, treeID, branchID, request.MinNodeID, request.MaxNodeID)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
return nil, serviceerror.NewInternal("ReadHistoryBranch operation failed. Not able to create query iterator.")
}
pagingToken := iter.PageState()

history := make([]*commonpb.DataBlob, 0, request.PageSize)
Expand Down Expand Up @@ -422,9 +419,7 @@ func (h *cassandraHistoryV2Persistence) GetAllHistoryTreeBranches(
query := h.session.Query(v2templateScanAllTreeBranches)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetAllHistoryTreeBranches operation failed. Not able to create query iterator.")
}

pagingToken := iter.PageState()

branches := make([]p.HistoryBranchDetail, 0, request.PageSize)
Expand Down Expand Up @@ -483,9 +478,6 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
var iter gocql.Iter
for {
iter = query.PageSize(pageSize).PageState(pagingToken).Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetHistoryTree operation failed. Not able to create query iterator.")
}
pagingToken = iter.PageState()

branchUUID := ""
Expand Down
Expand Up @@ -268,9 +268,6 @@ func (m *cassandraMetadataPersistenceV2) GetNamespace(request *p.GetNamespaceReq
func (m *cassandraMetadataPersistenceV2) ListNamespaces(request *p.ListNamespacesRequest) (*p.InternalListNamespacesResponse, error) {
query := m.session.Query(templateListNamespaceQueryV2, constNamespacePartition)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
return nil, serviceerror.NewInternal("ListNamespaces operation failed. Not able to create query iterator.")
}

response := &p.InternalListNamespacesResponse{}
for {
Expand Down
46 changes: 12 additions & 34 deletions common/persistence/cassandra/cassandraPersistence.go
Expand Up @@ -1654,12 +1654,8 @@ func (d *cassandraPersistence) ListConcreteExecutions(
templateListWorkflowExecutionQuery,
d.shardID,
rowTypeExecution,
).PageSize(request.PageSize).PageState(request.PageToken)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("ListConcreteExecutions operation failed. Not able to create query iterator.")
}
)
iter := query.PageSize(request.PageSize).PageState(request.PageToken).Iter()

response := &p.InternalListConcreteExecutionsResponse{}
result := make(map[string]interface{})
Expand Down Expand Up @@ -1775,12 +1771,8 @@ func (d *cassandraPersistence) GetTransferTasks(request *p.GetTransferTasksReque
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetTransferTasks operation failed. Not able to create query iterator.")
}
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetTransferTasksResponse{}
var data []byte
Expand Down Expand Up @@ -1844,12 +1836,8 @@ func (d *cassandraPersistence) GetVisibilityTasks(request *p.GetVisibilityTasksR
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetVisibilityTasks operation failed. Not able to create query iterator.")
}
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetVisibilityTasksResponse{}
var data []byte
Expand Down Expand Up @@ -1921,12 +1909,10 @@ func (d *cassandraPersistence) GetReplicationTasks(
}

func (d *cassandraPersistence) populateGetReplicationTasksResponse(
query gocql.Query, operation string,
query gocql.Query,
operation string,
) (*p.GetReplicationTasksResponse, error) {
iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetReplicationTasks operation failed. Not able to create query iterator.")
}

response := &p.GetReplicationTasksResponse{}
var data []byte
Expand Down Expand Up @@ -2378,12 +2364,8 @@ func (d *cassandraPersistence) GetTasks(request *p.GetTasksRequest) (*p.GetTasks
rowTypeTask,
request.ReadLevel,
*request.MaxReadLevel,
).PageSize(request.BatchSize)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetTasks operation failed. Not able to create query iterator.")
}
)
iter := query.PageSize(request.BatchSize).Iter()

response := &p.GetTasksResponse{}
task := make(map[string]interface{})
Expand Down Expand Up @@ -2506,12 +2488,8 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksR
rowTypeTimerRunID,
minTimestamp,
maxTimestamp,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetTimerTasks operation failed. Not able to create query iterator.")
}
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetTimerIndexTasksResponse{}
var data []byte
Expand Down
11 changes: 2 additions & 9 deletions common/persistence/cassandra/cassandraQueue.go
Expand Up @@ -169,9 +169,6 @@ func (q *cassandraQueue) ReadMessages(
)

iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("ReadMessages operation failed. Not able to create query iterator.")
}

var result []*persistence.QueueMessage
message := make(map[string]interface{})
Expand Down Expand Up @@ -200,12 +197,8 @@ func (q *cassandraQueue) ReadMessagesFromDLQ(
q.getDLQTypeFromQueueType(),
firstMessageID,
lastMessageID,
).PageSize(pageSize).PageState(pageToken)

iter := query.Iter()
if iter == nil {
return nil, nil, serviceerror.NewInternal(fmt.Sprintf("ReadMessagesFromDLQ operation failed. Not able to create query iterator."))
}
)
iter := query.PageSize(pageSize).PageState(pageToken).Iter()

var result []*persistence.QueueMessage
message := make(map[string]interface{})
Expand Down
35 changes: 2 additions & 33 deletions common/persistence/cassandra/cassandraVisibilityPersistence.go
Expand Up @@ -283,10 +283,6 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime)).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListOpenWorkflowExecutions operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand Down Expand Up @@ -314,10 +310,6 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowTypeName).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListOpenWorkflowExecutionsByType operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand Down Expand Up @@ -345,10 +337,6 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowID).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListOpenWorkflowExecutionsByWorkflowID operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand All @@ -375,10 +363,6 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
p.UnixNanoToDBTimestamp(request.EarliestStartTime),
p.UnixNanoToDBTimestamp(request.LatestStartTime)).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListClosedWorkflowExecutions operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand Down Expand Up @@ -406,10 +390,6 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowTypeName).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListClosedWorkflowExecutionsByType operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand Down Expand Up @@ -437,10 +417,6 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.WorkflowID).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListClosedWorkflowExecutionsByWorkflowID operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand Down Expand Up @@ -468,10 +444,6 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
p.UnixNanoToDBTimestamp(request.LatestStartTime),
request.Status).Consistency(v.lowConslevel)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return serviceerror.InvalidArgument if the token is invalid
return nil, serviceerror.NewInternal("ListClosedWorkflowExecutionsByStatus operation failed. Not able to create query iterator.")
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0, request.PageSize)
Expand All @@ -497,12 +469,9 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
request.NamespaceID,
namespacePartition,
execution.GetWorkflowId(),
execution.GetRunId())

execution.GetRunId(),
)
iter := query.Iter()
if iter == nil {
return nil, serviceerror.NewInternal("GetClosedWorkflowExecution operation failed. Not able to create query iterator.")
}

wfexecution, has := readClosedWorkflowExecutionRecord(iter)
if !has {
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/nosql/nosqlplugin/cassandra/gocql/query.go
Expand Up @@ -89,9 +89,6 @@ func (q *query) MapScanCAS(

func (q *query) Iter() Iter {
iter := q.gocqlQuery.Iter()
if iter == nil {
return nil
}
return iter
}

Expand Down
Expand Up @@ -153,9 +153,6 @@ func (s *session) MapExecuteBatchCAS(
defer func() { s.handleError(retError) }()

applied, iter, err := s.Value.Load().(*gocql.Session).MapExecuteBatchCAS(b.(*batch).gocqlBatch, previous)
if iter == nil {
return applied, nil, err
}
return applied, iter, err
}

Expand Down

0 comments on commit a1a4de2

Please sign in to comment.