Commit

chore(warehouse): added warehouse handling for s3 with glue and other improvements (#2940)
achettyiitr committed Mar 2, 2023
1 parent 7ca1b0b commit 3495797
Showing 23 changed files with 1,671 additions and 687 deletions.
5 changes: 3 additions & 2 deletions jobsdb/jobsdb.go
@@ -35,9 +35,10 @@ import (
"time"
"unicode/utf8"

"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
@@ -3018,7 +3019,7 @@ func (jd *HandleT) recoverFromCrash(owner OwnerType, goRoutineType string) {
jd.assert(count <= 1, fmt.Sprintf("count:%d > 1", count))

if count == 0 {
// Nothing to recoer
// Nothing to recover
return
}

6 changes: 1 addition & 5 deletions warehouse/admin.go
@@ -107,11 +107,7 @@ func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *Config
pkgLogger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)

destinationValidator := validations.NewDestinationValidator()
req := &validations.DestinationValidationRequest{Destination: warehouse.Destination}
res, err := destinationValidator.ValidateCredentials(req)
if err != nil {
return fmt.Errorf("unable to successfully validate destination: %s credentials, err: %v", warehouse.Destination.ID, err)
}
res := destinationValidator.Validate(&warehouse.Destination)

reply.Valid = res.Success
reply.Error = res.Error
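The one-line call above reflects the simplified validations API: Validate replaces the earlier ValidateCredentials(req) call with its (res, err) pair and reports the outcome in the response itself. The sketch below shows what the call sites in this commit imply about that API; the interface and field names are assumptions inferred from res.Success and res.Error, not the package's authoritative definition.

```go
package validations

import (
	backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
)

// DestinationValidator, as implied by the call sites in this commit: Validate
// takes the destination directly and never returns a transport-level error,
// so callers read the outcome from the response instead of handling (res, err).
type DestinationValidator interface {
	Validate(dest *backendconfig.DestinationT) *DestinationValidationResponse
}

// DestinationValidationResponse carries the fields consumed above as
// res.Success and res.Error (names assumed).
type DestinationValidationResponse struct {
	Success bool   `json:"success"`
	Error   string `json:"error"`
}
```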
20 changes: 9 additions & 11 deletions warehouse/api.go
@@ -12,12 +12,12 @@ import (
"time"

"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/validations"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
@@ -29,10 +29,10 @@ import (
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
cpclient "github.com/rudderlabs/rudder-server/warehouse/client/controlplane"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/tidwall/gjson"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

type UploadsReqT struct {
@@ -813,7 +813,7 @@ func checkMapForValidKey(configMap map[string]interface{}, key string) bool {
}

if valStr, ok := value.(string); ok {
return len(valStr) != 0
return valStr != ""
}
return false
}
@@ -827,10 +827,8 @@ func validateObjectStorage(ctx context.Context, request *ObjectStorageValidation
return fmt.Errorf("unable to create file manager: \n%s", err.Error())
}

req := validations.DestinationValidationRequest{
Destination: backendconfig.DestinationT{
DestinationDefinition: backendconfig.DestinationDefinitionT{Name: request.Type},
},
req := backendconfig.DestinationT{
DestinationDefinition: backendconfig.DestinationDefinitionT{Name: request.Type},
}

filePath, err := validations.CreateTempLoadFile(&req)
37 changes: 19 additions & 18 deletions warehouse/integrations/bigquery/bigquery.go
@@ -8,20 +8,19 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"cloud.google.com/go/bigquery"
"golang.org/x/exp/slices"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/utils/googleutils"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"golang.org/x/exp/slices"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

var (
@@ -131,23 +130,25 @@ func (bq *HandleT) DeleteTable(tableName string) (err error) {
return
}

func (bq *HandleT) CreateTable(tableName string, columnMap map[string]string) (err error) {
func (bq *HandleT) CreateTable(tableName string, columnMap map[string]string) error {
pkgLogger.Infof("BQ: Creating table: %s in bigquery dataset: %s in project: %s", tableName, bq.namespace, bq.projectID)
sampleSchema := getTableSchema(columnMap)
metaData := &bigquery.TableMetadata{
Schema: sampleSchema,
TimePartitioning: &bigquery.TimePartitioning{},
}
tableRef := bq.db.Dataset(bq.namespace).Table(tableName)
err = tableRef.Create(bq.backgroundContext, metaData)
err := tableRef.Create(bq.backgroundContext, metaData)
if !checkAndIgnoreAlreadyExistError(err) {
return
return fmt.Errorf("create table: %w", err)
}

if !dedupEnabled() {
err = bq.createTableView(tableName, columnMap)
if err = bq.createTableView(tableName, columnMap); err != nil {
return fmt.Errorf("create view: %w", err)
}
}
return
return nil
}

func (bq *HandleT) DropTable(tableName string) (err error) {
@@ -529,7 +530,7 @@ func (bq *HandleT) LoadUserTables() (errorMap map[string]error) {
viewExists, _ := bq.tableExists(warehouseutils.UsersView)
if !viewExists {
pkgLogger.Infof("BQ: Creating view: %s in bigquery dataset: %s in project: %s", warehouseutils.UsersView, bq.namespace, bq.projectID)
bq.createTableView(warehouseutils.UsersTable, userColMap)
_ = bq.createTableView(warehouseutils.UsersTable, userColMap)
}

bqIdentifiesTable := bqTable(warehouseutils.IdentifiesTable)
@@ -721,7 +722,7 @@ func (bq *HandleT) CrashRecover(warehouse warehouseutils.Warehouse) (err error)
if err != nil {
return
}
defer bq.db.Close()
defer func() { _ = bq.db.Close() }()
bq.dropDanglingStagingTables()
return
}
@@ -786,7 +787,7 @@ func (bq *HandleT) IsEmpty(warehouse warehouseutils.Warehouse) (empty bool, err
if err != nil {
return
}
defer bq.db.Close()
defer func() { _ = bq.db.Close() }()

tables := []string{"tracks", "pages", "screens", "identifies", "aliases"}
for _, tableName := range tables {
@@ -833,7 +834,7 @@ func (bq *HandleT) TestConnection(warehouse warehouseutils.Warehouse) (err error
if err != nil {
return
}
defer bq.db.Close()
defer func() { _ = bq.db.Close() }()
return
}

@@ -897,7 +898,7 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unre
if err != nil {
return
}
defer dbClient.Close()
defer func() { _ = dbClient.Close() }()

schema = make(warehouseutils.SchemaT)
unrecognizedSchema = make(warehouseutils.SchemaT)
@@ -971,7 +972,7 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unre

func (bq *HandleT) Cleanup() {
if bq.db != nil {
bq.db.Close()
_ = bq.db.Close()
}
}

@@ -1108,7 +1109,7 @@ func (bq *HandleT) DownloadIdentityRules(gzWriter *misc.GZipWriter) (err error)
if err != nil {
break
}
gzWriter.WriteGZ(string(bytes) + "\n")
_ = gzWriter.WriteGZ(string(bytes) + "\n")
}

offset += batchSize
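CreateTable above now routes every error through checkAndIgnoreAlreadyExistError so that re-creating an existing table or view is treated as success. The helper itself is not part of this diff; the sketch below is a hedged reconstruction of the pattern, assuming the BigQuery API surfaces duplicates as a *googleapi.Error with HTTP status 409.

```go
package bigquery

import (
	"errors"

	"google.golang.org/api/googleapi"
)

// checkAndIgnoreAlreadyExistError reports whether err can be ignored: either
// the operation succeeded, or it failed only because the dataset/table/view
// already exists (surfaced by the BigQuery API as an HTTP 409 conflict).
func checkAndIgnoreAlreadyExistError(err error) bool {
	if err == nil {
		return true
	}
	var apiErr *googleapi.Error
	if errors.As(err, &apiErr) && apiErr.Code == 409 {
		return true
	}
	return false
}
```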
36 changes: 27 additions & 9 deletions warehouse/integrations/deltalake/deltalake.go
@@ -7,20 +7,17 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/services/stats"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake/client"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/rudderlabs/rudder-server/config"
proto "github.com/rudderlabs/rudder-server/proto/databricks"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake/client"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

@@ -44,6 +41,7 @@ const (
// Reference: https://docs.oracle.com/cd/E17952_01/connector-odbc-en/connector-odbc-reference-errorcodes.html
const (
tableOrViewNotFound = "42S02"
columnNotFound = "42000"
databaseNotFound = "42000"
partitionNotFound = "42000"
)
@@ -811,10 +809,11 @@ func (dl *Deltalake) DropTable(tableName string) (err error) {
return
}

func (dl *Deltalake) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
func (dl *Deltalake) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) error {
var (
query string
queryBuilder strings.Builder
err error
)

queryBuilder.WriteString(fmt.Sprintf(`
@@ -833,8 +832,27 @@ func (dl *Deltalake) AddColumns(tableName string, columnsInfo []warehouseutils.C
query += ");"

dl.Logger.Infof("DL: Adding columns for destinationID: %s, tableName: %s with query: %v", dl.Warehouse.Destination.ID, tableName, query)
err = dl.ExecuteSQLClient(dl.Client, query)
return
executeResponse, err := dl.Client.Client.Execute(dl.Client.Context, &proto.ExecuteRequest{
Config: dl.Client.CredConfig,
Identifier: dl.Client.CredIdentifier,
SqlStatement: query,
})
if err != nil {
return fmt.Errorf("add columns: %w", err)
}

// Handle error in case of single column
if len(columnsInfo) == 1 {
if !checkAndIgnoreAlreadyExistError(executeResponse.GetErrorCode(), columnNotFound) {
return fmt.Errorf("add columns: %s", executeResponse.GetErrorMessage())
}
return nil
}

if executeResponse.GetErrorCode() != "" {
return fmt.Errorf("add columns: %s", executeResponse.GetErrorMessage())
}
return nil
}

// CreateSchema checks if schema exists or not. If it does not exist, it creates the schema.
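In the new AddColumns above, the single-column case fails softly: the response's error code is compared against columnNotFound ("42000") through checkAndIgnoreAlreadyExistError before the error is surfaced, while multi-column batches still fail on any non-empty code. That helper is not shown in this diff; below is a minimal sketch of the shape its call implies, where an empty code means the statement succeeded.

```go
package deltalake

// checkAndIgnoreAlreadyExistError treats an empty error code as success and a
// single expected code (e.g. columnNotFound) as ignorable; any other code is a
// real failure the caller should surface. Sketch only; the actual helper in
// the package may differ.
func checkAndIgnoreAlreadyExistError(errorCode, ignoreError string) bool {
	return errorCode == "" || errorCode == ignoreError
}
```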
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake_test.go
@@ -572,7 +572,7 @@ func TestDeltalake_AddColumns(t *testing.T) {
ErrorCode: "42xxx",
ErrorMessage: "permission error",
},
wantError: errors.New("executing with response: permission error"),
wantError: errors.New("add columns: permission error"),
},
}

42 changes: 36 additions & 6 deletions warehouse/integrations/redshift/redshift.go
@@ -252,26 +252,56 @@ func (rs *Redshift) schemaExists(_ string) (exists bool, err error) {

func (rs *Redshift) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) error {
for _, columnInfo := range columnsInfo {
columnType := getRSDataType(columnInfo.Type)
query := fmt.Sprintf(`
ALTER TABLE
%q.%q
ADD
COLUMN %q %s;
ALTER TABLE
%q.%q
ADD
COLUMN %q %s;
`,
rs.Namespace,
tableName,
columnInfo.Name,
getRSDataType(columnInfo.Type),
columnType,
)
rs.Logger.Infof("AZ: Adding column for destinationID: %s, tableName: %s with query: %v", rs.Warehouse.Destination.ID, tableName, query)
rs.Logger.Infof("RS: Adding column for destinationID: %s, tableName: %s with query: %v", rs.Warehouse.Destination.ID, tableName, query)

if _, err := rs.DB.Exec(query); err != nil {
if CheckAndIgnoreColumnAlreadyExistError(err) {
rs.Logger.Infow("column already exists",
logfield.SourceID, rs.Warehouse.Source.ID,
logfield.SourceType, rs.Warehouse.Source.SourceDefinition.Name,
logfield.DestinationID, rs.Warehouse.Destination.ID,
logfield.DestinationType, rs.Warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, rs.Warehouse.WorkspaceID,
logfield.Schema, rs.Namespace,
logfield.TableName, tableName,
logfield.ColumnName, columnInfo.Name,
logfield.ColumnType, columnType,
logfield.Error, err.Error(),
logfield.Query, query,
)
continue
}

return err
}
}
return nil
}

func CheckAndIgnoreColumnAlreadyExistError(err error) bool {
if err != nil {
if e, ok := err.(*pq.Error); ok {
if e.Code == "42701" {
return true
}
}
return false
}
return true
}

func (rs *Redshift) DeleteBy(tableNames []string, params warehouseutils.DeleteByParams) (err error) {
rs.Logger.Infof("RS: Cleaning up the following tables in redshift for RS:%s : %+v", tableNames, params)
rs.Logger.Infof("RS: Flag for enableDeleteByJobs is %t", rs.EnableDeleteByJobs)
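SQLSTATE 42701, the code checked by CheckAndIgnoreColumnAlreadyExistError above, is Postgres/Redshift's duplicate_column error, which is why AddColumns can log and skip a column that already exists instead of aborting the whole batch. Below is a small usage sketch of the exported helper from outside the package; the wrapper function and its identifiers are hypothetical.

```go
package example

import (
	"database/sql"
	"fmt"

	"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
)

// addColumnIfMissing issues ALTER TABLE ... ADD COLUMN and treats the
// duplicate_column error (SQLSTATE 42701) as success, mirroring AddColumns.
func addColumnIfMissing(db *sql.DB, namespace, table, column, dataType string) error {
	query := fmt.Sprintf(`ALTER TABLE %q.%q ADD COLUMN %q %s;`, namespace, table, column, dataType)
	if _, err := db.Exec(query); err != nil {
		if redshift.CheckAndIgnoreColumnAlreadyExistError(err) {
			return nil // column already exists; nothing to do
		}
		return fmt.Errorf("add column %s.%s.%s: %w", namespace, table, column, err)
	}
	return nil
}
```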
