Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(warehouse): glue partitions #2899

Merged
merged 16 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }}
run: make test
- uses: codecov/codecov-action@v2
163 changes: 150 additions & 13 deletions warehouse/integrations/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package schemarepository

import (
"fmt"
"net/url"
"regexp"

"github.com/rudderlabs/rudder-server/utils/logger"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/glue"
Expand All @@ -19,12 +23,18 @@ var (
glueParquetOutputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
)

var (
PartitionFolderRegex = regexp.MustCompile(`.*/(?P<name>.*)=(?P<value>.*)$`)
PartitionWindowRegex = regexp.MustCompile(`^(?P<name>.*)=(?P<value>.*)$`)
)

type GlueSchemaRepository struct {
glueClient *glue.Glue
GlueClient *glue.Glue
s3bucket string
s3prefix string
Warehouse warehouseutils.Warehouse
Namespace string
Logger logger.Logger
}

func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository, error) {
Expand All @@ -33,13 +43,14 @@ func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository
s3prefix: warehouseutils.GetConfigValue(warehouseutils.AWSS3Prefix, wh),
Warehouse: wh,
Namespace: wh.Namespace,
Logger: pkgLogger,
}

glueClient, err := getGlueClient(wh)
if err != nil {
return nil, err
}
gl.glueClient = glueClient
gl.GlueClient = glueClient

return &gl, nil
}
Expand All @@ -59,10 +70,10 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
getTablesInput.NextToken = getTablesOutput.NextToken
}

getTablesOutput, err = gl.glueClient.GetTables(getTablesInput)
getTablesOutput, err = gl.GlueClient.GetTables(getTablesInput)
if err != nil {
if _, ok := err.(*glue.EntityNotFoundException); ok {
pkgLogger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
gl.Logger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
err = nil
}
return schema, unrecognizedSchema, err
Expand Down Expand Up @@ -100,31 +111,39 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
}

func (gl *GlueSchemaRepository) CreateSchema() (err error) {
_, err = gl.glueClient.CreateDatabase(&glue.CreateDatabaseInput{
_, err = gl.GlueClient.CreateDatabase(&glue.CreateDatabaseInput{
DatabaseInput: &glue.DatabaseInput{
Name: &gl.Namespace,
},
})
if _, ok := err.(*glue.AlreadyExistsException); ok {
pkgLogger.Infof("Skipping database creation : database %s already exists", gl.Namespace)
gl.Logger.Infof("Skipping database creation : database %s already exists", gl.Namespace)
err = nil
}
return
}

func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
partitionKeys, err := gl.partitionColumns()
if err != nil {
return fmt.Errorf("partition keys: %w", err)
}

tableInput := &glue.TableInput{
Name: aws.String(tableName),
PartitionKeys: partitionKeys,
}

// create table request
input := glue.CreateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Name: aws.String(tableName),
},
TableInput: tableInput,
}

// add storage descriptor to create table request
input.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, columnMap)

_, err = gl.glueClient.CreateTable(&input)
_, err = gl.GlueClient.CreateTable(&input)
if err != nil {
_, ok := err.(*glue.AlreadyExistsException)
if ok {
Expand All @@ -134,7 +153,7 @@ func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[stri
return
}

func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
func (gl *GlueSchemaRepository) updateTable(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
updateTableInput := glue.UpdateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Expand All @@ -159,16 +178,26 @@ func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []wareh
tableSchema[columnInfo.Name] = columnInfo.Type
}

partitionKeys, err := gl.partitionColumns()
if err != nil {
return fmt.Errorf("partition keys: %w", err)
}

// add storage descriptor to update table request
updateTableInput.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, tableSchema)
updateTableInput.TableInput.PartitionKeys = partitionKeys

// update table
_, err = gl.glueClient.UpdateTable(&updateTableInput)
_, err = gl.GlueClient.UpdateTable(&updateTableInput)
return
}

func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
return gl.updateTable(tableName, columnsInfo)
}

