-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
sync.go
343 lines (309 loc) · 13.1 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package ldapauth
import (
"errors"
"fmt"
"time"
"github.com/go-ldap/ldap/v3"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
)
// LDAPServerStateSyncer carries the dependencies and state used by Work to
// expire local LDAP sessions/API tokens and to reconcile the local
// ldap_sessions and ldap_user_api_tokens tables with the upstream LDAP server.
type LDAPServerStateSyncer struct {
// q runs queries and transactions against the local database.
q pg.Q
// ldapClient creates the connections used to query the upstream LDAP server.
ldapClient LDAPClient
// config supplies the LDAP settings read during a sync (timeouts, group CNs, DNs, bind credentials).
config config.LDAP
// lggr receives all log output produced by the syncer.
lggr logger.Logger
// nextSyncTime is the earliest time the next upstream query is allowed when
// UpstreamSyncRateLimit is non-zero; Work compares against it and advances it.
nextSyncTime time.Time
}
// NewLDAPServerStateSync builds an LDAPServerStateSyncer from the supplied
// database handle, query config, LDAP config and logger, and returns it
// wrapped in a SleeperTask.
//
// Before returning it triggers the first sync: when UpstreamSyncInterval is
// non-zero the syncer is started on its self-rescheduling timer (which also
// runs Work once right away); otherwise Work is invoked once directly.
func NewLDAPServerStateSync(
	db *sqlx.DB,
	pgCfg pg.QConfig,
	config config.LDAP,
	lggr logger.Logger,
) *utils.SleeperTask {
	namedLogger := lggr.Named("LDAPServerStateSync")

	// nextSyncTime is left at its zero value so the first sync is never rate limited
	syncer := &LDAPServerStateSyncer{
		q:          pg.NewQ(db, namedLogger, pgCfg),
		ldapClient: newLDAPClient(config),
		config:     config,
		lggr:       namedLogger,
	}

	// IsInstant reports a zero interval, which disables the timer driven sync
	if config.UpstreamSyncInterval().IsInstant() {
		// No timer configured: sync the upstream server state once on startup
		syncer.Work()
	} else {
		lggr.Info("LDAP Config UpstreamSyncInterval is non-zero, sync functionality will be called on a timer, respecting the UpstreamSyncRateLimit value")
		syncer.StartWorkOnTimer()
	}

	// The sleeper task lets auth related events trigger additional Work calls
	return utils.NewSleeperTask(syncer)
}
// Name returns the fixed identifier of this worker.
func (ldSync *LDAPServerStateSyncer) Name() string {
	const workerName = "LDAPServerStateSync"
	return workerName
}
// StartWorkOnTimer schedules another call to itself after the configured
// UpstreamSyncInterval and then runs one Work pass in the calling goroutine.
func (ldSync *LDAPServerStateSyncer) StartWorkOnTimer() {
	interval := ldSync.config.UpstreamSyncInterval().Duration()

	// Arm the next run before working, so the interval is measured from the
	// start of this pass rather than from its end
	time.AfterFunc(interval, ldSync.StartWorkOnTimer)

	ldSync.Work()
}
// Work performs one synchronization pass:
//
//  1. Deletes ldap_sessions and ldap_user_api_tokens rows older than the
//     configured SessionTimeout / UserAPITokenDuration.
//  2. If UpstreamSyncRateLimit is non-zero and the previous upstream sync was
//     too recent, stops here.
//  3. Binds to the LDAP server with the read-only credentials from config and
//     lists the members of the admin, edit, run and read groups.
//  4. Drops members that validateUsersActive does not report as active.
//  5. In a single database transaction, deletes local sessions/API tokens of
//     users no longer present upstream and rewrites the stored role of the
//     remaining ones to match the upstream role.
//
// Every failure is logged; Work returns nothing.
func (ldSync *LDAPServerStateSyncer) Work() {
	// Purge expired ldap_sessions and ldap_user_api_tokens
	sessionStaleThreshold := ldSync.config.SessionTimeout().Before(time.Now())
	if err := ldSync.deleteStaleSessions(sessionStaleThreshold); err != nil {
		ldSync.lggr.Error("unable to expire local LDAP sessions: ", err)
	}
	tokenStaleThreshold := ldSync.config.UserAPITokenDuration().Before(time.Now())
	if err := ldSync.deleteStaleAPITokens(tokenStaleThreshold); err != nil {
		ldSync.lggr.Error("unable to expire user API tokens: ", err)
	}

	// Optional rate limiting check to limit the amount of upstream LDAP server queries performed
	if !ldSync.config.UpstreamSyncRateLimit().IsInstant() {
		if !time.Now().After(ldSync.nextSyncTime) {
			return
		}
		// Enough time has elapsed to sync again, store the time for when next sync is allowed and begin sync
		ldSync.nextSyncTime = time.Now().Add(ldSync.config.UpstreamSyncRateLimit().Duration())
	}

	ldSync.lggr.Info("Begin Upstream LDAP provider state sync after checking time against config UpstreamSyncInterval and UpstreamSyncRateLimit")

	conn, err := ldSync.ldapClient.CreateEphemeralConnection()
	if err != nil {
		ldSync.lggr.Errorf("Failed to Dial LDAP Server: %v", err)
		return
	}
	// Registered right after the connection succeeds so every return path below releases it
	defer conn.Close()

	// Root level root user auth with credentials provided from config
	bindStr := ldSync.config.BaseUserAttr() + "=" + ldSync.config.ReadOnlyUserLogin() + "," + ldSync.config.BaseDN()
	if err = conn.Bind(bindStr, ldSync.config.ReadOnlyUserPass()); err != nil {
		ldSync.lggr.Errorf("Unable to login as initial root LDAP user: %v", err)
		// Do not query group membership over a connection that failed to authenticate
		return
	}

	// For each defined role/group, query for the list of group members to gather the full list of
	// possible users. The order (admin, edit, run, read) matters: the dedupe step below keeps the
	// first occurrence of an email, so a user in several groups ends up with the earliest listed role.
	roleGroups := []struct {
		groupCN string
		role    sessions.UserRole
	}{
		{ldSync.config.AdminUserGroupCN(), sessions.UserRoleAdmin},
		{ldSync.config.EditUserGroupCN(), sessions.UserRoleEdit},
		{ldSync.config.RunUserGroupCN(), sessions.UserRoleRun},
		{ldSync.config.ReadUserGroupCN(), sessions.UserRoleView},
	}
	var users []sessions.User
	for _, group := range roleGroups {
		members, groupErr := ldSync.ldapGroupMembersListToUser(conn, group.groupCN, group.role)
		if groupErr != nil {
			ldSync.lggr.Errorf("Error in ldapGroupMembersListToUser: %v", groupErr)
			return
		}
		users = append(users, members...)
	}

	// Dedupe preserving order of highest role (sorted)
	// Preserve members as a map for future lookup
	upstreamUserStateMap := make(map[string]sessions.User, len(users))
	dedupedEmails := make([]string, 0, len(users))
	for _, user := range users {
		if _, seen := upstreamUserStateMap[user.Email]; !seen {
			upstreamUserStateMap[user.Email] = user
			dedupedEmails = append(dedupedEmails, user.Email)
		}
	}

	// For each unique user, check the 'Is Active' property if defined in the config. Some LDAP
	// providers list group members that are no longer marked as active.
	usersActiveFlags, err := ldSync.validateUsersActive(dedupedEmails, conn)
	if err != nil {
		// NOTE(review): execution deliberately continues with the returned flags, preserving the
		// existing behavior. If the flags are all false on error, every upstream user is treated
		// as inactive and their local sessions/tokens are purged below - confirm this fail-closed
		// handling is intended.
		ldSync.lggr.Errorf("Error validating supplied user list: %v", err)
	}
	// Remove users in the upstreamUserStateMap source of truth who are part of groups but marked as deactivated/non-active
	for i, active := range usersActiveFlags {
		if !active {
			delete(upstreamUserStateMap, dedupedEmails[i])
		}
	}

	// upstreamUserStateMap is now the most up to date source of truth
	// Now sync database sessions and roles with new data
	err = ldSync.q.Transaction(func(tx pg.Queryer) error {
		// Field names are kept as-is: they are what the row scan maps the selected columns onto
		type LDAPSession struct {
			UserEmail string
			UserRole  sessions.UserRole
		}

		var existingSessions []LDAPSession
		if selErr := tx.Select(&existingSessions, "SELECT user_email, user_role FROM ldap_sessions WHERE localauth_user = false"); selErr != nil {
			return fmt.Errorf("unable to query ldap_sessions table: %w", selErr)
		}
		var existingAPITokens []LDAPSession
		if selErr := tx.Select(&existingAPITokens, "SELECT user_email, user_role FROM ldap_user_api_tokens WHERE localauth_user = false"); selErr != nil {
			return fmt.Errorf("unable to query ldap_user_api_tokens table: %w", selErr)
		}

		// Build email lookup sets for the role update step, and collect the emails present
		// locally but missing from the upstream state so their rows can be purged
		sessionEmails := make(map[string]struct{}, len(existingSessions))
		emailsToPurge := []interface{}{}
		for _, sess := range existingSessions {
			sessionEmails[sess.UserEmail] = struct{}{}
			if _, upstream := upstreamUserStateMap[sess.UserEmail]; !upstream {
				emailsToPurge = append(emailsToPurge, sess.UserEmail)
			}
		}
		apiTokenEmails := make(map[string]struct{}, len(existingAPITokens))
		apiTokenEmailsToPurge := []interface{}{}
		for _, token := range existingAPITokens {
			apiTokenEmails[token.UserEmail] = struct{}{}
			if _, upstream := upstreamUserStateMap[token.UserEmail]; !upstream {
				apiTokenEmailsToPurge = append(apiTokenEmailsToPurge, token.UserEmail)
			}
		}

		// All writes below go through tx, the handle Transaction hands to this callback,
		// rather than through ldSync.q.

		// Remove any active sessions these users may have
		if len(emailsToPurge) > 0 {
			if _, execErr := tx.Exec("DELETE FROM ldap_sessions WHERE user_email = ANY($1)", pq.Array(emailsToPurge)); execErr != nil {
				return execErr
			}
		}
		// Remove any active API tokens these users may have
		if len(apiTokenEmailsToPurge) > 0 {
			if _, execErr := tx.Exec("DELETE FROM ldap_user_api_tokens WHERE user_email = ANY($1)", pq.Array(apiTokenEmailsToPurge)); execErr != nil {
				return execErr
			}
		}

		// For each user session row, update role to match state of user map from upstream source.
		// Prepare CASE WHEN query statement with parameterized $n placeholders for the emails; the
		// role text is the role value this function assigned per group above.
		queryWhenClause := ""
		emailValues := []interface{}{}
		for email, user := range upstreamUserStateMap {
			// Only emit a WHEN branch for users that actually have a local session or API token
			_, hasSession := sessionEmails[email]
			_, hasToken := apiTokenEmails[email]
			if !hasSession && !hasToken {
				continue
			}
			emailValues = append(emailValues, email)
			queryWhenClause += fmt.Sprintf("WHEN user_email = $%d THEN '%s' ", len(emailValues), user.Role)
		}

		// If there are remaining user entries to update
		if len(emailValues) != 0 {
			// Set new role state for all rows in single Exec
			query := fmt.Sprintf("UPDATE ldap_sessions SET user_role = CASE %s ELSE user_role END", queryWhenClause)
			if _, execErr := tx.Exec(query, emailValues...); execErr != nil {
				return execErr
			}
			// Update role of API tokens as well
			query = fmt.Sprintf("UPDATE ldap_user_api_tokens SET user_role = CASE %s ELSE user_role END", queryWhenClause)
			if _, execErr := tx.Exec(query, emailValues...); execErr != nil {
				return execErr
			}
		}

		ldSync.lggr.Info("local ldap_sessions and ldap_user_api_tokens table successfully synced with upstream LDAP state")
		return nil
	})
	if err != nil {
		ldSync.lggr.Errorf("Error syncing local database state: %v", err)
		// Do not report completion for a sync whose database step failed
		return
	}
	ldSync.lggr.Info("Upstream LDAP sync complete")
}
// deleteStaleSessions removes every ldap_sessions row whose created_at is
// earlier than the given time, returning any error from the delete statement.
func (ldSync *LDAPServerStateSyncer) deleteStaleSessions(before time.Time) error {
	const stmt = "DELETE FROM ldap_sessions WHERE created_at < $1"
	if _, execErr := ldSync.q.Exec(stmt, before); execErr != nil {
		return execErr
	}
	return nil
}
// deleteStaleAPITokens removes every ldap_user_api_tokens row whose created_at
// is earlier than the given time, returning any error from the delete statement.
func (ldSync *LDAPServerStateSyncer) deleteStaleAPITokens(before time.Time) error {
	const stmt = "DELETE FROM ldap_user_api_tokens WHERE created_at < $1"
	if _, execErr := ldSync.q.Exec(stmt, before); execErr != nil {
		return execErr
	}
	return nil
}
// ldapGroupMembersListToUser is a thin wrapper around the package level
// ldapGroupMembersListToUser function: it supplies GroupsDN, BaseDN,
// QueryTimeout and the logger from this syncer and forwards conn, the group CN
// and the role to assign. On failure the underlying error is logged and a
// generic error is returned alongside whatever user slice the call produced.
func (ldSync *LDAPServerStateSyncer) ldapGroupMembersListToUser(conn LDAPConn, groupNameCN string, roleToAssign sessions.UserRole) ([]sessions.User, error) {
	cfg := ldSync.config
	members, queryErr := ldapGroupMembersListToUser(
		conn, groupNameCN, roleToAssign,
		cfg.GroupsDN(), cfg.BaseDN(), cfg.QueryTimeout(),
		ldSync.lggr,
	)
	if queryErr == nil {
		return members, nil
	}

	// The detailed cause goes to the log; callers only get a generic error
	ldSync.lggr.Errorf("Error listing members of group (%s): %v", groupNameCN, queryErr)
	return members, errors.New("error searching group members in LDAP directory")
}
// validateUsersActive performs an additional LDAP server query for the supplied emails, checking the
// returned user data for the optional 'active' attribute named in the config.
//
// It returns a bool slice of the same length and order as emails, where true marks a user whose
// ActiveAttribute value equals ActiveAttributeAllowedValue. When no ActiveAttribute is configured
// every user is reported as active without querying the server. An empty emails slice yields an
// empty result and no query.
//
// On a search failure, or when the search returns no entries, an error is returned together with
// a slice in which every flag is false.
func (ldSync *LDAPServerStateSyncer) validateUsersActive(emails []string, conn LDAPConn) ([]bool, error) {
	validUsers := make([]bool, len(emails))

	// Nothing to validate: avoid sending a filter with an empty OR clause upstream
	if len(emails) == 0 {
		return validUsers, nil
	}

	// If active attribute to check is not defined in config, skip
	activeAttr := ldSync.config.ActiveAttribute()
	if activeAttr == "" {
		// pre fill with valids
		for i := range validUsers {
			validUsers[i] = true
		}
		return validUsers, nil
	}

	// Build the full email list query to pull all 'isActive' information for each user specified
	// in one query: (&(|(attr=email1)(attr=email2)...)). Appending to one buffer keeps the
	// construction linear in the number of emails.
	userAttr := ldSync.config.BaseUserAttr()
	filter := []byte("(&(|")
	for _, email := range emails {
		filter = append(filter, '(')
		filter = append(filter, userAttr...)
		filter = append(filter, '=')
		// Escape the value so special characters in an email cannot alter the filter structure
		filter = append(filter, ldap.EscapeFilter(email)...)
		filter = append(filter, ')')
	}
	filter = append(filter, "))"...)

	searchBaseDN := fmt.Sprintf("%s,%s", ldSync.config.UsersDN(), ldSync.config.BaseDN())
	searchRequest := ldap.NewSearchRequest(
		searchBaseDN,
		ldap.ScopeWholeSubtree, ldap.NeverDerefAliases,
		0, int(ldSync.config.QueryTimeout().Seconds()), false,
		string(filter),
		[]string{userAttr, activeAttr},
		nil,
	)

	// Query LDAP server for the ActiveAttribute property of each specified user
	results, err := conn.Search(searchRequest)
	if err != nil {
		ldSync.lggr.Errorf("Error searching user in LDAP query: %v", err)
		return validUsers, errors.New("error searching users in LDAP directory")
	}
	// Ensure user response entries
	if len(results.Entries) == 0 {
		return validUsers, errors.New("no users matching email query")
	}

	// Record, keyed on the returned user attribute value, whether each entry carries the allowed
	// ActiveAttribute value
	allowedValue := ldSync.config.ActiveAttributeAllowedValue()
	emailToActiveMap := make(map[string]bool, len(results.Entries))
	for _, entry := range results.Entries {
		uid := entry.GetAttributeValue(userAttr)
		emailToActiveMap[uid] = entry.GetAttributeValue(activeAttr) == allowedValue
	}

	// Emit the flags in input order; emails absent from the response stay false
	for i, email := range emails {
		validUsers[i] = emailToActiveMap[email]
	}
	return validUsers, nil
}