Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latch impl #1000

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 37 additions & 70 deletions atrium/trcflow/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"syscall"
"time"

"github.com/glycerine/bchan"
tcopts "github.com/trimble-oss/tierceron/buildopts/tcopts"

"github.com/trimble-oss/tierceron/atrium/buildopts/flowcoreopts"
Expand Down Expand Up @@ -47,7 +48,6 @@ var AskFlumeFlow FlowNameType = "AskFlumeFlow"
var signalChannel chan os.Signal
var sourceDatabaseConnectionsMap map[string]map[string]interface{}
var tfmContextMap = make(map[string]*TrcFlowMachineContext, 5)
var cleanerInit = make(map[FlowNameType]bool, 1)

const (
TableSyncFlow FlowType = iota
Expand Down Expand Up @@ -86,9 +86,7 @@ func (fnt FlowNameType) ServiceName() string {
func TriggerChangeChannel(table string) {
for _, tfmContext := range tfmContextMap {
if notificationFlowChannel, notificationChannelOk := tfmContext.ChannelMap[FlowNameType(table)]; notificationChannelOk {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
notificationFlowChannel.Bcast(true)
return
}
}
Expand All @@ -111,19 +109,13 @@ func TriggerAllChangeChannel(table string, changeIds map[string]string) {
}
}
if notificationFlowChannel, notificationChannelOk := tfmContext.ChannelMap[FlowNameType(table)]; notificationChannelOk {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
return
notificationFlowChannel.Bcast(true)
continue
}
}

for _, notificationFlowChannel := range tfmContext.ChannelMap {
if len(notificationFlowChannel) < 10 {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
}
notificationFlowChannel.Bcast(true)
}
}
}
Expand All @@ -143,7 +135,7 @@ type TrcFlowMachineContext struct {
ExtensionAuthData map[string]interface{}
ExtensionAuthDataReloader map[string]interface{}
GetAdditionalFlowsByState func(teststate string) []FlowNameType
ChannelMap map[FlowNameType]chan bool
ChannelMap map[FlowNameType]*bchan.Bchan
FlowMap map[FlowNameType]*TrcFlowContext // Map of all running flows for engine
PermissionChan chan PermissionUpdate // This channel is used to alert for dynamic permissions when tables are loaded
}
Expand Down Expand Up @@ -240,18 +232,18 @@ func (tfmContext *TrcFlowMachineContext) Init(
tfmContext.GetTableModifierLock().Unlock()
eUtils.LogInfo(tfmContext.Config, "Tables creation completed.")

tfmContext.ChannelMap = make(map[FlowNameType]chan bool)
tfmContext.ChannelMap = make(map[FlowNameType]*bchan.Bchan)

for _, table := range tableNames {
tfmContext.ChannelMap[FlowNameType(table)] = make(chan bool, 10)
tfmContext.ChannelMap[FlowNameType(table)] = bchan.New(1)
}

for _, f := range additionalFlowNames {
tfmContext.ChannelMap[f] = make(chan bool, 5)
tfmContext.ChannelMap[f] = bchan.New(1)
}

for _, f := range testFlowNames {
tfmContext.ChannelMap[f] = make(chan bool, 5)
tfmContext.ChannelMap[f] = bchan.New(1)
}

tfmContext.PermissionChan = make(chan PermissionUpdate, 10)
Expand Down Expand Up @@ -434,38 +426,22 @@ func (tfmContext *TrcFlowMachineContext) seedVaultCycle(tfContext *TrcFlowContex

mysqlPushEnabled := sqlState
flowChangedChannel := tfmContext.ChannelMap[tfContext.Flow]
go func(fcc chan bool) {
fcc <- true
}(flowChangedChannel)

if init, ok := cleanerInit[tfContext.Flow]; !ok || !init { //Kicks off cleaner goroutine if not already active.
cleanerInit[tfContext.Flow] = true
go func(nFC chan bool) {
for {
time.Sleep(time.Second * 1)
if len(nFC) >= 8 {
for i := 0; i < 4; i++ {
<-nFC
}
nFC <- true
}
}
}(flowChangedChannel)
}
// flowChangedChannel.Bcast(true)

for {
select {
case <-signalChannel:
eUtils.LogErrorMessage(tfmContext.Config, "Receiving shutdown presumably from vault.", true)
os.Exit(0)
case <-flowChangedChannel:
case <-flowChangedChannel.Ch:
tfmContext.vaultPersistPushRemoteChanges(
tfContext,
identityColumnName,
indexColumnNames,
mysqlPushEnabled,
getIndexedPathExt,
flowPushRemote)
flowChangedChannel.Clear()
case <-tfContext.Context.Done():
tfmContext.Log(fmt.Sprintf("Flow shutdown: %s", tfContext.Flow), nil)
tfmContext.vaultPersistPushRemoteChanges(
Expand Down Expand Up @@ -605,11 +581,14 @@ func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContex
// tfContext.DataFlowStatistic["flume"] = "" //Used to be argosid
// tfContext.DataFlowStatistic["Flows"] = "" //Used to be flowGroup
// tfContext.DataFlowStatistic["mode"] = ""
df := InitDataFlow(nil, tfContext.Flow.TableName(), true) //Initializing dataflow
if tfContext.FlowState.FlowAlias != "" {
df.UpdateDataFlowStatistic("Flows", tfContext.FlowState.FlowAlias, "Loading", "1", 1, tfmContext.Log)
} else {
df.UpdateDataFlowStatistic("Flows", tfContext.Flow.TableName(), "Loading", "1", 1, tfmContext.Log)
var df *TTDINode = nil
if tfContext.Init {
df = InitDataFlow(nil, tfContext.Flow.TableName(), true) //Initializing dataflow
if tfContext.FlowState.FlowAlias != "" {
df.UpdateDataFlowStatistic("Flows", tfContext.FlowState.FlowAlias, "Loading", "1", 1, tfmContext.Log)
} else {
df.UpdateDataFlowStatistic("Flows", tfContext.Flow.TableName(), "Loading", "1", 1, tfmContext.Log)
}
}
//Copy ReportStatistics from process_registerenterprise.go if !buildopts.BuildOptions.IsTenantAutoRegReady(tenant)
// Do we need to account for that here?
Expand All @@ -634,16 +613,20 @@ func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContex
seedInitComplete <- true
}
<-seedInitComplete
if tfContext.FlowState.FlowAlias != "" {
df.UpdateDataFlowStatistic("Flows", tfContext.FlowState.FlowAlias, "Load complete", "2", 1, tfmContext.Log)
} else {
df.UpdateDataFlowStatistic("Flows", tfContext.Flow.TableName(), "Load complete", "2", 1, tfmContext.Log)
if tfContext.Init {
if tfContext.FlowState.FlowAlias != "" {
df.UpdateDataFlowStatistic("Flows", tfContext.FlowState.FlowAlias, "Load complete", "2", 1, tfmContext.Log)
} else {
df.UpdateDataFlowStatistic("Flows", tfContext.Flow.TableName(), "Load complete", "2", 1, tfmContext.Log)
}
}

// Second row here
// Not sure if necessary to copy entire ReportStatistics method
tenantIndexPath, tenantDFSIdPath := coreopts.BuildOptions.GetDFSPathName()
df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, "flume", tenantIndexPath, tenantDFSIdPath, tfmContext.Config.Log, false)
if tfContext.Init {
tenantIndexPath, tenantDFSIdPath := coreopts.BuildOptions.GetDFSPathName()
df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, "flume", tenantIndexPath, tenantDFSIdPath, tfmContext.Config.Log, false)
}

//df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, ...)
tfmContext.FlowControllerLock.Lock()
Expand All @@ -667,9 +650,9 @@ func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContex
go tfmContext.seedVaultCycle(tfContext, identityColumnName, indexColumnNames, getIndexedPathExt, flowPushRemote, sqlState)
}

func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan bool {
func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan interface{} {
if notificationFlowChannel, ok := tfmContext.ChannelMap[tfContext.Flow]; ok {
return notificationFlowChannel
return notificationFlowChannel.Ch
}
tfmContext.Log("Could not find channel for flow.", nil)

Expand Down Expand Up @@ -753,11 +736,7 @@ func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext,
// look up channels and notify them too.
for _, flowNotification := range flowNotifications {
if notificationFlowChannel, ok := tfmContext.ChannelMap[flowNotification]; ok {
if len(notificationFlowChannel) < 10 {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
}
notificationFlowChannel.Bcast(true)
}
}
}
Expand All @@ -766,11 +745,7 @@ func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext,
additionalTestFlows := tfmContext.GetAdditionalFlowsByState(flowtestState)
for _, flowNotification := range additionalTestFlows {
if notificationFlowChannel, ok := tfmContext.ChannelMap[flowNotification]; ok {
if len(notificationFlowChannel) < 10 {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
}
notificationFlowChannel.Bcast(true)
}
}
}
Expand Down Expand Up @@ -847,11 +822,7 @@ func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext,
// look up channels and notify them too.
for _, flowNotification := range flowNotifications {
if notificationFlowChannel, ok := tfmContext.ChannelMap[flowNotification]; ok {
if len(notificationFlowChannel) < 10 {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
}
notificationFlowChannel.Bcast(true)
}
}
}
Expand All @@ -860,11 +831,7 @@ func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext,
additionalTestFlows := tfmContext.GetAdditionalFlowsByState(flowtestState)
for _, flowNotification := range additionalTestFlows {
if notificationFlowChannel, ok := tfmContext.ChannelMap[flowNotification]; ok {
if len(notificationFlowChannel) < 10 {
go func(nfc chan bool) {
nfc <- true
}(notificationFlowChannel)
}
notificationFlowChannel.Bcast(true)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/dolthub/vitess v0.0.0-20221121184553-8d519d0bbb91 // indirect
github.com/glycerine/bchan v0.0.0-20170210221909-ad30cd867e1c // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/gocraft/dbr/v2 v2.7.2 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0X
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/glycerine/bchan v0.0.0-20170210221909-ad30cd867e1c h1:HSgdiDVw61eratEUy9GFtolPa00KZD+tdpJfKGE4xlQ=
github.com/glycerine/bchan v0.0.0-20170210221909-ad30cd867e1c/go.mod h1:jD29ULS3sOgFMjmLeH7QMK8CasGYe+buPBI+byHsjwk=
github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-git/go-billy/v5 v5.4.1 h1:Uwp5tDRkPr+l/TnbHOQzp+tmJfLceOlbVucgpTz8ix4=
github.com/go-git/go-billy/v5 v5.4.1/go.mod h1:vjbugF6Fz7JIflbVpl1hJsGjSHNltrSw45YK/ukIvQg=
Expand Down
7 changes: 7 additions & 0 deletions pkg/trcinit/initlib/vault-seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ func seedVaultWithCertsFromEntry(config *eUtils.DriverConfig, mod *helperkv.Modi
mod2 := WriteData(config, entry.path, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand All @@ -578,6 +579,7 @@ func seedVaultWithCertsFromEntry(config *eUtils.DriverConfig, mod *helperkv.Modi
mod2 := WriteData(config, commonPath, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand All @@ -599,13 +601,15 @@ func seedVaultWithCertsFromEntry(config *eUtils.DriverConfig, mod *helperkv.Modi
mod2 := WriteData(config, secretEntry.path, secretEntry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}

mod2 = WriteData(config, entry.path, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand Down Expand Up @@ -785,6 +789,7 @@ func SeedVaultFromData(config *eUtils.DriverConfig, filepath string, fData []byt
mod2 := WriteData(config, entry.path, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand All @@ -801,6 +806,7 @@ func SeedVaultFromData(config *eUtils.DriverConfig, filepath string, fData []byt
mod2 := WriteData(config, filepath, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand All @@ -809,6 +815,7 @@ func SeedVaultFromData(config *eUtils.DriverConfig, filepath string, fData []byt
mod2 := WriteData(config, entry.path, entry.data, mod)
if mod != mod2 {
mod.Stale = true
mod.Release()
defer mod2.Release()
mod = mod2
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vaulthelper/kv/Modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func cachedModifierHelper(env string, addr string) (*Modifier, error) {
// Release - releases the modifier back to the cache.
func (m *Modifier) Release() {
if m.Stale {
m.httpClient.CloseIdleConnections()
return
}
if _, ok := modifierCache[m.Env]; ok {
Expand All @@ -183,7 +184,6 @@ func (m *Modifier) releaseHelper(env string) {

// Since modifiers are re-used now, this may not be necessary or even desired for that
// matter.
// m.httpClient.CloseIdleConnections()
if modifierCache[fmt.Sprintf("%s+%s", env, m.client.Address())].modCount > 10 {
m.CleanCache(10)
}
Expand Down