diff --git a/nodes/.gitignore b/nodes/.gitignore index 342fca884..e680b52c5 100644 --- a/nodes/.gitignore +++ b/nodes/.gitignore @@ -61,4 +61,5 @@ go.work dev.env prod.env test.env +.venv diff --git a/nodes/node_balancer/README.md b/nodes/node_balancer/README.md index e13ee7eac..4c193c06d 100644 --- a/nodes/node_balancer/README.md +++ b/nodes/node_balancer/README.md @@ -113,3 +113,14 @@ For Web3 providers `access_id` and `data_source` could be specified in headers ```bash /usr/local/go/bin/go test -run ^TestCleanInactiveClientNodes$ github.com/bugout-dev/moonstream/nodes/node_balancer/cmd/nodebalancer -v -count=1 ``` + +## Migrations + +To run migration: + +```bash +python migrations/migrations.py run --key 20230522 \ + --token-current-owner "$NB_CONTROLLER_TOKEN" \ + --token-new-owner "$MOONSTREAM_ADMIN_OR_OTHER_CONTROLLER" \ + --new-application-id "$MOONSTREAM_APPLICATION_ID" +``` diff --git a/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go b/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go new file mode 100644 index 000000000..4772a1f87 --- /dev/null +++ b/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go @@ -0,0 +1,49 @@ +package main + +import ( + "testing" + "time" +) + +func accessCacheSetupSuit(t *testing.T) func(t *testing.T) { + t.Log("Setup suit") + + CreateAccessCache() + + return func(t *testing.T) { + t.Log("Teardown suit") + } +} + +func TestAddAccessToCache(t *testing.T) { + teardownSuit := accessCacheSetupSuit(t) + defer teardownSuit(t) + + tsNow := time.Now().Unix() + + var cases = []struct { + prop ClientAccess + expected bool + }{ + { + prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "7378e2b2-b6ac-4738-bf34-fe39aa0d19e9"}}, + expected: true, + }, + { + prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "000000000000000000000000000000000000"}}, + expected: false, + }, + { + prop: ClientAccess{ClientResourceData: ClientResourceData{Name: "name-1"}}, + expected: false, + }, + } + for _, c := range cases { + accessId := c.prop.ClientResourceData.AccessID + accessCache.AddAccessToCache(c.prop, tsNow) + if accessCache.isAccessIdInCache(accessId) != c.expected { + t.Logf("Access %s not found in access cache", accessId) + t.Fatal() + } + } +} diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index b5fba9f1f..ebf9e70db 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -253,19 +253,21 @@ func cli() { PeriodStartTs: time.Now().Unix(), MaxCallsPerPeriod: stateCLI.MaxCallsPerPeriodFlag, CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, } _, err := bugoutClient.Brood.FindUser( NB_CONTROLLER_TOKEN, map[string]string{ "user_id": proposedClientResourceData.UserID, - "application_id": NB_APPLICATION_ID, + "application_id": MOONSTREAM_APPLICATION_ID, }, ) if err != nil { fmt.Printf("User does not exists, err: %v\n", err) os.Exit(1) } - resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedClientResourceData) + resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, proposedClientResourceData) if err != nil { fmt.Printf("Unable to create user access, err: %v\n", err) os.Exit(1) @@ -302,7 +304,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { @@ -406,7 +408,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { @@ -464,7 +466,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/clients.go b/nodes/node_balancer/cmd/nodebalancer/clients.go index ebc0a3a1e..56cb1e179 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients.go @@ -15,6 +15,8 @@ var ( type ClientAccess struct { ResourceID string + authorizationToken string + ClientResourceData ClientResourceData LastAccessTs int64 @@ -36,6 +38,8 @@ type ClientResourceData struct { PeriodStartTs int64 `json:"period_start_ts"` MaxCallsPerPeriod int64 `json:"max_calls_per_period"` CallsPerPeriod int64 `json:"calls_per_period"` + + Type string `json:"type"` } // CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded diff --git a/nodes/node_balancer/cmd/nodebalancer/clients_test.go b/nodes/node_balancer/cmd/nodebalancer/clients_test.go index 4eecd1328..5d19d8715 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients_test.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients_test.go @@ -1,4 +1,3 @@ -// TODO(kompotkot): Re-write tests for client package main import ( @@ -7,7 +6,7 @@ import ( "time" ) -func setupSuit(t *testing.T) func(t *testing.T) { +func clientsSetupSuit(t *testing.T) func(t *testing.T) { t.Log("Setup suit") supportedBlockchains = map[string]bool{"ethereum": true} @@ -18,7 +17,7 @@ func setupSuit(t *testing.T) func(t *testing.T) { } func TestAddClientNode(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) var cases = []struct { @@ -44,7 +43,7 @@ func TestAddClientNode(t *testing.T) { } func TestGetClientNode(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) ts := time.Now().Unix() @@ -75,7 +74,7 @@ func TestGetClientNode(t *testing.T) { } func TestCleanInactiveClientNodes(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) ts := time.Now().Unix() diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 429bec6db..6b4679f40 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -23,7 +23,7 @@ var ( // Bugout and application configuration BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5 - NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID") + MOONSTREAM_APPLICATION_ID = os.Getenv("MOONSTREAM_APPLICATION_ID") NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN") NB_CONTROLLER_ACCESS_ID = os.Getenv("NB_CONTROLLER_ACCESS_ID") @@ -46,6 +46,12 @@ var ( // Humbug configuration HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN") + + // Moonstream resources types + BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access" + DEFAULT_AUTOGENERATED_USER_PERMISSIONS = []string{"read", "update", "delete"} + DEFAULT_AUTOGENERATED_PERIOD_DURATION = int64(86400) + DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD = int64(1000) ) func CheckEnvVarSet() { diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 27d51a3ea..f91774f28 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -16,60 +16,104 @@ import ( "sync" "time" + "github.com/bugout-dev/bugout-go/pkg/brood" humbug "github.com/bugout-dev/humbug/go/pkg" + "github.com/google/uuid" ) var ( - accessIdCache AccessCache + accessCache AccessCache ) +// AccessCache caches client identification for fast access to nodes +// +// If authorization passed with Bearer token, then it triggers to fetch Brood resource with access ID +// or create new one. After it under key `accessIds` and `authorizationTokens` will be added similar +// address pointers to one `ClientAccess`. type AccessCache struct { - accessIds map[string]ClientAccess + accessIds map[string]*ClientAccess + authorizationTokens map[string]*ClientAccess mux sync.RWMutex } // CreateAccessCache generates empty cache of client access func CreateAccessCache() { - accessIdCache = AccessCache{ - accessIds: make(map[string]ClientAccess), + accessCache = AccessCache{ + accessIds: make(map[string]*ClientAccess), + authorizationTokens: make(map[string]*ClientAccess), } } -// Get access id from cache if exists -func (ac *AccessCache) FindAccessIdInCache(accessId string) string { - var detectedId string +// FindAccessIdInCache looking for user access in `accessIds` cache +func (ac *AccessCache) isAccessIdInCache(accessId string) bool { + detected := false ac.mux.RLock() for id := range ac.accessIds { if id == accessId { - detectedId = id + detected = true break } } ac.mux.RUnlock() - return detectedId + return detected } -// Update last call access timestamp and datasource for access id -func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) { +// FindAuthorizationTokenInCache looking for user access in `authorizationTokens` cache +func (ac *AccessCache) isAuthorizationTokenInCache(authorizationToken string) bool { + detected := false + + ac.mux.RLock() + for id := range ac.authorizationTokens { + if id == authorizationToken { + detected = true + break + } + } + ac.mux.RUnlock() + + return detected +} + +// Update last call access timestamp and datasource for user access +func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) { ac.mux.Lock() - if accessData, ok := ac.accessIds[accessId]; ok { - accessData.LastAccessTs = tsNow - accessData.requestedDataSource = requestedDataSource - accessData.LastSessionCallsCounter++ + var accessToModify *ClientAccess + + if accessId != "" { + if access, ok := ac.accessIds[accessId]; ok { + accessToModify = access + + } + } - ac.accessIds[accessId] = accessData + if authorizationToken != "" { + if access, ok := ac.authorizationTokens[authorizationToken]; ok { + accessToModify = access + } } + + accessToModify.LastAccessTs = tsNow + accessToModify.requestedDataSource = requestedDataSource + accessToModify.LastSessionCallsCounter++ + ac.mux.Unlock() } -// Add new access ID with data to cache -func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) { +// Add new user access identifier with data to cache +func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) error { + _, err := uuid.Parse(clientAccess.ClientResourceData.AccessID) + if err != nil { + log.Printf("Access ID %s is not valid UUID, err: %v", clientAccess.ClientResourceData.AccessID, err) + return fmt.Errorf("access ID is not valid UUID") + } + ac.mux.Lock() - ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{ - ResourceID: clientAccess.ResourceID, + access := &ClientAccess{ + ResourceID: clientAccess.ResourceID, + authorizationToken: clientAccess.authorizationToken, ClientResourceData: ClientResourceData{ UserID: clientAccess.ClientResourceData.UserID, @@ -83,6 +127,8 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs, MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod, CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod, + + Type: clientAccess.ClientResourceData.Type, }, LastAccessTs: tsNow, @@ -91,7 +137,14 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 requestedDataSource: clientAccess.requestedDataSource, } + + ac.accessIds[clientAccess.ClientResourceData.AccessID] = access + if clientAccess.authorizationToken != "" { + ac.authorizationTokens[clientAccess.authorizationToken] = access + } ac.mux.Unlock() + + return nil } // Check each access id in cache if it exceeds lifetime @@ -100,9 +153,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) { tsNow := time.Now().Unix() ac.mux.Lock() + for aId, clientAccess := range ac.accessIds { + totalAccessIds++ + removedUserId := "" + if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { // Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period + removedUserId = clientAccess.ClientResourceData.UserID delete(ac.accessIds, aId) removedAccessIds++ err := clientAccess.UpdateClientResourceCallCounter(tsNow) @@ -110,6 +168,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) { log.Printf("Unable to update Brood resource, err: %v\n", err) } } else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME { + removedUserId = clientAccess.ClientResourceData.UserID // Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME delete(ac.accessIds, aId) removedAccessIds++ @@ -117,12 +176,21 @@ func (ac *AccessCache) Cleanup() (int64, int64) { if err != nil { log.Printf("Unable to update Brood resource, err: %v\n", err) } - } else { - totalAccessIds++ + } + + if removedUserId != "" { + for aToken, clientAccess := range ac.authorizationTokens { + if clientAccess.ClientResourceData.UserID == removedUserId { + delete(ac.authorizationTokens, aToken) + } + } + removedUserId = "" } } ac.mux.Unlock() + totalAccessIds = totalAccessIds - removedAccessIds + return removedAccessIds, totalAccessIds } @@ -131,7 +199,7 @@ func initCacheCleaning(debug bool) { for { select { case <-t.C: - removedAccessIds, totalAccessIds := accessIdCache.Cleanup() + removedAccessIds, totalAccessIds := accessCache.Cleanup() if debug { log.Printf("Removed %d clients from access cache", removedAccessIds) } @@ -140,6 +208,99 @@ func initCacheCleaning(debug bool) { } } +// fetchClientAccessFromResources get resources with access ID or authorization token and generate new one if there no one +func fetchClientAccessFromResources(accessID, authorizationToken string, tsNow int64) (*ClientAccess, error) { + var err error + + queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS} + if accessID != "" { + queryParameters["access_id"] = accessID + } + + token := NB_CONTROLLER_TOKEN + if authorizationToken != "" { + token = authorizationToken + } + + var resources brood.Resources + resources, err = bugoutClient.Brood.GetResources( + token, + MOONSTREAM_APPLICATION_ID, + queryParameters, + ) + if err != nil { + log.Printf("Unable to get resources, err: %v", err) + return nil, fmt.Errorf("unable to get access identifiers") + } + + if len(resources.Resources) == 0 { + if authorizationToken != "" { + // Generate new autogenerated access resource with default parameters and grant user permissions to work with it + user, err := bugoutClient.Brood.GetUser(authorizationToken) + if err != nil { + log.Printf("Unable to get user, err: %v", err) + return nil, fmt.Errorf("unable to find user with provided authorization token") + } + newResource, err := bugoutClient.Brood.CreateResource( + NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, ClientResourceData{ + UserID: user.Id, + AccessID: uuid.New().String(), + Name: user.Username, + Description: "Autogenerated access ID", + BlockchainAccess: true, + ExtendedMethods: false, + + PeriodDuration: DEFAULT_AUTOGENERATED_PERIOD_DURATION, + PeriodStartTs: tsNow, + MaxCallsPerPeriod: DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD, + CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, + }, + ) + if err != nil { + log.Printf("Unable to create resource with autogenerated access for user with ID %s, err: %v", user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + resourceHolderPermissions, err := bugoutClient.Brood.AddResourceHolderPermissions( + NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{ + Id: user.Id, + HolderType: "user", + Permissions: DEFAULT_AUTOGENERATED_USER_PERMISSIONS, + }, + ) + if err != nil { + log.Printf("Unable to grant permissions to user with ID %s at resource with ID %s, err: %v", newResource.Id, user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + log.Printf("Created new resource with ID %s with autogenerated access for user with ID %s", resourceHolderPermissions.ResourceId, user.Id) + resources.Resources = append(resources.Resources, newResource) + } else { + return nil, fmt.Errorf("there are no provided access identifier") + } + } else if len(resources.Resources) > 1 { + // TODO(kompotkot): Write support of multiple resources, be careful, because NB_CONTROLLER has several resources + return nil, fmt.Errorf("there are no provided access identifier") + } + + var clientAccessRaw ClientAccess + resourceData, err := json.Marshal(&resources.Resources[0].ResourceData) + if err != nil { + log.Printf("Unable to parse resource data to access identifier, err: %v", err) + return nil, fmt.Errorf("unable to parse resource data to access identifier") + } + err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData) + if err != nil { + log.Printf("Unable to decode resource data to access identifier, err: %v", err) + return nil, fmt.Errorf("unable to decode resource data to access identifier") + } + clientAccessRaw.ResourceID = resources.Resources[0].Id + + return &clientAccessRaw, nil +} + // Extract access_id from header and query. Query takes precedence over header. func extractAccessID(r *http.Request) string { var accessID string @@ -289,76 +450,95 @@ func accessMiddleware(next http.Handler) http.Handler { accessID := extractAccessID(r) requestedDataSource := extractRequestedDataSource(r) - if accessID == "" { - http.Error(w, "No access id passed with request", http.StatusForbidden) + // Extract Authorization token if Bearer header provided + var authorizationTokenRaw string + authorizationTokenHeaders := r.Header[strings.Title("authorization")] + for _, h := range authorizationTokenHeaders { + authorizationTokenRaw = h + } + var authorizationToken string + if authorizationTokenRaw != "" { + authorizationTokenSlice := strings.Split(authorizationTokenRaw, " ") + if len(authorizationTokenSlice) != 2 || authorizationTokenSlice[0] != "Bearer" || authorizationTokenSlice[1] == "" { + http.Error(w, "Wrong authorization token provided", http.StatusForbidden) + return + } + authorizationToken = authorizationTokenSlice[1] + } + + if accessID == "" && authorizationToken == "" { + http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden) return } tsNow := time.Now().Unix() // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources - if accessID == NB_CONTROLLER_ACCESS_ID { - currentClientAccess = internalUsageAccess + if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { if stateCLI.enableDebugFlag { log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) } + currentClientAccess = internalUsageAccess currentClientAccess.LastAccessTs = tsNow currentClientAccess.requestedDataSource = requestedDataSource - } else if accessIdCache.FindAccessIdInCache(accessID) != "" { - currentClientAccess = accessIdCache.accessIds[accessID] + } else if accessID != "" && accessCache.isAccessIdInCache(accessID) { if stateCLI.enableDebugFlag { log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) } - // Check if limit of calls not exceeded + currentClientAccess = *accessCache.accessIds[accessID] isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } currentClientAccess.requestedDataSource = requestedDataSource - accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow) - } else { + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) + } else if accessID == "" && accessCache.isAuthorizationTokenInCache(authorizationToken) { if stateCLI.enableDebugFlag { - log.Printf("New access id, looking at Brood resources") + log.Printf("Client connected with Authorization token") } - resources, err := bugoutClient.Brood.GetResources( - NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, - map[string]string{"access_id": accessID}, - ) - if err != nil { - http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden) - return - } - resourcesLen := len(resources.Resources) - if resourcesLen == 0 { - http.Error(w, "User with provided access identifier not found", http.StatusForbidden) - return - } - if resourcesLen > 1 { - http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError) - return - } - resourceData, err := json.Marshal(resources.Resources[0].ResourceData) - if err != nil { - http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError) + currentClientAccess = *accessCache.authorizationTokens[authorizationToken] + isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + if !isClientAllowedToGetAccess { + http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - currentClientAccess.ResourceID = resources.Resources[0].Id currentClientAccess.requestedDataSource = requestedDataSource - err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData) + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) + } else { + if stateCLI.enableDebugFlag { + log.Printf("No access identity found in cache, looking at Brood resources") + } + + clientAccessRaw, err := fetchClientAccessFromResources(accessID, authorizationToken, tsNow) if err != nil { - http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden) return } - // Check if limit of calls not exceeded - isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + isClientAllowedToGetAccess := clientAccessRaw.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow) + currentClientAccess = ClientAccess(*clientAccessRaw) + currentClientAccess.authorizationToken = authorizationToken + currentClientAccess.requestedDataSource = requestedDataSource + + // If client logged in before with access ID and it exists in cache, then re-use it + // else create new instances in cache + if authorizationToken != "" && accessCache.isAccessIdInCache(currentClientAccess.ClientResourceData.AccessID) { + accessCache.authorizationTokens[authorizationToken] = accessCache.accessIds[currentClientAccess.ClientResourceData.AccessID] + } else { + if stateCLI.enableDebugFlag { + log.Printf("Adding new access identifier in cache") + } + err := accessCache.AddAccessToCache(currentClientAccess, tsNow) + if err != nil { + http.Error(w, "Unable to add access ID to cache", http.StatusForbidden) + return + } + } } ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess) diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index 7ce082789..cbddb1f5c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -121,7 +121,7 @@ func Server() { // Fetch access id for internal usage (crawlers, infrastructure, etc) resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID}, ) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/version.go b/nodes/node_balancer/cmd/nodebalancer/version.go index 0a350a652..4aeaf8772 100644 --- a/nodes/node_balancer/cmd/nodebalancer/version.go +++ b/nodes/node_balancer/cmd/nodebalancer/version.go @@ -1,3 +1,3 @@ package main -var NB_VERSION = "0.2.2" +var NB_VERSION = "0.2.3" diff --git a/nodes/node_balancer/go.mod b/nodes/node_balancer/go.mod index afe71f5ca..b812d40f2 100644 --- a/nodes/node_balancer/go.mod +++ b/nodes/node_balancer/go.mod @@ -3,7 +3,7 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer go 1.17 require ( - github.com/bugout-dev/bugout-go v0.4.1 + github.com/bugout-dev/bugout-go v0.4.2 github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 github.com/google/uuid v1.3.0 ) diff --git a/nodes/node_balancer/go.sum b/nodes/node_balancer/go.sum index 95070b9c3..18e9cda61 100644 --- a/nodes/node_balancer/go.sum +++ b/nodes/node_balancer/go.sum @@ -23,8 +23,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= -github.com/bugout-dev/bugout-go v0.4.1 h1:idZ4k+/skHj217/q8OmHBoYdzwJrqCY5Vd7S8FM6zlo= -github.com/bugout-dev/bugout-go v0.4.1/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8= +github.com/bugout-dev/bugout-go v0.4.2 h1:oADFQzZ4iZeQOz8dDaO/+25eQkrCYG8SqjA8mRSQl7k= +github.com/bugout-dev/bugout-go v0.4.2/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/nodes/node_balancer/migrations/migrations.py b/nodes/node_balancer/migrations/migrations.py new file mode 100644 index 000000000..d7c7145bb --- /dev/null +++ b/nodes/node_balancer/migrations/migrations.py @@ -0,0 +1,140 @@ +import argparse +import logging +import os + +from bugout.app import Bugout + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def migration_20230522( + token_current_owner: str, token_new_owner: str, new_application_id: str +) -> None: + BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") + + bc = Bugout(brood_api_url=BUGOUT_BROOD_URL) + + try: + resources = bc.list_resources(token=token_current_owner, params={}) + except Exception as err: + raise Exception(err) + + logger.info(f"Found {len(resources.resources)} resources") + + while input("Do you want to continue [y/n]? ") != "y": + return + + cnt = 0 + for resource in resources.resources: + resource_data = resource.resource_data + resource_data["type"] = "nodebalancer-access" + + try: + new_resource = bc.create_resource( + token=token_new_owner, + application_id=new_application_id, + resource_data=resource_data, + ) + cnt += 1 + logger.info( + f"Created resource with ID {new_resource.id} and copied modified resource data" + ) + except Exception as err: + logger.error(f"Unable to copy resource with ID {resource.id}, err: {err}") + + user_id = new_resource.resource_data.get("user_id", "") + + try: + new_permissions = bc.add_resource_holder_permissions( + token=token_new_owner, + resource_id=new_resource.id, + holder_permissions={ + "holder_id": user_id, + "holder_type": "user", + "permissions": ["read", "update", "delete"], + }, + ) + logger.info( + f"Granted permissions for resource with ID {new_permissions.resource_id} to user with ID {user_id}" + ) + except Exception as err: + logger.error( + f"Unable grant permissions for resource with ID {resource.id} to user with ID {user_id}, err: {err}" + ) + + logger.info(f"Copied {cnt} resources") + + +MIGRATIONS_LIST = { + "20230522": { + "description": "Modify existing Brood resources to Moonstream resources structure " + "with `type` key equal to `nodebalancer-access`. And transfer ownership to moonstream admin. " + "Then create permissions for user access.", + "exec_func": migration_20230522, + "required_args": [ + "token-current-owner", + "token-new-owner", + "new-application-id", + ], + } +} + + +def list_handler(args: argparse.Namespace) -> None: + return print(MIGRATIONS_LIST) + + +def run_handler(args: argparse.Namespace) -> None: + migration = MIGRATIONS_LIST.get(args.key, None) + if migration is None: + logger.error(f"Migration with key '{args.key}' not found") + return + + migration["exec_func"]( + token_current_owner=args.token_current_owner, + token_new_owner=args.token_new_owner, + new_application_id=args.new_application_id, + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Moonstream mode balancer migrations CLI" + ) + parser.set_defaults(func=lambda _: parser.print_help()) + subcommands = parser.add_subparsers(description="Migration commands") + + parser_list = subcommands.add_parser("list", description="List migrations") + parser_list.set_defaults(func=list_handler) + + parser_run = subcommands.add_parser("run", description="Run migration") + parser_run.add_argument( + "-k", "--key", required=True, type=str, help="Key of migration to run" + ) + parser_run.add_argument( + "--token-current-owner", + type=str, + default=argparse.SUPPRESS, + help="Bugout access token of current resource owner", + ) + parser_run.add_argument( + "--token-new-owner", + type=str, + default=argparse.SUPPRESS, + help="Bugout access token of new resource owner", + ) + parser_run.add_argument( + "--new-application-id", + type=str, + default=argparse.SUPPRESS, + help="Bugout application ID to transfer resources", + ) + parser_run.set_defaults(func=run_handler) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/nodes/node_balancer/migrations/requirements.txt b/nodes/node_balancer/migrations/requirements.txt new file mode 100644 index 000000000..70ea874fb --- /dev/null +++ b/nodes/node_balancer/migrations/requirements.txt @@ -0,0 +1,18 @@ +black==23.3.0 +bugout==0.2.7 +certifi==2023.5.7 +charset-normalizer==3.1.0 +click==8.1.3 +idna==3.4 +isort==5.12.0 +mypy==1.3.0 +mypy-extensions==1.0.0 +packaging==23.1 +pathspec==0.11.1 +pkg_resources==0.0.0 +platformdirs==3.5.1 +pydantic==1.10.7 +requests==2.30.0 +tomli==2.0.1 +typing_extensions==4.5.0 +urllib3==2.0.2 diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index 1917d0235..74f37284f 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -1,6 +1,6 @@ # Required environment variables for load balancer export BUGOUT_BROOD_URL="https://auth.bugout.dev" -export NB_APPLICATION_ID="" +export MOONSTREAM_APPLICATION_ID="" export NB_CONTROLLER_TOKEN="" export NB_CONTROLLER_ACCESS_ID=""