func (gl *GlueSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
return gl.AddColumns(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
return gl.updateTable(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
}

func getGlueClient(wh warehouseutils.Warehouse) (*glue.Glue, error) {
Expand Down Expand Up @@ -215,3 +244,111 @@ func (gl *GlueSchemaRepository) getS3LocationForTable(tableName string) string {
filePath += warehouseutils.GetTablePathInObjectStorage(gl.Namespace, tableName)
return fmt.Sprintf("%s/%s", bucketPath, filePath)
}

// RefreshPartitions takes a tableName and a list of loadFiles and refreshes all the
// partitions that are modified by the path in those loadFiles. It returns any error
// reported by Glue
func (gl *GlueSchemaRepository) RefreshPartitions(tableName string, loadFiles []warehouseutils.LoadFileT) error {
gl.Logger.Infof("Refreshing partitions for table: %s", tableName)

// Skip if time window layout is not defined
if layout := warehouseutils.GetConfigValue("timeWindowLayout", gl.Warehouse); layout == "" {
return nil
}

var (
locationsToPartition = make(map[string]*glue.PartitionInput)
locationFolder string
err error
partitionInputs []*glue.PartitionInput
partitionGroups map[string]string
)

for _, loadFile := range loadFiles {
if locationFolder, err = url.QueryUnescape(warehouseutils.GetS3LocationFolder(loadFile.Location)); err != nil {
return fmt.Errorf("unesscape location folder: %w", err)
}

// Skip if we are already going to process this locationFolder
if _, ok := locationsToPartition[locationFolder]; ok {
continue
}

if partitionGroups, err = warehouseutils.CaptureRegexGroup(PartitionFolderRegex, locationFolder); err != nil {
gl.Logger.Warnf("Skipping refresh partitions for table %s with location %s: %v", tableName, locationFolder, err)
continue
}

locationsToPartition[locationFolder] = &glue.PartitionInput{
StorageDescriptor: &glue.StorageDescriptor{
Location: aws.String(locationFolder),
SerdeInfo: &glue.SerDeInfo{
Name: aws.String(glueSerdeName),
SerializationLibrary: aws.String(glueSerdeSerializationLib),
},
InputFormat: aws.String(glueParquetInputFormat),
OutputFormat: aws.String(glueParquetOutputFormat),
},
Values: []*string{aws.String(partitionGroups["value"])},
}
}

// Check for existing partitions. We do not want to generate unnecessary (for already existing
// partitions) changes in Glue tables (since the number of versions of a Glue table
// is limited)
for location, partition := range locationsToPartition {
_, err := gl.GlueClient.GetPartition(&glue.GetPartitionInput{
DatabaseName: aws.String(gl.Namespace),
PartitionValues: partition.Values,
TableName: aws.String(tableName),
})

if err != nil {
if _, ok := err.(*glue.EntityNotFoundException); !ok {
return fmt.Errorf("get partition: %w", err)
}

partitionInputs = append(partitionInputs, locationsToPartition[location])
} else {
gl.Logger.Debugf("Partition %s already exists in table %s", location, tableName)
}
}
if len(partitionInputs) == 0 {
return nil
}

// Updating table partitions with empty columns to create partition keys if not created
if err = gl.updateTable(tableName, []warehouseutils.ColumnInfo{}); err != nil {
return fmt.Errorf("update table: %w", err)
}

gl.Logger.Debugf("Refreshing %d partitions", len(partitionInputs))

if _, err = gl.GlueClient.BatchCreatePartition(&glue.BatchCreatePartitionInput{
DatabaseName: aws.String(gl.Namespace),
PartitionInputList: partitionInputs,
TableName: aws.String(tableName),
}); err != nil {
return fmt.Errorf("batch create partitions: %w", err)
}

return nil
}

func (gl *GlueSchemaRepository) partitionColumns() (columns []*glue.Column, err error) {
var (
layout string
partitionGroups map[string]string
)

if layout = warehouseutils.GetConfigValue("timeWindowLayout", gl.Warehouse); layout == "" {
return
}

if partitionGroups, err = warehouseutils.CaptureRegexGroup(PartitionWindowRegex, layout); err != nil {
return columns, fmt.Errorf("capture partition window regex: %w", err)
}

columns = append(columns, &glue.Column{Name: aws.String(partitionGroups["name"]), Type: aws.String("date")})
return
}
Loading