Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(warehouse): glue partitions #2899

Merged
merged 16 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }}
run: make test
- uses: codecov/codecov-action@v2
113 changes: 104 additions & 9 deletions warehouse/integrations/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package schemarepository

import (
"fmt"
"net/url"
"regexp"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/glue"
Expand All @@ -19,8 +22,10 @@ var (
glueParquetOutputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
)

// PartitionRegex extracts the Hive-style partition name and value from the last
// path segment of a location, e.g. ".../dt=2006-01-02" -> name "dt", value "2006-01-02".
var PartitionRegex = regexp.MustCompile(`.*/(?P<name>.*)=(?P<value>.*)`)

type GlueSchemaRepository struct {
glueClient *glue.Glue
GlueClient *glue.Glue
s3bucket string
s3prefix string
Warehouse warehouseutils.Warehouse
Expand All @@ -39,7 +44,7 @@ func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository
if err != nil {
return nil, err
}
gl.glueClient = glueClient
gl.GlueClient = glueClient

return &gl, nil
}
Expand All @@ -59,7 +64,7 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
getTablesInput.NextToken = getTablesOutput.NextToken
}

getTablesOutput, err = gl.glueClient.GetTables(getTablesInput)
getTablesOutput, err = gl.GlueClient.GetTables(getTablesInput)
if err != nil {
if _, ok := err.(*glue.EntityNotFoundException); ok {
pkgLogger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
Expand Down Expand Up @@ -100,7 +105,7 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
}

func (gl *GlueSchemaRepository) CreateSchema() (err error) {
_, err = gl.glueClient.CreateDatabase(&glue.CreateDatabaseInput{
_, err = gl.GlueClient.CreateDatabase(&glue.CreateDatabaseInput{
DatabaseInput: &glue.DatabaseInput{
Name: &gl.Namespace,
},
Expand All @@ -113,18 +118,21 @@ func (gl *GlueSchemaRepository) CreateSchema() (err error) {
}

func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
tableInput := &glue.TableInput{
Name: aws.String(tableName),
PartitionKeys: gl.partitionKeys(),
}

// create table request
input := glue.CreateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Name: aws.String(tableName),
},
TableInput: tableInput,
}

// add storage descriptor to create table request
input.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, columnMap)

_, err = gl.glueClient.CreateTable(&input)
_, err = gl.GlueClient.CreateTable(&input)
if err != nil {
_, ok := err.(*glue.AlreadyExistsException)
if ok {
Expand Down Expand Up @@ -161,9 +169,10 @@ func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []wareh

// add storage descriptor to update table request
updateTableInput.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, tableSchema)
updateTableInput.TableInput.PartitionKeys = gl.partitionKeys()

// update table
_, err = gl.glueClient.UpdateTable(&updateTableInput)
_, err = gl.GlueClient.UpdateTable(&updateTableInput)
return
}

Expand Down Expand Up @@ -215,3 +224,89 @@ func (gl *GlueSchemaRepository) getS3LocationForTable(tableName string) string {
filePath += warehouseutils.GetTablePathInObjectStorage(gl.Namespace, tableName)
return fmt.Sprintf("%s/%s", bucketPath, filePath)
}

// RefreshPartitions takes a tableName and a list of loadFiles and registers in
// Glue every partition referenced by the paths of those loadFiles. Partitions
// that already exist are skipped, so we do not generate unnecessary new table
// versions (the number of versions of a Glue table is limited). It returns any
// error reported by Glue.
func (gl *GlueSchemaRepository) RefreshPartitions(tableName string, loadFiles []warehouseutils.LoadFileT) error {
	pkgLogger.Infof("Refreshing partitions for table %s with batch of %d files", tableName, len(loadFiles))

	var (
		locationsToPartition = make(map[string]*glue.PartitionInput)
		locationFolder       string
		err                  error
		partitionInputs      []*glue.PartitionInput
		partitionGroups      map[string]string
	)

	for _, loadFile := range loadFiles {
		if locationFolder, err = url.QueryUnescape(warehouseutils.GetS3LocationFolder(loadFile.Location)); err != nil {
			return fmt.Errorf("unescape location folder: %w", err)
		}

		// Skip if we are already going to process this locationFolder
		if _, ok := locationsToPartition[locationFolder]; ok {
			continue
		}

		// Extract the partition value (e.g. the date from ".../dt=2006-01-02")
		// from the folder path.
		if partitionGroups, err = warehouseutils.CaptureRegexGroup(PartitionRegex, locationFolder); err != nil {
			return fmt.Errorf("capture partition regex group: %w", err)
		}

		locationsToPartition[locationFolder] = &glue.PartitionInput{
			StorageDescriptor: &glue.StorageDescriptor{
				Location: aws.String(locationFolder),
				SerdeInfo: &glue.SerDeInfo{
					Name:                 aws.String(glueSerdeName),
					SerializationLibrary: aws.String(glueSerdeSerializationLib),
				},
				InputFormat:  aws.String(glueParquetInputFormat),
				OutputFormat: aws.String(glueParquetOutputFormat),
			},
			Values: []*string{aws.String(partitionGroups["value"])},
		}
	}

	// Check for existing partitions. We do not want to generate unnecessary (for already existing
	// partitions) changes in Glue tables (since the number of versions of a Glue table
	// is limited)
	for location, partition := range locationsToPartition {
		_, err = gl.GlueClient.GetPartition(&glue.GetPartitionInput{
			DatabaseName:    aws.String(gl.Namespace),
			PartitionValues: partition.Values,
			TableName:       aws.String(tableName),
		})
		if err != nil {
			// Any error other than "not found" is fatal; "not found" just means
			// the partition still has to be created.
			if _, ok := err.(*glue.EntityNotFoundException); !ok {
				return fmt.Errorf("get partition: %w", err)
			}
			partitionInputs = append(partitionInputs, partition)
		} else {
			pkgLogger.Debugf("Partition %s already exists in table %s", location, tableName)
		}
	}

	if len(partitionInputs) == 0 {
		return nil
	}

	pkgLogger.Infof("Refreshing %d partitions", len(partitionInputs))

	if _, err = gl.GlueClient.BatchCreatePartition(&glue.BatchCreatePartitionInput{
		DatabaseName:       aws.String(gl.Namespace),
		PartitionInputList: partitionInputs,
		TableName:          aws.String(tableName),
	}); err != nil {
		return fmt.Errorf("batch create partitions: %w", err)
	}

	return nil
}

// partitionKeys builds the single Glue partition column for this warehouse.
// The column name is taken from the configured time-window layout
// (e.g. a layout of "dt=2006-01-02" yields a partition column named "dt").
func (gl *GlueSchemaRepository) partitionKeys() []*glue.Column {
	layout := TimeWindowFormat(&gl.Warehouse)
	name := strings.SplitN(layout, "=", 2)[0]
	return []*glue.Column{{
		Name: aws.String(name),
		Type: aws.String("date"),
	}}
}
139 changes: 139 additions & 0 deletions warehouse/integrations/datalake/schema-repository/glue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package schemarepository

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"

"github.com/aws/aws-sdk-go/service/glue"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/filemanager"

"github.com/rudderlabs/rudder-server/utils/misc"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/stretchr/testify/require"
)

func TestGlueSchemaRepositoryRoundTrip(t *testing.T) {
type S3Credentials struct {
AccessKeyID string
AccessKey string
Region string
Bucket string
}

var (
credentialsStr string
credentials S3Credentials
err error
credentialsEnv = "S3_DATALAKE_TEST_CREDENTIALS"
testNamespace = fmt.Sprintf("test_namespace_%s", warehouseutils.RandHex())
testTable = fmt.Sprintf("test_table_%s", warehouseutils.RandHex())
testFile = "testdata/load.parquet"
testColumns = map[string]string{
"id": "string",
"received_at": "datetime",
"test_array_bool": "array(boolean)",
"test_array_datetime": "array(datetime)",
"test_array_float": "array(float)",
"test_array_int": "array(int)",
"test_array_string": "array(string)",
"test_bool": "boolean",
"test_datetime": "datetime",
"test_float": "float",
"test_int": "int",
"test_string": "string",
}
)

if credentialsStr = os.Getenv(credentialsEnv); credentialsStr == "" {
t.Skipf("Skipping %s as %s is not set", t.Name(), credentialsEnv)
}

err = json.Unmarshal([]byte(credentialsStr), &credentials)
require.NoError(t, err)

destination := backendconfig.DestinationT{
Config: map[string]interface{}{
"region": credentials.Region,
"bucketName": credentials.Bucket,
"accessKeyID": credentials.AccessKeyID,
"accessKey": credentials.AccessKey,
},
}
warehouse := warehouseutils.Warehouse{
Destination: destination,
Namespace: testNamespace,
}

misc.Init()
warehouseutils.Init()

g, err := NewGlueSchemaRepository(warehouse)
require.NoError(t, err)

t.Logf("Creating schema %s", testNamespace)
err = g.CreateSchema()
require.NoError(t, err)

t.Log("Creating already existing schema should not fail")
err = g.CreateSchema()
require.NoError(t, err)

t.Cleanup(func() {
t.Log("Cleaning up")
_, err = g.GlueClient.DeleteDatabase(&glue.DeleteDatabaseInput{
Name: &testNamespace,
})
require.NoError(t, err)
})

t.Logf("Creating table %s", testTable)
err = g.CreateTable(testTable, testColumns)
require.NoError(t, err)

t.Log("Creating already existing table should not fail")
err = g.CreateTable(testTable, testColumns)
require.NoError(t, err)

t.Log("Adding columns to table")
err = g.AddColumns(testTable, []warehouseutils.ColumnInfo{
{Name: "alter_test_bool", Type: "boolean"},
{Name: "alter_test_string", Type: "string"},
{Name: "alter_test_int", Type: "int"},
{Name: "alter_test_float", Type: "float"},
{Name: "alter_test_datetime", Type: "datetime"},
})

t.Log("Preparing load files metadata")
f, err := os.Open(testFile)
require.NoError(t, err)

t.Cleanup(func() {
_ = f.Close()
})

fmFactory := filemanager.FileManagerFactoryT{}
fm, err := fmFactory.New(&filemanager.SettingsT{
Provider: warehouseutils.S3,
Config: map[string]any{
"bucketName": credentials.Bucket,
"accessKeyID": credentials.AccessKeyID,
"secretAccessKey": credentials.AccessKey,
"region": credentials.Region,
},
})
require.NoError(t, err)

uploadOutput, err := fm.Upload(context.TODO(), f, fmt.Sprintf("rudder-test-payload/s3-datalake/%s/dt=2006-01-02/", warehouseutils.RandHex()))
require.NoError(t, err)

err = g.RefreshPartitions(testTable, []warehouseutils.LoadFileT{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we checking the side effects of refresh partitions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we have added only the round-trip flow for Glue. Since this requires an external service call to Glue. Should we mock the API call and tests this?

{
Location: uploadOutput.Location,
},
})
require.NoError(t, err)
}
19 changes: 7 additions & 12 deletions warehouse/integrations/datalake/schema-repository/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func (*LocalSchemaRepository) CreateSchema() (err error) {

func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
// fetch schema from local db
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
schema, _, _ := ls.FetchSchema(ls.warehouse)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FetchSchema always returns nil, ignoring it.

Copy link
Member

@lvrach lvrach Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove the err from FetchSchema then? We can re-introduce it once error is possible.

I see the schema needs to satisfy an interface.

One possible way to work around this:

func (ls *LocalSchemaRepository) localFetchSchema() warehouseutils.SchemaT {
      return ls.uploader.GetLocalSchema()
}

func (ls *LocalSchemaRepository) FetchSchema(_ warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error) {
   s := ls.localFetchSchema()
   return s, s, nil
}

func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
    schema:= ls.localFetchSchema(ls.warehouse)


if _, ok := schema[tableName]; ok {
return fmt.Errorf("failed to create table: table %s already exists", tableName)
Expand All @@ -52,10 +49,7 @@ func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[str

func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
// fetch schema from local db
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
schema, _, _ := ls.FetchSchema(ls.warehouse)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FetchSchema always returns nil, ignoring it.


// check if table exists
if _, ok := schema[tableName]; !ok {
Expand All @@ -72,10 +66,7 @@ func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []ware

func (ls *LocalSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
// fetch schema from local db
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
schema, _, _ := ls.FetchSchema(ls.warehouse)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FetchSchema always returns nil, ignoring it.


// check if table exists
if _, ok := schema[tableName]; !ok {
Expand All @@ -92,3 +83,7 @@ func (ls *LocalSchemaRepository) AlterColumn(tableName, columnName, columnType s
// update schema
return ls.uploader.UpdateLocalSchema(schema)
}

// RefreshPartitions is a no-op for the local schema repository: partitions only
// exist in external catalogs such as Glue, so there is nothing to refresh here.
// It exists to satisfy the schema-repository interface.
func (*LocalSchemaRepository) RefreshPartitions(_ string, _ []warehouseutils.LoadFileT) error {
	return nil
}
Loading