Skip to content

Commit

Permalink
feat(warehouse): glue partitions (#2899)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Feb 1, 2023
1 parent c58100c commit 9a928d4
Show file tree
Hide file tree
Showing 14 changed files with 877 additions and 118 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_HOST }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_EVENTHUB_NAME }}
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }}
run: make test
- uses: codecov/codecov-action@v2
163 changes: 150 additions & 13 deletions warehouse/integrations/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package schemarepository

import (
"fmt"
"net/url"
"regexp"

"github.com/rudderlabs/rudder-server/utils/logger"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/glue"
Expand All @@ -19,12 +23,18 @@ var (
glueParquetOutputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
)

var (
PartitionFolderRegex = regexp.MustCompile(`.*/(?P<name>.*)=(?P<value>.*)$`)
PartitionWindowRegex = regexp.MustCompile(`^(?P<name>.*)=(?P<value>.*)$`)
)

type GlueSchemaRepository struct {
glueClient *glue.Glue
GlueClient *glue.Glue
s3bucket string
s3prefix string
Warehouse warehouseutils.Warehouse
Namespace string
Logger logger.Logger
}

func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository, error) {
Expand All @@ -33,13 +43,14 @@ func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository
s3prefix: warehouseutils.GetConfigValue(warehouseutils.AWSS3Prefix, wh),
Warehouse: wh,
Namespace: wh.Namespace,
Logger: pkgLogger,
}

glueClient, err := getGlueClient(wh)
if err != nil {
return nil, err
}
gl.glueClient = glueClient
gl.GlueClient = glueClient

return &gl, nil
}
Expand All @@ -59,10 +70,10 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
getTablesInput.NextToken = getTablesOutput.NextToken
}

getTablesOutput, err = gl.glueClient.GetTables(getTablesInput)
getTablesOutput, err = gl.GlueClient.GetTables(getTablesInput)
if err != nil {
if _, ok := err.(*glue.EntityNotFoundException); ok {
pkgLogger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
gl.Logger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
err = nil
}
return schema, unrecognizedSchema, err
Expand Down Expand Up @@ -100,31 +111,39 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
}

func (gl *GlueSchemaRepository) CreateSchema() (err error) {
_, err = gl.glueClient.CreateDatabase(&glue.CreateDatabaseInput{
_, err = gl.GlueClient.CreateDatabase(&glue.CreateDatabaseInput{
DatabaseInput: &glue.DatabaseInput{
Name: &gl.Namespace,
},
})
if _, ok := err.(*glue.AlreadyExistsException); ok {
pkgLogger.Infof("Skipping database creation : database %s already exists", gl.Namespace)
gl.Logger.Infof("Skipping database creation : database %s already exists", gl.Namespace)
err = nil
}
return
}

func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
partitionKeys, err := gl.partitionColumns()
if err != nil {
return fmt.Errorf("partition keys: %w", err)
}

tableInput := &glue.TableInput{
Name: aws.String(tableName),
PartitionKeys: partitionKeys,
}

// create table request
input := glue.CreateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Name: aws.String(tableName),
},
TableInput: tableInput,
}

// add storage descriptor to create table request
input.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, columnMap)

_, err = gl.glueClient.CreateTable(&input)
_, err = gl.GlueClient.CreateTable(&input)
if err != nil {
_, ok := err.(*glue.AlreadyExistsException)
if ok {
Expand All @@ -134,7 +153,7 @@ func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[stri
return
}

func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
func (gl *GlueSchemaRepository) updateTable(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
updateTableInput := glue.UpdateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Expand All @@ -159,16 +178,26 @@ func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []wareh
tableSchema[columnInfo.Name] = columnInfo.Type
}

partitionKeys, err := gl.partitionColumns()
if err != nil {
return fmt.Errorf("partition keys: %w", err)
}

// add storage descriptor to update table request
updateTableInput.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, tableSchema)
updateTableInput.TableInput.PartitionKeys = partitionKeys

// update table
_, err = gl.glueClient.UpdateTable(&updateTableInput)
_, err = gl.GlueClient.UpdateTable(&updateTableInput)
return
}

func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
return gl.updateTable(tableName, columnsInfo)
}

