Skip to content

Commit

Permalink
feat(warehouse): error tagging (#2956)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Feb 13, 2023
1 parent df7a491 commit 3131b96
Show file tree
Hide file tree
Showing 25 changed files with 465 additions and 47 deletions.
29 changes: 29 additions & 0 deletions warehouse/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package warehouse
import (
"errors"
"fmt"
"strings"

"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
)

var (
Expand All @@ -18,3 +22,28 @@ type InvalidDestinationCredErr struct {
// Error implements the error interface. The message names the operation that
// failed and appends the message of the underlying Base error on a new line.
func (err InvalidDestinationCredErr) Error() string {
	const format = "Invalid destination creds, failed for operation: %s with err: \n%s"

	baseMessage := err.Base.Error()
	return fmt.Sprintf(format, err.Operation, baseMessage)
}

// ErrorHandler classifies errors by matching them against the error mappings
// exposed by its Manager (see MatchErrorMappings).
type ErrorHandler struct {
// Manager supplies the error mappings via its ErrorMappings method.
Manager manager.Manager
}

// MatchErrorMappings matches err against the error mappings defined by the
// integration behind e.Manager and returns an "error_mapping" tag.
//
// The tag value is the comma-separated list of every error type whose pattern
// matches err's message, in the order returned by Manager.ErrorMappings. When
// no pattern matches, or when err is nil, the value is model.UnknownError.
func (e *ErrorHandler) MatchErrorMappings(err error) Tag {
	const tagName = "error_mapping"

	// A nil error has no message to match; calling err.Error() on it would panic.
	if err == nil {
		return Tag{Name: tagName, Value: string(model.UnknownError)}
	}

	var (
		errMappings []string
		errString   = err.Error()
	)

	for _, em := range e.Manager.ErrorMappings() {
		if em.Format.MatchString(errString) {
			errMappings = append(errMappings, string(em.Type))
		}
	}

	if len(errMappings) == 0 {
		return Tag{Name: tagName, Value: string(model.UnknownError)}
	}
	return Tag{Name: tagName, Value: strings.Join(errMappings, ",")}
}
83 changes: 83 additions & 0 deletions warehouse/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package warehouse_test

import (
"bufio"
"errors"
"os"
"path"
"path/filepath"
"testing"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-server/warehouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/stretchr/testify/require"
)

// TestErrorHandler_MatchErrorMappings verifies error tagging for every fixture
// file under testdata/errors. Each fixture is named after the destination type
// passed to manager.New and holds one known error message per non-empty line.
//
// For each destination type it checks that every known message maps to
// something other than model.UnknownError, and that an unrecognised message
// maps to model.UnknownError.
func TestErrorHandler_MatchErrorMappings(t *testing.T) {
	// readLines returns the non-empty lines of f.
	readLines := func(f *os.File) ([]string, error) {
		var lines []string

		scanner := bufio.NewScanner(f)
		for scanner.Scan() {
			if line := scanner.Text(); line != "" {
				lines = append(lines, line)
			}
		}
		return lines, scanner.Err()
	}

	// filepath.Glob has no recursive "**" wildcard; a single "*" matches the
	// same entries without suggesting a recursive walk.
	files, err := filepath.Glob("testdata/errors/*")
	require.NoError(t, err)
	// Without fixtures the loop below would run zero subtests and pass vacuously.
	require.NotEmpty(t, files, "no fixtures found under testdata/errors")

	for _, file := range files {
		file := file
		// Glob returns OS-specific separators; normalise them before using the
		// slash-only path package to extract the file name.
		destType := path.Base(filepath.ToSlash(file))

		t.Run("Known errors: "+destType, func(t *testing.T) {
			t.Parallel()

			m, err := manager.New(destType)
			require.NoError(t, err)

			er := &warehouse.ErrorHandler{Manager: m}

			f, err := os.Open(file)
			require.NoError(t, err)

			// Close errors are irrelevant for a read-only fixture.
			defer func() { _ = f.Close() }()

			uploadsErrors, err := readLines(f)
			require.NoError(t, err)

			for _, uploadError := range uploadsErrors {
				tag := er.MatchErrorMappings(errors.New(uploadError))
				// testify expects (expected, actual) so failure output is labelled correctly.
				require.Equal(t, "error_mapping", tag.Name)
				require.NotContains(t, tag.Value, string(model.UnknownError))
			}
		})

		t.Run("UnKnown errors: "+destType, func(t *testing.T) {
			t.Parallel()

			m, err := manager.New(destType)
			require.NoError(t, err)

			er := &warehouse.ErrorHandler{Manager: m}
			tag := er.MatchErrorMappings(errors.New("unknown error"))
			require.Equal(t, "error_mapping", tag.Name)
			require.Contains(t, tag.Value, string(model.UnknownError))
		})
	}
}
6 changes: 6 additions & 0 deletions warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
tableNameLimit = 127
)

// errorsMappings holds the error patterns for this integration and is returned
// by (*HandleT).ErrorMappings. It is declared without entries here, so no
// patterns are registered by this declaration.
var errorsMappings []model.JobError

var rudderDataTypesMapToMssql = map[string]string{
"int": "bigint",
"float": "decimal(28,10)",
Expand Down Expand Up @@ -905,3 +907,7 @@ func (as *HandleT) LoadTestTable(_, tableName string, payloadMap map[string]inte
// SetConnectionTimeout stores timeout in the handle's ConnectTimeout field.
func (as *HandleT) SetConnectionTimeout(timeout time.Duration) {
as.ConnectTimeout = timeout
}

// ErrorMappings returns the package-level error patterns declared for this
// integration. The receiver is not used.
func (*HandleT) ErrorMappings() []model.JobError {
	return errorsMappings
}
20 changes: 20 additions & 0 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -101,6 +102,21 @@ var partitionKeyMap = map[string]string{
warehouseutils.IdentityMergeRulesTable: "merge_property_1_type, merge_property_1_value, merge_property_2_type, merge_property_2_value",
}

// errorsMappings lists the regular expressions used to classify error messages
// into model.JobError types for this integration. It is returned by
// (*HandleT).ErrorMappings.
var errorsMappings = []model.JobError{
	{
		Type:   model.PermissionError,
		Format: regexp.MustCompile(`googleapi: Error 403: Access Denied`),
	},
	{
		Type:   model.ResourceNotFoundError,
		Format: regexp.MustCompile(`googleapi: Error 404: Not found: Dataset .*, notFound`),
	},
	{
		Type: model.InsufficientResourceError,
		// The sentence-ending period is escaped so it matches a literal "."
		// rather than any single character.
		Format: regexp.MustCompile(`googleapi: Error 400: Job exceeded rate limits: Your project_and_region exceeded quota for concurrent queries\.`),
	},
}

func getTableSchema(columns map[string]string) []*bigquery.FieldSchema {
var schema []*bigquery.FieldSchema
for columnName, columnType := range columns {
Expand Down Expand Up @@ -1175,3 +1191,7 @@ func (bq *HandleT) LoadTestTable(location, tableName string, _ map[string]interf

// SetConnectionTimeout is a no-op: the timeout argument is ignored.
func (*HandleT) SetConnectionTimeout(_ time.Duration) {
}

// ErrorMappings returns the package-level error patterns declared for this
// integration. The receiver is not used.
func (*HandleT) ErrorMappings() []model.JobError {
	return errorsMappings
}
16 changes: 16 additions & 0 deletions warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -126,6 +127,17 @@ var clickhouseDataTypesMapToRudder = map[string]string{
"SimpleAggregateFunction(anyLast, Nullable(UInt8))": "boolean",
}

// errorsMappings lists the regular expressions used to classify error messages
// into model.JobError types for this integration. It is returned by
// (*Clickhouse).ErrorMappings.
var errorsMappings = []model.JobError{
{
// Server error code 516 with an authentication-failure message.
Type: model.PermissionError,
Format: regexp.MustCompile(`code: 516, message: .*: Authentication failed: password is incorrect, or there is no user with such name`),
},
{
// Server error code 241 with a memory-limit-exceeded message.
Type: model.InsufficientResourceError,
Format: regexp.MustCompile(`code: 241, message: Memory limit .* exceeded: would use .*, maximum: .*`),
},
}

type Clickhouse struct {
DB *sql.DB
Namespace string
Expand Down Expand Up @@ -1226,3 +1238,7 @@ func (ch *Clickhouse) LoadTestTable(_, tableName string, payloadMap map[string]i
// SetConnectionTimeout stores timeout in the handle's ConnectTimeout field.
func (ch *Clickhouse) SetConnectionTimeout(timeout time.Duration) {
ch.ConnectTimeout = timeout
}

// ErrorMappings returns the package-level error patterns declared for this
// integration. The receiver is not used.
func (*Clickhouse) ErrorMappings() []model.JobError {
	return errorsMappings
}
12 changes: 12 additions & 0 deletions warehouse/integrations/datalake/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package datalake
import (
"context"
"fmt"
"regexp"
"time"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
Expand All @@ -20,6 +21,13 @@ var (
pkgLogger logger.Logger
)

// errorsMappings lists the regular expressions used to classify error messages
// into model.JobError types for this integration. It is returned by
// (*HandleT).ErrorMappings.
var errorsMappings = []model.JobError{
{
Type: model.PermissionError,
Format: regexp.MustCompile(`AccessDeniedException: Insufficient Lake Formation permission.*: Required Create Database on Catalog`),
},
}

// Init assigns the package-level logger, created as the "datalake" child of
// the "warehouse" child of a new logger.
func Init() {
	warehouseLogger := logger.NewLogger().Child("warehouse")
	pkgLogger = warehouseLogger.Child("datalake")
}
Expand Down Expand Up @@ -126,3 +134,7 @@ func (*HandleT) LoadTestTable(_, _ string, _ map[string]interface{}, _ string) e

// SetConnectionTimeout is a no-op: the timeout argument is ignored.
func (*HandleT) SetConnectionTimeout(_ time.Duration) {
}

// ErrorMappings returns the package-level error patterns declared for this
// integration. The receiver is not used.
func (*HandleT) ErrorMappings() []model.JobError {
	return errorsMappings
}
18 changes: 17 additions & 1 deletion warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package deltalake
import (
"context"
"fmt"
"regexp"
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake/client"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/iancoleman/strcase"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake/client"

"github.com/rudderlabs/rudder-server/config"
proto "github.com/rudderlabs/rudder-server/proto/databricks"
Expand Down Expand Up @@ -99,6 +100,17 @@ var primaryKeyMap = map[string]string{
warehouseutils.DiscardsTable: "row_id",
}

// errorsMappings lists the regular expressions used to classify error messages
// into model.JobError types for this integration. It is returned by
// (*Deltalake).ErrorMappings. Both entries map to model.PermissionError.
var errorsMappings = []model.JobError{
{
Type: model.PermissionError,
Format: regexp.MustCompile(`UnauthorizedAccessException: PERMISSION_DENIED: User does not have READ FILES on External Location`),
},
{
Type: model.PermissionError,
Format: regexp.MustCompile(`SecurityException: User does not have permission CREATE on CATALOG`),
},
}

type Deltalake struct {
Client *client.Client
Namespace string
Expand Down Expand Up @@ -1277,3 +1289,7 @@ func appendableLTSQLStatement(namespace, tableName, stagingTableName string, col
)
return sqlStatement
}

// ErrorMappings returns the package-level error patterns declared for this
// integration. The receiver is not used.
func (*Deltalake) ErrorMappings() []model.JobError {
	return errorsMappings
}
9 changes: 5 additions & 4 deletions warehouse/integrations/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type ManagerI interface {
type Manager interface {
Setup(warehouse warehouseutils.Warehouse, uploader warehouseutils.UploaderI) error
CrashRecover(warehouse warehouseutils.Warehouse) (err error)
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error)
Expand All @@ -43,6 +43,7 @@ type ManagerI interface {
Connect(warehouse warehouseutils.Warehouse) (client.Client, error)
LoadTestTable(location, stagingTableName string, payloadMap map[string]interface{}, loadFileFormat string) error
SetConnectionTimeout(timeout time.Duration)
ErrorMappings() []model.JobError
}

type WarehouseDelete interface {
Expand All @@ -51,12 +52,12 @@ type WarehouseDelete interface {
}

type WarehouseOperations interface {
ManagerI
Manager
WarehouseDelete
}

// New is a Factory function that returns a ManagerI of a given destination-type
func New(destType string) (ManagerI, error) {
// New is a Factory function that returns a Manager of a given destination-type
func New(destType string) (Manager, error) {
switch destType {
case warehouseutils.RS:
rs := redshift.NewRedshift()
Expand Down
12 changes: 12 additions & 0 deletions warehouse/integrations/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -115,6 +116,13 @@ var partitionKeyMap = map[string]string{
warehouseutils.DiscardsTable: "row_id, column_name, table_name",
}

// errorsMappings lists the regular expressions used to classify error messages
// into model.JobError types for this integration. It is returned by
// (*HandleT).ErrorMappings.
var errorsMappings = []model.JobError{
{
// NOTE(review): a TCP dial i/o timeout is tagged as PermissionError here —
// confirm this classification is intended.
Type: model.PermissionError,
Format: regexp.MustCompile(`unable to open tcp connection with host .*: dial tcp .*: i/o timeout`),
},
}

func Connect(cred CredentialsT) (*sql.DB, error) {
// Create connection string
// url := fmt.Sprintf("server=%s;user id=%s;password=%s;port=%s;database=%s;encrypt=%s;TrustServerCertificate=true", cred.host, cred.user, cred.password, cred.port, cred.dbName, cred.sslMode)
Expand Down Expand Up @@ -959,3 +967,7 @@ func (ms *HandleT) LoadTestTable(_, tableName string, payloadMap map[string]inte
// SetConnectionTimeout stores timeout in the handle's ConnectTimeout field.
func (ms *HandleT) SetConnectionTimeout(timeout time.Duration) {
ms.ConnectTimeout = timeout
}

func (ms *HandleT) ErrorMappings() []model.JobError {
return errorsMappings
}
Loading

0 comments on commit 3131b96

Please sign in to comment.