Skip to content

Commit

Permalink
fix: Namespace read from storage, if cache misses
Browse files Browse the repository at this point in the history
  • Loading branch information
JigarJoshi committed Jul 12, 2022
1 parent 6ee8a75 commit dcb7f72
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 15 deletions.
35 changes: 31 additions & 4 deletions server/metadata/tenant.go
Expand Up @@ -192,13 +192,40 @@ func (m *TenantManager) CreateTenant(ctx context.Context, tx transaction.Tx, nam
return namespace, nil
}

func (m *TenantManager) GetNamespace(namespaceName string) Namespace {
func (m *TenantManager) GetNamespace(ctx context.Context, namespaceName string, txMgr *transaction.Manager) (Namespace, error) {
m.RLock()
defer m.RUnlock()
if tenant, found := m.tenants[namespaceName]; found {
return tenant.namespace
m.RUnlock()
return tenant.namespace, nil
}
return nil
m.RUnlock()

m.Lock()
defer m.Unlock()
if tenant, found := m.tenants[namespaceName]; found {
return tenant.namespace, nil
}
// this will never create new namespace
// this is for reading namespaces from storage into cache
tx, err := txMgr.StartTx(ctx)
if err != nil {
return nil, err
}

namespaces, err := m.encoder.GetNamespaces(ctx, tx)
if err != nil {
_ = tx.Rollback(ctx)
log.Warn().Msg("Could not get namespaces")
return nil, err
}
err = tx.Commit(ctx)
if err != nil {
return nil, err
}
if id, ok := namespaces[namespaceName]; ok {
return NewTenantNamespace(namespaceName, id), nil
}
return nil, api.Errorf(api.Code_NOT_FOUND, "Namespace not found")
}

func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transaction.Tx, namespace Namespace) (*Tenant, error) {
Expand Down
17 changes: 9 additions & 8 deletions server/midddleware/request_metadata.go
Expand Up @@ -39,14 +39,15 @@ func GetRequestMetadata(ctx context.Context) (*RequestMetadata, error) {
}

func setAccessToken(ctx context.Context, token *AccessToken) context.Context {
requestMetadata, err := GetRequestMetadata(ctx)
var result = ctx
if err != nil && requestMetadata == nil {
requestMetadata, _ := GetRequestMetadata(ctx)
if requestMetadata == nil {
requestMetadata = &RequestMetadata{}
result = context.WithValue(ctx, RequestMetadataCtxKey{}, requestMetadata)
requestMetadata.accessToken = token
return context.WithValue(ctx, RequestMetadataCtxKey{}, requestMetadata)
} else {
requestMetadata.accessToken = token
return context.WithValue(ctx, RequestMetadataCtxKey{}, requestMetadata)
}
requestMetadata.accessToken = token
return result
}

func setNamespace(ctx context.Context, namespace string) context.Context {
Expand All @@ -61,9 +62,9 @@ func setNamespace(ctx context.Context, namespace string) context.Context {
}
func GetAccessToken(ctx context.Context) (*AccessToken, error) {
// read token
value := ctx.Value(&RequestMetadataCtxKey{})
value := ctx.Value(RequestMetadataCtxKey{})
if value != nil {
if requestMetadata, ok := value.(RequestMetadata); ok {
if requestMetadata, ok := value.(*RequestMetadata); ok {
return requestMetadata.accessToken, nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/midddleware/request_metadata_test.go
Expand Up @@ -26,7 +26,7 @@ func TestRequestMetadata(t *testing.T) {

t.Run("extraction of namespace name", func(t *testing.T) {
ctx := context.TODO()
ctx = context.WithValue(ctx, &RequestMetadataCtxKey{}, RequestMetadata{
ctx = context.WithValue(ctx, RequestMetadataCtxKey{}, &RequestMetadata{
accessToken: &AccessToken{
Namespace: "test-namespace-1",
Sub: "test@tigrisdata.com",
Expand All @@ -41,7 +41,7 @@ func TestRequestMetadata(t *testing.T) {

t.Run("extraction of token", func(t *testing.T) {
ctx := context.TODO()
ctx = context.WithValue(ctx, &RequestMetadataCtxKey{}, RequestMetadata{
ctx = context.WithValue(ctx, RequestMetadataCtxKey{}, &RequestMetadata{
accessToken: &AccessToken{
Namespace: "test-namespace-1",
Sub: "test@tigrisdata.com",
Expand Down
5 changes: 4 additions & 1 deletion server/services/v1/sessions.go
Expand Up @@ -78,10 +78,13 @@ func (sessMgr *SessionManager) Create(ctx context.Context, reloadVerOutside bool
tenant, err = sessMgr.tenantMgr.CreateOrGetTenant(ctx, sessMgr.txMgr, metadata.NewDefaultNamespace())
} else {
// this call will validate if namespace is not already present
namespace := sessMgr.tenantMgr.GetNamespace(namespaceForThisSession)
namespace, err := sessMgr.tenantMgr.GetNamespace(ctx, namespaceForThisSession, sessMgr.txMgr)
if namespace == nil {
return nil, api.Errorf(api.Code_NOT_FOUND, "Namespace not found %s", namespaceForThisSession)
}
if err != nil {
return nil, err
}
tenant, err = sessMgr.tenantMgr.CreateOrGetTenant(ctx, sessMgr.txMgr, namespace)
if err != nil {
return nil, api.Errorf(api.Code_INTERNAL, "Could not create or get tenant from the input namespace")
Expand Down

0 comments on commit dcb7f72

Please sign in to comment.