func (gl *GlueSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
return gl.AddColumns(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
return gl.updateTable(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
}

func getGlueClient(wh warehouseutils.Warehouse) (*glue.Glue, error) {
Expand Down Expand Up @@ -215,3 +244,111 @@ func (gl *GlueSchemaRepository) getS3LocationForTable(tableName string) string {
filePath += warehouseutils.GetTablePathInObjectStorage(gl.Namespace, tableName)
return fmt.Sprintf("%s/%s", bucketPath, filePath)
}

// RefreshPartitions takes a tableName and a list of loadFiles and refreshes all the
// partitions that are modified by the path in those loadFiles. It returns any error
// reported by Glue
func (gl *GlueSchemaRepository) RefreshPartitions(tableName string, loadFiles []warehouseutils.LoadFileT) error {
gl.Logger.Infof("Refreshing partitions for table: %s", tableName)

// Skip if time window layout is not defined
if layout := warehouseutils.GetConfigValue("timeWindowLayout", gl.Warehouse); layout == "" {
return nil
}

var (
locationsToPartition = make(map[string]*glue.PartitionInput)
locationFolder string
err error
partitionInputs []*glue.PartitionInput
partitionGroups map[string]string
)

for _, loadFile := range loadFiles {
if locationFolder, err = url.QueryUnescape(warehouseutils.GetS3LocationFolder(loadFile.Location)); err != nil {
return fmt.Errorf("unesscape location folder: %w", err)
}

// Skip if we are already going to process this locationFolder
if _, ok := locationsToPartition[locationFolder]; ok {
continue
}

if partitionGroups, err = warehouseutils.CaptureRegexGroup(PartitionFolderRegex, locationFolder); err != nil {
gl.Logger.Warnf("Skipping refresh partitions for table %s with location %s: %v", tableName, locationFolder, err)
continue
}

locationsToPartition[locationFolder] = &glue.PartitionInput{
StorageDescriptor: &glue.StorageDescriptor{
Location: aws.String(locationFolder),
SerdeInfo: &glue.SerDeInfo{
Name: aws.String(glueSerdeName),
SerializationLibrary: aws.String(glueSerdeSerializationLib),
},
InputFormat: aws.String(glueParquetInputFormat),
OutputFormat: aws.String(glueParquetOutputFormat),
},
Values: []*string{aws.String(partitionGroups["value"])},
}
}

// Check for existing partitions. We do not want to generate unnecessary (for already existing
// partitions) changes in Glue tables (since the number of versions of a Glue table
// is limited)
for location, partition := range locationsToPartition {
_, err := gl.GlueClient.GetPartition(&glue.GetPartitionInput{
DatabaseName: aws.String(gl.Namespace),
PartitionValues: partition.Values,
TableName: aws.String(tableName),
})

if err != nil {
if _, ok := err.(*glue.EntityNotFoundException); !ok {
return fmt.Errorf("get partition: %w", err)
}

partitionInputs = append(partitionInputs, locationsToPartition[location])
} else {
gl.Logger.Debugf("Partition %s already exists in table %s", location, tableName)
}
}
if len(partitionInputs) == 0 {
return nil
}

// Updating table partitions with empty columns to create partition keys if not created
if err = gl.updateTable(tableName, []warehouseutils.ColumnInfo{}); err != nil {
return fmt.Errorf("update table: %w", err)
}

gl.Logger.Debugf("Refreshing %d partitions", len(partitionInputs))

if _, err = gl.GlueClient.BatchCreatePartition(&glue.BatchCreatePartitionInput{
DatabaseName: aws.String(gl.Namespace),
PartitionInputList: partitionInputs,
TableName: aws.String(tableName),
}); err != nil {
return fmt.Errorf("batch create partitions: %w", err)
}

return nil
}

func (gl *GlueSchemaRepository) partitionColumns() (columns []*glue.Column, err error) {
var (
layout string
partitionGroups map[string]string
)

if layout = warehouseutils.GetConfigValue("timeWindowLayout", gl.Warehouse); layout == "" {
return
}

if partitionGroups, err = warehouseutils.CaptureRegexGroup(PartitionWindowRegex, layout); err != nil {
return columns, fmt.Errorf("capture partition window regex: %w", err)
}

columns = append(columns, &glue.Column{Name: aws.String(partitionGroups["name"]), Type: aws.String("date")})
return
}
Loading

0 comments on commit 9a928d4

Please sign in to comment.