Skip to content

Commit

Permalink
add support for sync subsetting composite foreign keys (#1830)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi committed Apr 24, 2024
1 parent 3bbbf1f commit b341380
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ const (
type joinType string

type sqlJoin struct {
JoinType joinType
JoinTable string
JoinColumn string
BaseTable string
BaseColumn string
JoinType joinType
JoinTable string
BaseTable string
JoinColumnsMap map[string]string // map of joinColumn to baseColumn
}

func buildSelectQuery(
Expand Down Expand Up @@ -76,11 +75,15 @@ func buildSelectJoinQuery(
if j == nil {
continue
}
joinCondition := goqu.Ex{}
for joinCol, baseCol := range j.JoinColumnsMap {
joinCondition[buildSqlIdentifier(j.JoinTable, joinCol)] = goqu.I(buildSqlIdentifier(j.BaseTable, baseCol))
}
if j.JoinType == innerJoin {
joinTable := goqu.I(j.JoinTable)
query = query.InnerJoin(
joinTable,
goqu.On(goqu.Ex{buildSqlIdentifier(j.JoinTable, j.JoinColumn): goqu.I(buildSqlIdentifier(j.BaseTable, j.BaseColumn))}),
goqu.On(joinCondition),
)
}
}
Expand Down Expand Up @@ -134,14 +137,15 @@ func buildSelectRecursiveQuery(
if j == nil {
continue
}
joinCondition := goqu.Ex{}
for joinCol, baseCol := range j.JoinColumnsMap {
joinCondition[buildSqlIdentifier(j.JoinTable, joinCol)] = goqu.I(buildSqlIdentifier(j.BaseTable, baseCol))
}
if j.JoinType == innerJoin {
table := goqu.I(j.JoinTable)
joinTable := buildSqlIdentifier(j.JoinTable, j.JoinColumn)
baseTable := buildSqlIdentifier(j.BaseTable, j.BaseColumn)

initialSelect = initialSelect.InnerJoin(
table,
goqu.On(goqu.Ex{joinTable: goqu.I(baseTable)}),
goqu.On(joinCondition),
)
}
}
Expand Down Expand Up @@ -380,13 +384,13 @@ func buildTableSubsetQueryConfig(
pathToRoot []string,
dependencyMap map[string]*dbschemas.TableConstraints,
tableWhereMap map[string]string,

) *subsetQueryConfig {
joins := []*sqlJoin{}
whereClauses := []string{}
subsetTables := []string{} // keeps track of tables that are being subset
fks := dependencyMap[table]

fksTableMap := buildFkTableMap(fks)
for _, t := range pathToRoot {
if t == table {
continue
Expand All @@ -400,43 +404,52 @@ func buildTableSubsetQueryConfig(
if len(whereClauses) > 0 {
subsetTables = append(subsetTables, t)
// add joins for parent tables up to first subsetted table
if dependencies != nil {
for _, c := range dependencies.Constraints {
if t != c.ForeignKey.Table && slices.Contains(subsetTables, c.ForeignKey.Table) {
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
BaseTable: t,
BaseColumn: c.Column,
JoinTable: c.ForeignKey.Table,
JoinColumn: c.ForeignKey.Column,
})
}
depsTableMap := buildFkTableMap(dependencies)
for fkTable, colsMap := range depsTableMap {
if t != fkTable && slices.Contains(subsetTables, fkTable) {
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
BaseTable: t,
JoinTable: fkTable,
JoinColumnsMap: colsMap,
})
}
}
// add join for current table
if fks != nil {
for _, c := range fks.Constraints {
if t == c.ForeignKey.Table {
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
BaseTable: table,
BaseColumn: c.Column,
JoinTable: c.ForeignKey.Table,
JoinColumn: c.ForeignKey.Column,
})
}
for fkTable, colMap := range fksTableMap {
if t == fkTable {
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
BaseTable: table,
JoinTable: fkTable,
JoinColumnsMap: colMap,
})
}
}
}
}
// reverse joins so they are constructed in correct order
reverseSlice(joins)

return &subsetQueryConfig{
Joins: joins,
WhereClauses: whereClauses,
}
}

func buildFkTableMap(fks *dbschemas.TableConstraints) map[string]map[string]string {
fksTableMap := map[string]map[string]string{} // map of fk table to map of fk column to base table column
if fks != nil {
for _, c := range fks.Constraints {
if _, exists := fksTableMap[c.ForeignKey.Table]; !exists {
fksTableMap[c.ForeignKey.Table] = map[string]string{}
}
fksTableMap[c.ForeignKey.Table][c.ForeignKey.Column] = c.Column
}
}
return fksTableMap
}

type bfsPaths struct {
Path []string
NodePathMap map[string][]string
Expand Down Expand Up @@ -510,18 +523,27 @@ func qualifyWhereColumnNames(driver, where, schema, table string) (string, error
}

func getPrimaryToForeignTableMapFromRunConfigs(runConfigs []*tabledependency.RunConfig) map[string][]string {
dpMap := make(map[string][]string)
dpMap := map[string]map[string]struct{}{}

for _, cfg := range runConfigs {
if _, exists := dpMap[cfg.Table]; !exists {
dpMap[cfg.Table] = []string{}
dpMap[cfg.Table] = map[string]struct{}{}
}
for _, dep := range cfg.DependsOn {
dpMap[dep.Table] = append(dpMap[dep.Table], cfg.Table)
if _, exists := dpMap[dep.Table]; !exists {
dpMap[dep.Table] = map[string]struct{}{}
}
dpMap[dep.Table][cfg.Table] = struct{}{}
}
}
tableDependencyMap := map[string][]string{}
for table, fkTables := range dpMap {
for t := range fkTables {
tableDependencyMap[table] = append(tableDependencyMap[table], t)
}
}

return dpMap
return tableDependencyMap
}

type selfReferencingCircularDependency struct {
Expand Down
Loading

0 comments on commit b341380

Please sign in to comment.