This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
partition_validator.go
83 lines (67 loc) · 2.66 KB
/
partition_validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package validators
import (
"fmt"
"github.com/lyft/datacatalog/pkg/errors"
datacatalog "github.com/lyft/datacatalog/protos/gen"
"google.golang.org/grpc/codes"
)
// partitionKeyName is the argument name reported when a partition key is
// missing, or when dataset partition keys are not unique.
const partitionKeyName = "partitionKey"

// partitionValueName is the argument name reported when a partition value is
// missing or empty.
const partitionValueName = "partitionValue"
// ValidatePartitions checks that artifactPartitions assigns a non-empty value
// to each partition key declared by the dataset, and to no other keys.
//
// It returns:
//   - an InvalidArgument DataCatalog error immediately if the number of
//     artifact partitions differs from the number of dataset partition keys;
//   - nil if the dataset declares no partition keys (not all datasets are
//     partitioned);
//   - otherwise, an InvalidArgument collected error that aggregates one error
//     per nil partition, empty key, or empty value, plus a single key-mismatch
//     error if the artifact's keys do not line up with the dataset's keys.
func ValidatePartitions(datasetPartitionKeys []string, artifactPartitions []*datacatalog.Partition) error {
	if len(datasetPartitionKeys) != len(artifactPartitions) {
		return errors.NewDataCatalogErrorf(codes.InvalidArgument, "Partition key mismatch, dataset keys: %+v, artifact Partitions: %+v", datasetPartitionKeys, artifactPartitions)
	}

	// Not all datasets need to be partitioned.
	if len(datasetPartitionKeys) == 0 {
		return nil
	}

	// Record, for every dataset key, whether some artifact partition assigned it.
	partitionErrors := make([]error, 0)
	keyMismatch := false
	partitionKeyMatches := make(map[string]bool, len(datasetPartitionKeys))
	for _, datasetPartitionKey := range datasetPartitionKeys {
		partitionKeyMatches[datasetPartitionKey] = false
	}

	for idx, artifactPartition := range artifactPartitions {
		if artifactPartition == nil {
			partitionErrors = append(partitionErrors, NewMissingArgumentError(fmt.Sprintf("%v[%v]", partitionKeyName, idx)))
			continue
		}

		if err := ValidateEmptyStringField(partitionKeyName, artifactPartition.Key); err != nil {
			partitionErrors = append(partitionErrors, NewMissingArgumentError(fmt.Sprintf("%v[%v]", partitionKeyName, idx)))
		} else if err := ValidateEmptyStringField(partitionValueName, artifactPartition.Value); err != nil {
			partitionErrors = append(partitionErrors, NewMissingArgumentError(fmt.Sprintf("%v[%v]", partitionValueName, idx)))
		} else if _, known := partitionKeyMatches[artifactPartition.Key]; known {
			partitionKeyMatches[artifactPartition.Key] = true
		} else {
			// The artifact uses a key the dataset does not declare.
			keyMismatch = true
		}
	}

	// The lengths are equal, so if every partition was well-formed and a
	// dataset key is still unassigned, the artifact must have repeated one of
	// its keys (e.g. dataset [a b], artifact [a=1 a=2]). Only check this when
	// there were no malformed partitions; otherwise the missing-argument errors
	// already explain the unassigned keys.
	if len(partitionErrors) == 0 {
		for _, matched := range partitionKeyMatches {
			if !matched {
				keyMismatch = true
				break
			}
		}
	}

	if keyMismatch {
		partitionErrors = append(partitionErrors, errors.NewDataCatalogErrorf(codes.InvalidArgument, "Artifact partition assignment does not match dataset partition keys: %v", partitionKeyMatches))
	}

	if len(partitionErrors) > 0 {
		return errors.NewCollectedErrors(codes.InvalidArgument, partitionErrors)
	}
	return nil
}
// ValidateUniquePartitionKeys returns nil when every entry in partitionKeys is
// distinct. If any key occurs more than once, it returns the error built by
// NewInvalidArgumentError for partitionKeyName. The error message lists the
// occurrence count of every key, so the caller can see which keys are
// duplicated.
func ValidateUniquePartitionKeys(partitionKeys []string) error {
	hasDuplicates := false
	// Use int rather than uint8: a uint8 counter wraps after 255 occurrences and
	// would misreport the counts shown in the error message.
	occurrences := make(map[string]int, len(partitionKeys))
	for _, partitionKey := range partitionKeys {
		occurrences[partitionKey]++
		if occurrences[partitionKey] > 1 {
			hasDuplicates = true
		}
	}

	if hasDuplicates {
		return NewInvalidArgumentError(partitionKeyName, fmt.Sprintf("Keys are not unique, occurrence count: %+v", occurrences))
	}
	return nil
}