From 1e825c5487dedaa793a442fc1bacea6084e798a6 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Mon, 22 May 2023 14:45:57 +0000 Subject: [PATCH 01/10] Migration to transfer nodebalancer access to other application --- nodes/.gitignore | 1 + nodes/node_balancer/migrations/migrations.py | 119 ++++++++++++++++++ .../node_balancer/migrations/requirements.txt | 18 +++ 3 files changed, 138 insertions(+) create mode 100644 nodes/node_balancer/migrations/migrations.py create mode 100644 nodes/node_balancer/migrations/requirements.txt diff --git a/nodes/.gitignore b/nodes/.gitignore index 342fca884..e680b52c5 100644 --- a/nodes/.gitignore +++ b/nodes/.gitignore @@ -61,4 +61,5 @@ go.work dev.env prod.env test.env +.venv diff --git a/nodes/node_balancer/migrations/migrations.py b/nodes/node_balancer/migrations/migrations.py new file mode 100644 index 000000000..bb1cad907 --- /dev/null +++ b/nodes/node_balancer/migrations/migrations.py @@ -0,0 +1,119 @@ +import argparse +import csv +import json +import logging +import os +import uuid +from typing import Any, Dict, List + +from bugout.app import Bugout + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def migration_20230522( + token_current_owner: str, token_new_owner: str, new_application_id: str +) -> None: + BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") + + bc = Bugout(brood_api_url=BUGOUT_BROOD_URL) + + try: + resources = bc.list_resources(token=token_current_owner, params={}) + except Exception as err: + raise Exception(err) + + logger.info(f"Found {len(resources.resources)} resources") + + cnt = 0 + for resource in resources.resources: + resource_data = resource.resource_data + resource_data["type"] = "nodebalancer-access" + try: + new_resource = bc.create_resource( + token=token_new_owner, + application_id=new_application_id, + resource_data=resource_data, + ) + cnt += 1 + logger.info( + f"Created resource with ID {new_resource.id} and copied modified resource data" + ) + except Exception as err: + logger.error(f"Unable to copy resource with ID {resource.id}, err: {err}") + + logger.info(f"Copied {cnt} resources") + + +MIGRATIONS_LIST = { + "20230522": { + "description": "Modify existing Brood resources to Moonstream resources structure " + "with `type` key equal to `nodebalancer`. And transfer ownership to moonstream admin.", + "exec_func": migration_20230522, + "required_args": [ + "token-current-owner", + "token-new-owner", + "new-application-id", + ], + } +} + + +def list_handler(args: argparse.Namespace) -> None: + return print(MIGRATIONS_LIST) + + +def run_handler(args: argparse.Namespace) -> None: + migration = MIGRATIONS_LIST.get(args.key, None) + if migration is None: + logger.error(f"Migration with key '{args.key}' not found") + return + + migration["exec_func"]( + token_current_owner=args.token_current_owner, + token_new_owner=args.token_new_owner, + new_application_id=args.new_application_id, + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Moonstream mode balancer migrations CLI" + ) + parser.set_defaults(func=lambda _: parser.print_help()) + subcommands = parser.add_subparsers(description="Migration commands") + + parser_list = subcommands.add_parser("list", description="List migrations") + parser_list.set_defaults(func=list_handler) + + parser_run = subcommands.add_parser("run", description="Run migration") + parser_run.add_argument( + "-k", "--key", required=True, type=str, help="Key of migration to run" + ) + parser_run.add_argument( + "--token-current-owner", + type=str, + default=argparse.SUPPRESS, + help="Bugout access token of current resource owner", + ) + parser_run.add_argument( + "--token-new-owner", + type=str, + default=argparse.SUPPRESS, + help="Bugout access token of new resource owner", + ) + parser_run.add_argument( + "--new-application-id", + type=str, + default=argparse.SUPPRESS, + help="Bugout application ID to transfer resources", + ) + parser_run.set_defaults(func=run_handler) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/nodes/node_balancer/migrations/requirements.txt b/nodes/node_balancer/migrations/requirements.txt new file mode 100644 index 000000000..b4f97f298 --- /dev/null +++ b/nodes/node_balancer/migrations/requirements.txt @@ -0,0 +1,18 @@ +black==23.3.0 +bugout==0.2.6 +certifi==2023.5.7 +charset-normalizer==3.1.0 +click==8.1.3 +idna==3.4 +isort==5.12.0 +mypy==1.3.0 +mypy-extensions==1.0.0 +packaging==23.1 +pathspec==0.11.1 +pkg_resources==0.0.0 +platformdirs==3.5.1 +pydantic==1.10.7 +requests==2.30.0 +tomli==2.0.1 +typing_extensions==4.5.0 +urllib3==2.0.2 From 4c862174396cd27b8349a808785b6bf410b38c95 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 23 May 2023 07:46:29 +0000 Subject: [PATCH 02/10] User prompt for migration and updated README --- nodes/node_balancer/README.md | 11 +++++++++++ nodes/node_balancer/migrations/migrations.py | 6 +++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/nodes/node_balancer/README.md b/nodes/node_balancer/README.md index e13ee7eac..4c193c06d 100644 --- a/nodes/node_balancer/README.md +++ b/nodes/node_balancer/README.md @@ -113,3 +113,14 @@ For Web3 providers `access_id` and `data_source` could be specified in headers ```bash /usr/local/go/bin/go test -run ^TestCleanInactiveClientNodes$ github.com/bugout-dev/moonstream/nodes/node_balancer/cmd/nodebalancer -v -count=1 ``` + +## Migrations + +To run migration: + +```bash +python migrations/migrations.py run --key 20230522 \ + --token-current-owner "$NB_CONTROLLER_TOKEN" \ + --token-new-owner "$MOONSTREAM_ADMIN_OR_OTHER_CONTROLLER" \ + --new-application-id "$MOONSTREAM_APPLICATION_ID" +``` diff --git a/nodes/node_balancer/migrations/migrations.py b/nodes/node_balancer/migrations/migrations.py index bb1cad907..4bfc23840 100644 --- a/nodes/node_balancer/migrations/migrations.py +++ b/nodes/node_balancer/migrations/migrations.py @@ -1,9 +1,6 @@ import argparse -import csv -import json import logging import os -import uuid from typing import Any, Dict, List from bugout.app import Bugout @@ -26,6 +23,9 @@ def migration_20230522( logger.info(f"Found {len(resources.resources)} resources") + while input("Do you want to continue [y/n]? ") != "y": + return + cnt = 0 for resource in resources.resources: resource_data = resource.resource_data From 33adf6255a2f973a34e8dce7b89bd07394d15bfc Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 23 May 2023 09:58:33 +0000 Subject: [PATCH 03/10] During migration add permissions to access ID owner for resource --- nodes/node_balancer/migrations/migrations.py | 25 ++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/nodes/node_balancer/migrations/migrations.py b/nodes/node_balancer/migrations/migrations.py index 4bfc23840..7233a1679 100644 --- a/nodes/node_balancer/migrations/migrations.py +++ b/nodes/node_balancer/migrations/migrations.py @@ -1,7 +1,6 @@ import argparse import logging import os -from typing import Any, Dict, List from bugout.app import Bugout @@ -30,6 +29,7 @@ def migration_20230522( for resource in resources.resources: resource_data = resource.resource_data resource_data["type"] = "nodebalancer-access" + try: new_resource = bc.create_resource( token=token_new_owner, @@ -43,13 +43,34 @@ def migration_20230522( except Exception as err: logger.error(f"Unable to copy resource with ID {resource.id}, err: {err}") + user_id = new_resource.resource_data.get("user_id", "") + + try: + new_permissions = bc.add_resource_holder_permissions( + token=token_new_owner, + resource_id=new_resource.id, + holder_permissions={ + "holder_id": user_id, + "holder_type": "user", + "permissions": ["create", "read", "update", "delete"], + }, + ) + logger.info( + f"Granted permissions for resource with ID {new_permissions.resource_id} to user with ID {user_id}" + ) + except Exception as err: + logger.error( + f"Unable grant permissions for resource with ID {resource.id} to user with ID {user_id}, err: {err}" + ) + logger.info(f"Copied {cnt} resources") MIGRATIONS_LIST = { "20230522": { "description": "Modify existing Brood resources to Moonstream resources structure " - "with `type` key equal to `nodebalancer`. And transfer ownership to moonstream admin.", + "with `type` key equal to `nodebalancer-access`. And transfer ownership to moonstream admin. " + "Then create permissions for user access.", "exec_func": migration_20230522, "required_args": [ "token-current-owner", From 401ae81dc67baa521a936b28eab109d7f21ff2a4 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 23 May 2023 13:40:15 +0000 Subject: [PATCH 04/10] Set bugout resource type for nodebalancer access --- nodes/node_balancer/cmd/nodebalancer/configs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 429bec6db..4c6df3c2b 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -46,6 +46,9 @@ var ( // Humbug configuration HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN") + + // Moonstream resources types + BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access" ) func CheckEnvVarSet() { From 2d42ac4cd595800c23fce5e67947dc6c4aa848e5 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 24 May 2023 13:24:23 +0000 Subject: [PATCH 05/10] Correct permissions during migration script for nb --- nodes/node_balancer/migrations/migrations.py | 2 +- nodes/node_balancer/migrations/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nodes/node_balancer/migrations/migrations.py b/nodes/node_balancer/migrations/migrations.py index 7233a1679..d7c7145bb 100644 --- a/nodes/node_balancer/migrations/migrations.py +++ b/nodes/node_balancer/migrations/migrations.py @@ -52,7 +52,7 @@ def migration_20230522( holder_permissions={ "holder_id": user_id, "holder_type": "user", - "permissions": ["create", "read", "update", "delete"], + "permissions": ["read", "update", "delete"], }, ) logger.info( diff --git a/nodes/node_balancer/migrations/requirements.txt b/nodes/node_balancer/migrations/requirements.txt index b4f97f298..70ea874fb 100644 --- a/nodes/node_balancer/migrations/requirements.txt +++ b/nodes/node_balancer/migrations/requirements.txt @@ -1,5 +1,5 @@ black==23.3.0 -bugout==0.2.6 +bugout==0.2.7 certifi==2023.5.7 charset-normalizer==3.1.0 click==8.1.3 From 7f89784f2f39ac468a4ce9774ab499e06109f6da Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 24 May 2023 13:25:00 +0000 Subject: [PATCH 06/10] Support of two caches for access IDs and Auth tokens --- nodes/node_balancer/cmd/nodebalancer/cli.go | 2 + .../node_balancer/cmd/nodebalancer/clients.go | 4 + .../cmd/nodebalancer/middleware.go | 253 ++++++++++++++---- 3 files changed, 211 insertions(+), 48 deletions(-) diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index b5fba9f1f..6364fd98d 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -253,6 +253,8 @@ func cli() { PeriodStartTs: time.Now().Unix(), MaxCallsPerPeriod: stateCLI.MaxCallsPerPeriodFlag, CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, } _, err := bugoutClient.Brood.FindUser( NB_CONTROLLER_TOKEN, diff --git a/nodes/node_balancer/cmd/nodebalancer/clients.go b/nodes/node_balancer/cmd/nodebalancer/clients.go index ebc0a3a1e..56cb1e179 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients.go @@ -15,6 +15,8 @@ var ( type ClientAccess struct { ResourceID string + authorizationToken string + ClientResourceData ClientResourceData LastAccessTs int64 @@ -36,6 +38,8 @@ type ClientResourceData struct { PeriodStartTs int64 `json:"period_start_ts"` MaxCallsPerPeriod int64 `json:"max_calls_per_period"` CallsPerPeriod int64 `json:"calls_per_period"` + + Type string `json:"type"` } // CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 27d51a3ea..bd41f1fbe 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -16,23 +16,27 @@ import ( "sync" "time" + "github.com/bugout-dev/bugout-go/pkg/brood" humbug "github.com/bugout-dev/humbug/go/pkg" + "github.com/google/uuid" ) var ( - accessIdCache AccessCache + accessCache AccessCache ) type AccessCache struct { - accessIds map[string]ClientAccess + accessIds map[string]*ClientAccess + authorizationTokens map[string]*ClientAccess mux sync.RWMutex } // CreateAccessCache generates empty cache of client access func CreateAccessCache() { - accessIdCache = AccessCache{ - accessIds: make(map[string]ClientAccess), + accessCache = AccessCache{ + accessIds: make(map[string]*ClientAccess), + authorizationTokens: make(map[string]*ClientAccess), } } @@ -52,24 +56,47 @@ func (ac *AccessCache) FindAccessIdInCache(accessId string) string { return detectedId } +// Get access id from cache if exists +func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) string { + var detected string + + ac.mux.RLock() + for id := range ac.authorizationTokens { + if id == authorizationToken { + detected = id + break + } + } + ac.mux.RUnlock() + + return detected +} + // Update last call access timestamp and datasource for access id -func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) { +func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) { ac.mux.Lock() - if accessData, ok := ac.accessIds[accessId]; ok { - accessData.LastAccessTs = tsNow - accessData.requestedDataSource = requestedDataSource - accessData.LastSessionCallsCounter++ + var accessToModify *ClientAccess + if access, ok := ac.accessIds[accessId]; ok { + accessToModify = access - ac.accessIds[accessId] = accessData } + if access, ok := ac.authorizationTokens[authorizationToken]; ok { + accessToModify = access + } + + accessToModify.LastAccessTs = tsNow + accessToModify.requestedDataSource = requestedDataSource + accessToModify.LastSessionCallsCounter++ + ac.mux.Unlock() } // Add new access ID with data to cache -func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) { +func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) { ac.mux.Lock() - ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{ - ResourceID: clientAccess.ResourceID, + access := ClientAccess{ + ResourceID: clientAccess.ResourceID, + authorizationToken: clientAccess.authorizationToken, ClientResourceData: ClientResourceData{ UserID: clientAccess.ClientResourceData.UserID, @@ -83,6 +110,8 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs, MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod, CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod, + + Type: clientAccess.ClientResourceData.Type, }, LastAccessTs: tsNow, @@ -91,6 +120,11 @@ func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64 requestedDataSource: clientAccess.requestedDataSource, } + + ac.accessIds[clientAccess.ClientResourceData.AccessID] = &access + if clientAccess.authorizationToken != "" { + ac.authorizationTokens[clientAccess.authorizationToken] = &access + } ac.mux.Unlock() } @@ -100,9 +134,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) { tsNow := time.Now().Unix() ac.mux.Lock() + for aId, clientAccess := range ac.accessIds { + totalAccessIds++ + removedUserId := "" + if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME { // Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period + removedUserId = clientAccess.ClientResourceData.UserID delete(ac.accessIds, aId) removedAccessIds++ err := clientAccess.UpdateClientResourceCallCounter(tsNow) @@ -110,6 +149,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) { log.Printf("Unable to update Brood resource, err: %v\n", err) } } else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME { + removedUserId = clientAccess.ClientResourceData.UserID // Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME delete(ac.accessIds, aId) removedAccessIds++ @@ -117,12 +157,21 @@ func (ac *AccessCache) Cleanup() (int64, int64) { if err != nil { log.Printf("Unable to update Brood resource, err: %v\n", err) } - } else { - totalAccessIds++ + } + + if removedUserId != "" { + for aToken, clientAccess := range ac.authorizationTokens { + if clientAccess.ClientResourceData.UserID == removedUserId { + delete(ac.authorizationTokens, aToken) + } + } + removedUserId = "" } } ac.mux.Unlock() + totalAccessIds = totalAccessIds - removedAccessIds + return removedAccessIds, totalAccessIds } @@ -131,7 +180,7 @@ func initCacheCleaning(debug bool) { for { select { case <-t.C: - removedAccessIds, totalAccessIds := accessIdCache.Cleanup() + removedAccessIds, totalAccessIds := accessCache.Cleanup() if debug { log.Printf("Removed %d clients from access cache", removedAccessIds) } @@ -140,6 +189,102 @@ func initCacheCleaning(debug bool) { } } +func parseClientAccess(resource brood.Resource) (*ClientAccess, error) { + var clientAccess ClientAccess + + resourceData, err := json.Marshal(resource.ResourceData) + if err != nil { + return nil, err + } + clientAccess.ResourceID = resource.Id + err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData) + if err != nil { + return nil, err + } + + return &clientAccess, nil +} + +// fetchResources fetch resources with access ID or authorization token and generate new one if there no one +func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) { + var err error + var resources brood.Resources + + queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS} + if accessID != "" { + queryParameters["access_id"] = accessID + } + + token := NB_CONTROLLER_TOKEN + if authorizationToken != "" { + token = authorizationToken + } + + resources, err = bugoutClient.Brood.GetResources( + token, + NB_APPLICATION_ID, + queryParameters, + ) + if err != nil { + log.Printf("Unable to get resources, err: %v", err) + return nil, fmt.Errorf("unable to get access identifiers") + } + + if len(resources.Resources) == 0 { + if authorizationToken != "" { + // Generate new autogenerated access resource with default parameters and grant user permissions to work with it + user, err := bugoutClient.Brood.GetUser(authorizationToken) + if err != nil { + log.Printf("Unable to get user, err: %v", err) + return nil, fmt.Errorf("unable to find user with provided authorization token") + } + newResource, err := bugoutClient.Brood.CreateResource( + NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, ClientResourceData{ + UserID: user.Id, + AccessID: uuid.New().String(), + Name: user.Username, + Description: "Autogenerated access ID", + BlockchainAccess: true, + ExtendedMethods: false, + + PeriodDuration: 86400, + PeriodStartTs: tsNow, + MaxCallsPerPeriod: 1000, + CallsPerPeriod: 0, + + Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, + }, + ) + if err != nil { + log.Printf("Unable to create resource with autogenerated access for user with ID %s, err: %v", user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + resourceHolderPermissions, err := bugoutClient.Brood.AddResourceHolderPermissions( + NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{ + Id: user.Id, + HolderType: "user", + Permissions: []string{"read", "update", "delete"}, + }, + ) + if err != nil { + log.Printf("Unable to grant permissions to user with ID %s at resource with ID %s, err: %v", newResource.Id, user.Id, err) + return nil, fmt.Errorf("unable to create resource with autogenerated access for user") + } + + log.Printf("Created new resource with ID %s with autogenerated access for user with ID %s", resourceHolderPermissions.ResourceId, user.Id) + resources.Resources = append(resources.Resources, newResource) + } else { + return nil, fmt.Errorf("there are no provided access identifier") + } + } else if len(resources.Resources) > 1 { + // TODO(kompotkot): Write support of multiple resources, be careful, because NB_CONTROLLER has several resources + return nil, fmt.Errorf("there are no provided access identifier") + } + + return &resources.Resources[0], nil +} + // Extract access_id from header and query. Query takes precedence over header. func extractAccessID(r *http.Request) string { var accessID string @@ -289,23 +434,39 @@ func accessMiddleware(next http.Handler) http.Handler { accessID := extractAccessID(r) requestedDataSource := extractRequestedDataSource(r) - if accessID == "" { - http.Error(w, "No access id passed with request", http.StatusForbidden) - return + // Extract Authorization token if Bearer header provided + var authorizationTokenRaw string + authorizationTokenHeaders := r.Header[strings.Title("authorization")] + for _, h := range authorizationTokenHeaders { + authorizationTokenRaw = h + } + var authorizationToken string + if authorizationTokenRaw != "" { + authorizationTokenSlice := strings.Split(authorizationTokenRaw, " ") + if len(authorizationTokenSlice) != 2 || authorizationTokenSlice[0] != "Bearer" || authorizationTokenSlice[1] == "" { + http.Error(w, "Wrong authorization token provided", http.StatusForbidden) + return + } + authorizationToken = authorizationTokenSlice[1] } tsNow := time.Now().Unix() + if accessID == "" && authorizationToken == "" { + http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden) + return + } + // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources - if accessID == NB_CONTROLLER_ACCESS_ID { + if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { currentClientAccess = internalUsageAccess if stateCLI.enableDebugFlag { log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) } currentClientAccess.LastAccessTs = tsNow currentClientAccess.requestedDataSource = requestedDataSource - } else if accessIdCache.FindAccessIdInCache(accessID) != "" { - currentClientAccess = accessIdCache.accessIds[accessID] + } else if accessID != "" && accessCache.FindAccessIdInCache(accessID) != "" { + currentClientAccess = *accessCache.accessIds[accessID] if stateCLI.enableDebugFlag { log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) } @@ -316,41 +477,37 @@ func accessMiddleware(next http.Handler) http.Handler { return } currentClientAccess.requestedDataSource = requestedDataSource - accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow) + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) + } else if accessID == "" && accessCache.FindAuthorizationTokenInCache(authorizationToken) != "" { + currentClientAccess = *accessCache.authorizationTokens[authorizationToken] + // Check if limit of calls not exceeded + isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + if !isClientAllowedToGetAccess { + http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) + return + } + fmt.Println(currentClientAccess.ResourceID) + currentClientAccess.requestedDataSource = requestedDataSource + accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) } else { if stateCLI.enableDebugFlag { - log.Printf("New access id, looking at Brood resources") + log.Printf("No access in cache found, looking at Brood resources") } - resources, err := bugoutClient.Brood.GetResources( - NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, - map[string]string{"access_id": accessID}, - ) + + resource, err := fetchResource(accessID, authorizationToken, tsNow) if err != nil { - http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden) - return - } - resourcesLen := len(resources.Resources) - if resourcesLen == 0 { - http.Error(w, "User with provided access identifier not found", http.StatusForbidden) - return - } - if resourcesLen > 1 { - http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden) return } - resourceData, err := json.Marshal(resources.Resources[0].ResourceData) + + clientAccess, err := parseClientAccess(*resource) if err != nil { - http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError) + http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError) return } - currentClientAccess.ResourceID = resources.Resources[0].Id + currentClientAccess = ClientAccess(*clientAccess) + currentClientAccess.authorizationToken = authorizationToken currentClientAccess.requestedDataSource = requestedDataSource - err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData) - if err != nil { - http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError) - return - } // Check if limit of calls not exceeded isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) @@ -358,7 +515,7 @@ func accessMiddleware(next http.Handler) http.Handler { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow) + accessCache.AddAccessToCache(currentClientAccess, tsNow) } ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess) From bfdc2eb7187cff7b51c745343ce07fad63aba7f1 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 24 May 2023 16:49:41 +0000 Subject: [PATCH 07/10] Optimized cache reuse and set default values in config --- .../node_balancer/cmd/nodebalancer/configs.go | 5 +- .../cmd/nodebalancer/middleware.go | 115 ++++++++++-------- nodes/node_balancer/go.mod | 2 +- nodes/node_balancer/go.sum | 4 +- 4 files changed, 70 insertions(+), 56 deletions(-) diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 4c6df3c2b..4cada00f6 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -48,7 +48,10 @@ var ( HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN") // Moonstream resources types - BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access" + BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS = "nodebalancer-access" + DEFAULT_AUTOGENERATED_USER_PERMISSIONS = []string{"read", "update", "delete"} + DEFAULT_AUTOGENERATED_PERIOD_DURATION = int64(86400) + DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD = int64(1000) ) func CheckEnvVarSet() { diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index bd41f1fbe..c9a85c885 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -25,6 +25,11 @@ var ( accessCache AccessCache ) +// AccessCache caches client identification for fast access to nodes +// +// If authorization passed with Bearer token, then it triggers to fetch Brood resource with access ID +// or create new one. After it under key `accessIds` and `authorizationTokens` will be added similar +// address pointers to one `ClientAccess`. type AccessCache struct { accessIds map[string]*ClientAccess authorizationTokens map[string]*ClientAccess @@ -40,30 +45,30 @@ func CreateAccessCache() { } } -// Get access id from cache if exists -func (ac *AccessCache) FindAccessIdInCache(accessId string) string { - var detectedId string +// FindAccessIdInCache looking for user access in `accessIds` cache +func (ac *AccessCache) isAccessIdInCache(accessId string) bool { + detected := false ac.mux.RLock() for id := range ac.accessIds { if id == accessId { - detectedId = id + detected = true break } } ac.mux.RUnlock() - return detectedId + return detected } -// Get access id from cache if exists -func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) string { - var detected string +// FindAuthorizationTokenInCache looking for user access in `authorizationTokens` cache +func (ac *AccessCache) isAuthorizationTokenInCache(authorizationToken string) bool { + detected := false ac.mux.RLock() for id := range ac.authorizationTokens { if id == authorizationToken { - detected = id + detected = true break } } @@ -72,16 +77,22 @@ func (ac *AccessCache) FindAuthorizationTokenInCache(authorizationToken string) return detected } -// Update last call access timestamp and datasource for access id +// Update last call access timestamp and datasource for user access func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, requestedDataSource string, tsNow int64) { ac.mux.Lock() var accessToModify *ClientAccess - if access, ok := ac.accessIds[accessId]; ok { - accessToModify = access + if accessId != "" { + if access, ok := ac.accessIds[accessId]; ok { + accessToModify = access + + } } - if access, ok := ac.authorizationTokens[authorizationToken]; ok { - accessToModify = access + + if authorizationToken != "" { + if access, ok := ac.authorizationTokens[authorizationToken]; ok { + accessToModify = access + } } accessToModify.LastAccessTs = tsNow @@ -91,10 +102,10 @@ func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, request ac.mux.Unlock() } -// Add new access ID with data to cache +// Add new user access identifier with data to cache func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) { ac.mux.Lock() - access := ClientAccess{ + access := &ClientAccess{ ResourceID: clientAccess.ResourceID, authorizationToken: clientAccess.authorizationToken, @@ -121,9 +132,9 @@ func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) requestedDataSource: clientAccess.requestedDataSource, } - ac.accessIds[clientAccess.ClientResourceData.AccessID] = &access + ac.accessIds[clientAccess.ClientResourceData.AccessID] = access if clientAccess.authorizationToken != "" { - ac.authorizationTokens[clientAccess.authorizationToken] = &access + ac.authorizationTokens[clientAccess.authorizationToken] = access } ac.mux.Unlock() } @@ -189,22 +200,6 @@ func initCacheCleaning(debug bool) { } } -func parseClientAccess(resource brood.Resource) (*ClientAccess, error) { - var clientAccess ClientAccess - - resourceData, err := json.Marshal(resource.ResourceData) - if err != nil { - return nil, err - } - clientAccess.ResourceID = resource.Id - err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData) - if err != nil { - return nil, err - } - - return &clientAccess, nil -} - // fetchResources fetch resources with access ID or authorization token and generate new one if there no one func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) { var err error @@ -247,9 +242,9 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res BlockchainAccess: true, ExtendedMethods: false, - PeriodDuration: 86400, + PeriodDuration: DEFAULT_AUTOGENERATED_PERIOD_DURATION, PeriodStartTs: tsNow, - MaxCallsPerPeriod: 1000, + MaxCallsPerPeriod: DEFAULT_AUTOGENERATED_MAX_CALLS_PER_PERIOD, CallsPerPeriod: 0, Type: BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS, @@ -264,7 +259,7 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res NB_CONTROLLER_TOKEN, newResource.Id, brood.ResourceHolder{ Id: user.Id, HolderType: "user", - Permissions: []string{"read", "update", "delete"}, + Permissions: DEFAULT_AUTOGENERATED_USER_PERMISSIONS, }, ) if err != nil { @@ -459,18 +454,17 @@ func accessMiddleware(next http.Handler) http.Handler { // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { - currentClientAccess = internalUsageAccess if stateCLI.enableDebugFlag { log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID) } + currentClientAccess = internalUsageAccess currentClientAccess.LastAccessTs = tsNow currentClientAccess.requestedDataSource = requestedDataSource - } else if accessID != "" && accessCache.FindAccessIdInCache(accessID) != "" { - currentClientAccess = *accessCache.accessIds[accessID] + } else if accessID != "" && accessCache.isAccessIdInCache(accessID) { if stateCLI.enableDebugFlag { log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID) } - // Check if limit of calls not exceeded + currentClientAccess = *accessCache.accessIds[accessID] isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) @@ -478,20 +472,21 @@ func accessMiddleware(next http.Handler) http.Handler { } currentClientAccess.requestedDataSource = requestedDataSource accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) - } else if accessID == "" && accessCache.FindAuthorizationTokenInCache(authorizationToken) != "" { + } else if accessID == "" && accessCache.isAuthorizationTokenInCache(authorizationToken) { + if stateCLI.enableDebugFlag { + log.Printf("Client connected with Authorization token") + } currentClientAccess = *accessCache.authorizationTokens[authorizationToken] - // Check if limit of calls not exceeded isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - fmt.Println(currentClientAccess.ResourceID) currentClientAccess.requestedDataSource = requestedDataSource accessCache.UpdateAccessAtCache(accessID, authorizationToken, requestedDataSource, tsNow) } else { if stateCLI.enableDebugFlag { - log.Printf("No access in cache found, looking at Brood resources") + log.Printf("No access identity found in cache, looking at Brood resources") } resource, err := fetchResource(accessID, authorizationToken, tsNow) @@ -500,22 +495,38 @@ func accessMiddleware(next http.Handler) http.Handler { return } - clientAccess, err := parseClientAccess(*resource) + var clientAccessRaw ClientAccess + resourceData, err := json.Marshal(resource.ResourceData) + if err != nil { + http.Error(w, "Unable to parse resource data to access identifier", http.StatusInternalServerError) + return + } + err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData) if err != nil { http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError) return } - currentClientAccess = ClientAccess(*clientAccess) - currentClientAccess.authorizationToken = authorizationToken - currentClientAccess.requestedDataSource = requestedDataSource - // Check if limit of calls not exceeded - isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow) + isClientAllowedToGetAccess := clientAccessRaw.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - accessCache.AddAccessToCache(currentClientAccess, tsNow) + currentClientAccess = ClientAccess(clientAccessRaw) + currentClientAccess.ResourceID = resource.Id + currentClientAccess.authorizationToken = authorizationToken + currentClientAccess.requestedDataSource = requestedDataSource + + // If client logged in before with access ID and it exists in cache, then re-use it + // else create new instances in cache + if authorizationToken != "" && accessCache.isAccessIdInCache(currentClientAccess.ClientResourceData.AccessID) { + accessCache.authorizationTokens[authorizationToken] = accessCache.accessIds[currentClientAccess.ClientResourceData.AccessID] + } else { + if stateCLI.enableDebugFlag { + log.Printf("Adding new access identifier in cache") + } + accessCache.AddAccessToCache(currentClientAccess, tsNow) + } } ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess) diff --git a/nodes/node_balancer/go.mod b/nodes/node_balancer/go.mod index afe71f5ca..b812d40f2 100644 --- a/nodes/node_balancer/go.mod +++ b/nodes/node_balancer/go.mod @@ -3,7 +3,7 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer go 1.17 require ( - github.com/bugout-dev/bugout-go v0.4.1 + github.com/bugout-dev/bugout-go v0.4.2 github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 github.com/google/uuid v1.3.0 ) diff --git a/nodes/node_balancer/go.sum b/nodes/node_balancer/go.sum index 95070b9c3..18e9cda61 100644 --- a/nodes/node_balancer/go.sum +++ b/nodes/node_balancer/go.sum @@ -23,8 +23,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= -github.com/bugout-dev/bugout-go v0.4.1 h1:idZ4k+/skHj217/q8OmHBoYdzwJrqCY5Vd7S8FM6zlo= -github.com/bugout-dev/bugout-go v0.4.1/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8= +github.com/bugout-dev/bugout-go v0.4.2 h1:oADFQzZ4iZeQOz8dDaO/+25eQkrCYG8SqjA8mRSQl7k= +github.com/bugout-dev/bugout-go v0.4.2/go.mod h1:P4+788iHtt/32u2wIaRTaiXTWpvSVBYxZ01qQ8N7eB8= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205 h1:UQ7XGjvoOVKGRIuTFXgqGtU/UgMOk8+ikpoHWrWefjQ= github.com/bugout-dev/humbug/go v0.0.0-20211206230955-57607cd2d205/go.mod h1:U/NXHfc3tzGeQz+xVfpifXdPZi7p6VV8xdP/4ZKeWJU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= From 43d7e4a8073423bcb73e124875d971032008f04b Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 25 May 2023 11:19:46 +0000 Subject: [PATCH 08/10] Small fixes for access cache in nb --- .../cmd/nodebalancer/access_cache_test.go | 49 ++++++++++++++++ .../cmd/nodebalancer/clients_test.go | 9 ++- .../cmd/nodebalancer/middleware.go | 58 +++++++++++-------- nodes/node_balancer/sample.env | 3 + 4 files changed, 91 insertions(+), 28 deletions(-) create mode 100644 nodes/node_balancer/cmd/nodebalancer/access_cache_test.go diff --git a/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go b/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go new file mode 100644 index 000000000..4772a1f87 --- /dev/null +++ b/nodes/node_balancer/cmd/nodebalancer/access_cache_test.go @@ -0,0 +1,49 @@ +package main + +import ( + "testing" + "time" +) + +func accessCacheSetupSuit(t *testing.T) func(t *testing.T) { + t.Log("Setup suit") + + CreateAccessCache() + + return func(t *testing.T) { + t.Log("Teardown suit") + } +} + +func TestAddAccessToCache(t *testing.T) { + teardownSuit := accessCacheSetupSuit(t) + defer teardownSuit(t) + + tsNow := time.Now().Unix() + + var cases = []struct { + prop ClientAccess + expected bool + }{ + { + prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "7378e2b2-b6ac-4738-bf34-fe39aa0d19e9"}}, + expected: true, + }, + { + prop: ClientAccess{ClientResourceData: ClientResourceData{AccessID: "000000000000000000000000000000000000"}}, + expected: false, + }, + { + prop: ClientAccess{ClientResourceData: ClientResourceData{Name: "name-1"}}, + expected: false, + }, + } + for _, c := range cases { + accessId := c.prop.ClientResourceData.AccessID + accessCache.AddAccessToCache(c.prop, tsNow) + if accessCache.isAccessIdInCache(accessId) != c.expected { + t.Logf("Access %s not found in access cache", accessId) + t.Fatal() + } + } +} diff --git a/nodes/node_balancer/cmd/nodebalancer/clients_test.go b/nodes/node_balancer/cmd/nodebalancer/clients_test.go index 4eecd1328..5d19d8715 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients_test.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients_test.go @@ -1,4 +1,3 @@ -// TODO(kompotkot): Re-write tests for client package main import ( @@ -7,7 +6,7 @@ import ( "time" ) -func setupSuit(t *testing.T) func(t *testing.T) { +func clientsSetupSuit(t *testing.T) func(t *testing.T) { t.Log("Setup suit") supportedBlockchains = map[string]bool{"ethereum": true} @@ -18,7 +17,7 @@ func setupSuit(t *testing.T) func(t *testing.T) { } func TestAddClientNode(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) var cases = []struct { @@ -44,7 +43,7 @@ func TestAddClientNode(t *testing.T) { } func TestGetClientNode(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) ts := time.Now().Unix() @@ -75,7 +74,7 @@ func TestGetClientNode(t *testing.T) { } func TestCleanInactiveClientNodes(t *testing.T) { - teardownSuit := setupSuit(t) + teardownSuit := clientsSetupSuit(t) defer teardownSuit(t) ts := time.Now().Unix() diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index c9a85c885..486e6d976 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -103,7 +103,13 @@ func (ac *AccessCache) UpdateAccessAtCache(accessId, authorizationToken, request } // Add new user access identifier with data to cache -func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) { +func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) error { + _, err := uuid.Parse(clientAccess.ClientResourceData.AccessID) + if err != nil { + log.Printf("Access ID %s is not valid UUID, err: %v", clientAccess.ClientResourceData.AccessID, err) + return fmt.Errorf("access ID is not valid UUID") + } + ac.mux.Lock() access := &ClientAccess{ ResourceID: clientAccess.ResourceID, @@ -137,6 +143,8 @@ func (ac *AccessCache) AddAccessToCache(clientAccess ClientAccess, tsNow int64) ac.authorizationTokens[clientAccess.authorizationToken] = access } ac.mux.Unlock() + + return nil } // Check each access id in cache if it exceeds lifetime @@ -200,10 +208,9 @@ func initCacheCleaning(debug bool) { } } -// fetchResources fetch resources with access ID or authorization token and generate new one if there no one -func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Resource, error) { +// fetchClientAccessFromResources get resources with access ID or authorization token and generate new one if there no one +func fetchClientAccessFromResources(accessID, authorizationToken string, tsNow int64) (*ClientAccess, error) { var err error - var resources brood.Resources queryParameters := map[string]string{"type": BUGOUT_RESOURCE_TYPE_NODEBALANCER_ACCESS} if accessID != "" { @@ -215,6 +222,7 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res token = authorizationToken } + var resources brood.Resources resources, err = bugoutClient.Brood.GetResources( token, NB_APPLICATION_ID, @@ -277,7 +285,20 @@ func fetchResource(accessID, authorizationToken string, tsNow int64) (*brood.Res return nil, fmt.Errorf("there are no provided access identifier") } - return &resources.Resources[0], nil + var clientAccessRaw ClientAccess + resourceData, err := json.Marshal(&resources.Resources[0].ResourceData) + if err != nil { + log.Printf("Unable to parse resource data to access identifier, err: %v", err) + return nil, fmt.Errorf("unable to parse resource data to access identifier") + } + err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData) + if err != nil { + log.Printf("Unable to decode resource data to access identifier, err: %v", err) + return nil, fmt.Errorf("unable to decode resource data to access identifier") + } + clientAccessRaw.ResourceID = resources.Resources[0].Id + + return &clientAccessRaw, nil } // Extract access_id from header and query. Query takes precedence over header. @@ -445,13 +466,13 @@ func accessMiddleware(next http.Handler) http.Handler { authorizationToken = authorizationTokenSlice[1] } - tsNow := time.Now().Unix() - if accessID == "" && authorizationToken == "" { http.Error(w, "No access ID or authorization header passed with request", http.StatusForbidden) return } + tsNow := time.Now().Unix() + // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources if accessID != "" && accessID == NB_CONTROLLER_ACCESS_ID { if stateCLI.enableDebugFlag { @@ -489,31 +510,18 @@ func accessMiddleware(next http.Handler) http.Handler { log.Printf("No access identity found in cache, looking at Brood resources") } - resource, err := fetchResource(accessID, authorizationToken, tsNow) + clientAccessRaw, err := fetchClientAccessFromResources(accessID, authorizationToken, tsNow) if err != nil { http.Error(w, fmt.Sprintf("%v", err), http.StatusForbidden) return } - var clientAccessRaw ClientAccess - resourceData, err := json.Marshal(resource.ResourceData) - if err != nil { - http.Error(w, "Unable to parse resource data to access identifier", http.StatusInternalServerError) - return - } - err = json.Unmarshal(resourceData, &clientAccessRaw.ClientResourceData) - if err != nil { - http.Error(w, "Unable to decode resource data to access identifier", http.StatusInternalServerError) - return - } - isClientAllowedToGetAccess := clientAccessRaw.CheckClientCallPeriodLimits(tsNow) if !isClientAllowedToGetAccess { http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden) return } - currentClientAccess = ClientAccess(clientAccessRaw) - currentClientAccess.ResourceID = resource.Id + currentClientAccess = ClientAccess(*clientAccessRaw) currentClientAccess.authorizationToken = authorizationToken currentClientAccess.requestedDataSource = requestedDataSource @@ -525,7 +533,11 @@ func accessMiddleware(next http.Handler) http.Handler { if stateCLI.enableDebugFlag { log.Printf("Adding new access identifier in cache") } - accessCache.AddAccessToCache(currentClientAccess, tsNow) + err := accessCache.AddAccessToCache(currentClientAccess, tsNow) + if err != nil { + http.Error(w, "Unable to add access ID to cache", http.StatusForbidden) + return + } } } diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index 1917d0235..d4dcb7d19 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -6,3 +6,6 @@ export NB_CONTROLLER_ACCESS_ID="" # Error humbug reporter export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="" + +# Tests +export TEST_NB_AUTH_TOKEN="" From 30a1fee536c1c02a696173ad1e516b3223c6cbf9 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 25 May 2023 11:22:57 +0000 Subject: [PATCH 09/10] Bumped version of nb --- nodes/node_balancer/cmd/nodebalancer/version.go | 2 +- nodes/node_balancer/sample.env | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/nodes/node_balancer/cmd/nodebalancer/version.go b/nodes/node_balancer/cmd/nodebalancer/version.go index 0a350a652..4aeaf8772 100644 --- a/nodes/node_balancer/cmd/nodebalancer/version.go +++ b/nodes/node_balancer/cmd/nodebalancer/version.go @@ -1,3 +1,3 @@ package main -var NB_VERSION = "0.2.2" +var NB_VERSION = "0.2.3" diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index d4dcb7d19..1917d0235 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -6,6 +6,3 @@ export NB_CONTROLLER_ACCESS_ID="" # Error humbug reporter export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="" - -# Tests -export TEST_NB_AUTH_TOKEN="" From f6883d87a97bcf8e2502da4677e8f8d241fc07a1 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 31 May 2023 12:05:57 +0000 Subject: [PATCH 10/10] Renamed env var for nb app id --- nodes/node_balancer/cmd/nodebalancer/cli.go | 10 +++++----- nodes/node_balancer/cmd/nodebalancer/configs.go | 2 +- nodes/node_balancer/cmd/nodebalancer/middleware.go | 4 ++-- nodes/node_balancer/cmd/nodebalancer/server.go | 2 +- nodes/node_balancer/sample.env | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index 6364fd98d..ebf9e70db 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -260,14 +260,14 @@ func cli() { NB_CONTROLLER_TOKEN, map[string]string{ "user_id": proposedClientResourceData.UserID, - "application_id": NB_APPLICATION_ID, + "application_id": MOONSTREAM_APPLICATION_ID, }, ) if err != nil { fmt.Printf("User does not exists, err: %v\n", err) os.Exit(1) } - resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedClientResourceData) + resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, proposedClientResourceData) if err != nil { fmt.Printf("Unable to create user access, err: %v\n", err) os.Exit(1) @@ -304,7 +304,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { @@ -408,7 +408,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { @@ -466,7 +466,7 @@ func cli() { } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 4cada00f6..6b4679f40 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -23,7 +23,7 @@ var ( // Bugout and application configuration BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5 - NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID") + MOONSTREAM_APPLICATION_ID = os.Getenv("MOONSTREAM_APPLICATION_ID") NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN") NB_CONTROLLER_ACCESS_ID = os.Getenv("NB_CONTROLLER_ACCESS_ID") diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 486e6d976..f91774f28 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -225,7 +225,7 @@ func fetchClientAccessFromResources(accessID, authorizationToken string, tsNow i var resources brood.Resources resources, err = bugoutClient.Brood.GetResources( token, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, queryParameters, ) if err != nil { @@ -242,7 +242,7 @@ func fetchClientAccessFromResources(accessID, authorizationToken string, tsNow i return nil, fmt.Errorf("unable to find user with provided authorization token") } newResource, err := bugoutClient.Brood.CreateResource( - NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, ClientResourceData{ + NB_CONTROLLER_TOKEN, MOONSTREAM_APPLICATION_ID, ClientResourceData{ UserID: user.Id, AccessID: uuid.New().String(), Name: user.Username, diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index 7ce082789..cbddb1f5c 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -121,7 +121,7 @@ func Server() { // Fetch access id for internal usage (crawlers, infrastructure, etc) resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, - NB_APPLICATION_ID, + MOONSTREAM_APPLICATION_ID, map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID}, ) if err != nil { diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index 1917d0235..74f37284f 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -1,6 +1,6 @@ # Required environment variables for load balancer export BUGOUT_BROOD_URL="https://auth.bugout.dev" -export NB_APPLICATION_ID="" +export MOONSTREAM_APPLICATION_ID="" export NB_CONTROLLER_TOKEN="" export NB_CONTROLLER_ACCESS_ID=""