Replication #69

Merged
merged 37 commits on May 22, 2023
Changes from 9 commits

Commits (37)
b7d537a
make KV EnsureActivation return multiple results
aratz-lasa May 13, 2023
3a2d756
make it compile
aratz-lasa May 13, 2023
ea0e94e
rename isBlacklisted to isServerIdBlacklisted
aratz-lasa May 13, 2023
ad05991
change replicasNumber to extraReplicas
aratz-lasa May 14, 2023
8627689
optimize kv storage of activations
aratz-lasa May 14, 2023
1bcc122
add load balancing server state to actor reference
aratz-lasa May 14, 2023
fcbf33f
add tests + fix replication logic
aratz-lasa May 15, 2023
0e635c6
add environment test + fix cache behavior
aratz-lasa May 15, 2023
16b6fe0
add doc to ExtraReplicas
aratz-lasa May 15, 2023
c80750d
change dedupeKey to join blacklist with ','
aratz-lasa May 16, 2023
13440ed
rename isServerIdBlacklisted to isServerIDBlacklisted
aratz-lasa May 16, 2023
d8875c7
make ActorReference a struct
aratz-lasa May 17, 2023
757dff3
update test for actor replication
aratz-lasa May 17, 2023
2e507ad
add json tag to avoid marshalling
aratz-lasa May 17, 2023
363fc0a
fix TestReplicationRandomLoadBalancing
aratz-lasa May 17, 2023
c035f8a
fix stack overflow
aratz-lasa May 17, 2023
f4e472f
limit cache return
aratz-lasa May 17, 2023
c61cce8
update and comment caching logic
aratz-lasa May 17, 2023
dc3f156
refactor code
aratz-lasa May 17, 2023
f3c23c3
add test for activations persistence
aratz-lasa May 19, 2023
86fb820
fix actor retrieval from cache
aratz-lasa May 20, 2023
bcb9af7
merge master
aratz-lasa May 20, 2023
2c50d90
add doc to selection reasoning
aratz-lasa May 20, 2023
25cd9c8
fix actor reference marshalling test
aratz-lasa May 20, 2023
3992c70
add error log to kv get
aratz-lasa May 20, 2023
fdaff25
update require.Never and require.Eventually frequency
aratz-lasa May 20, 2023
d9d3465
update Github test timeout to 10s
aratz-lasa May 20, 2023
89cd444
increase test eventually assertion
aratz-lasa May 20, 2023
daa008f
improve heartbeat simulation
aratz-lasa May 20, 2023
1c5cbc0
reduce testEnsureActivationPersistence timeout to 5
aratz-lasa May 20, 2023
d469651
debug statement
aratz-lasa May 20, 2023
0defd63
refactor testEnsureActivationPersistence
aratz-lasa May 20, 2023
364f6ea
gofmt
aratz-lasa May 20, 2023
2b95233
fix closing of testEnsureActivationPersistence
aratz-lasa May 20, 2023
4475c26
change test timeout to 5s
aratz-lasa May 20, 2023
0b59230
fix selectionReason
aratz-lasa May 20, 2023
4e86fa6
improve doc for testEnsureActivationPersistence
aratz-lasa May 20, 2023
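
Taken together, these commits make the registry's EnsureActivation return multiple activations, let callers request extra replicas, and generalize server blacklisting from a single ID to a list. A minimal caller-side sketch, assuming the option is set the way the diff below reads it — only create.Options.ExtraReplicas and the InvokeActorStream signature are visible in this PR, so everything else here is an assumption:

// Hypothetical usage sketch, not the exact NOLA API.
var create types.CreateIfNotExist
create.Options.ExtraReplicas = 2 // request 1 activation + 2 extra replicas

// env is a virtual environment; on a blacklisted-activation error it retries
// with the offending server IDs excluded (see environment.go below).
stream, err := env.InvokeActorStream(
	ctx, "my-namespace", "my-actor", "my-module", "my-operation", nil, create)
if err != nil {
	// handle error
}
defer stream.Close()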
51 changes: 37 additions & 14 deletions virtual/activation_cache.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -83,16 +84,22 @@ func (a *activationsCache) ensureActivation(
moduleID,
actorID string,

-	blacklistedServerID string,
+	extraReplicas uint64,
+	blacklistedServerIDs []string,
) ([]types.ActorReference, error) {
// Ensure we have a short timeout when communicating with registry.
ctx, cc := context.WithTimeout(ctx, defaultActivationCacheTimeout)
defer cc()

isServerIdBlacklisted := make(map[string]bool)
for _, s := range blacklistedServerIDs {
isServerIdBlacklisted[s] = true
}

if a.c == nil {
// Cache disabled, load directly.
		return a.ensureActivationAndUpdateCache(
-			ctx, namespace, moduleID, actorID, nil, blacklistedServerID)
+			ctx, namespace, moduleID, actorID, extraReplicas, nil, isServerIdBlacklisted, blacklistedServerIDs)
}

var (
@@ -102,18 +109,31 @@ func (a *activationsCache) ensureActivation(
bufIface, cacheKey = actorCacheKeyUnsafePooled(namespace, moduleID, actorID)
aceI, ok := a.c.Get(cacheKey)
bufPool.Put(bufIface)
-	// Cache miss, fill the cache.
-	if !ok ||
+	hasBlacklistedID := false
+	if ok {
+		blacklistedIDs := aceI.(activationCacheEntry).blacklistedServerIDs
+		for _, id := range blacklistedIDs {
+			if isServerIdBlacklisted[id] {
+				hasBlacklistedID = true
+				break
+			}
+		}
+	}
+
+	// Cache miss or not enough replicas, fill the cache.
+	if !ok || (1+extraReplicas) > uint64(len(aceI.(activationCacheEntry).references)) ||
		// There is an existing cache entry, however, it was satisfied by a request that did not provide
		// the same blacklistedServerID we have currently. We must ignore this entry because it could be
		// stale and end up routing us back to the blacklisted server ID.
-		(blacklistedServerID != "" && aceI.(activationCacheEntry).blacklistedServerID != blacklistedServerID) {
+		hasBlacklistedID {
		var cachedReferences []types.ActorReference
		if ok {
			cachedReferences = aceI.(activationCacheEntry).references
		}
		return a.ensureActivationAndUpdateCache(
-			ctx, namespace, moduleID, actorID, cachedReferences, blacklistedServerID)
+			ctx, namespace, moduleID, actorID, extraReplicas, cachedReferences, isServerIdBlacklisted, blacklistedServerIDs)
	}

// Cache hit, return result from cache but check if we should proactively refresh
@@ -126,7 +146,7 @@ func (a *activationsCache) ensureActivation(
go func() {
defer cc()
_, err := a.ensureActivationAndUpdateCache(
-				ctx, namespace, moduleID, actorID, ace.references, blacklistedServerID)
+				ctx, namespace, moduleID, actorID, extraReplicas, ace.references, isServerIdBlacklisted, blacklistedServerIDs)
if err != nil {
a.logger.Error(
"error refreshing activation cache in background",
@@ -156,8 +176,10 @@ func (a *activationsCache) ensureActivationAndUpdateCache(
moduleID,
actorID string,

+	extraReplicas uint64,
	cachedReferences []types.ActorReference,
-	blacklistedServerID string,
+	isServerIdBlacklisted map[string]bool,
+	blacklistedServerIDs []string,
) ([]types.ActorReference, error) {
// Since this method is less common (cache miss) we just allocate instead of messing
// around with unsafe object pooling.
@@ -166,7 +188,7 @@ func (a *activationsCache) ensureActivationAndUpdateCache(
// Include blacklistedServerID in the dedupeKey so that "force refreshes" due to a
// server blacklist / load-shedding an actor can be initiated *after* a regular
// refresh has already started, but *before* it has completed.
-	dedupeKey := fmt.Sprintf("%s::%s", cacheKey, blacklistedServerID)
+	dedupeKey := fmt.Sprintf("%s::%s", cacheKey, strings.Join(blacklistedServerIDs, ":"))
referencesI, err, _ := a.deduper.Do(dedupeKey, func() (any, error) {
var cachedServerIDs []string
for _, ref := range cachedReferences {
@@ -185,7 +207,8 @@ func (a *activationsCache) ensureActivationAndUpdateCache(
ModuleID: moduleID,
ActorID: actorID,

-			BlacklistedServerID: blacklistedServerID,
+			ExtraReplicas: extraReplicas,
+			BlacklistedServerIDs: blacklistedServerIDs,
CachedActivationServerIDs: cachedServerIDs,
})
// Release the semaphore as soon as we're done with the network call since the purpose
@@ -209,10 +232,10 @@ func (a *activationsCache) ensureActivationAndUpdateCache(
}

for _, ref := range references.References {
-		if ref.ServerID() == blacklistedServerID {
+		if isServerIdBlacklisted[ref.ServerID()] {
			return nil, fmt.Errorf(
				"[invariant violated] registry returned blacklisted server ID: %s in references",
-				blacklistedServerID)
+				blacklistedServerIDs)
}
}

Expand All @@ -225,7 +248,7 @@ func (a *activationsCache) ensureActivationAndUpdateCache(
references: references.References,
cachedAt: time.Now(),
registryVersionStamp: references.VersionStamp,
-		blacklistedServerID: blacklistedServerID,
+		blacklistedServerIDs: blacklistedServerIDs,
}

// a.c is internally synchronized, but we use a lock here so we can do an atomic
Expand Down Expand Up @@ -264,5 +287,5 @@ type activationCacheEntry struct {
references []types.ActorReference
cachedAt time.Time
registryVersionStamp int64
-	blacklistedServerID string
+	blacklistedServerIDs []string
}
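
Stepping back, the new cache policy above boils down to one predicate: a cached entry may be served only if it exists, holds at least 1 + extraReplicas references, and was not fetched under a blacklist that overlaps the current request's. A self-contained sketch of that predicate, assuming the simplified stand-in types below for NOLA's activationCacheEntry:

package main

import "fmt"

// cacheEntry is a stand-in for activationCacheEntry (assumed shape).
type cacheEntry struct {
	references           []string // server IDs holding an activation
	blacklistedServerIDs []string // blacklist the entry was fetched with
}

// usable is the negation of the refill condition in ensureActivation: serve
// the cached entry only if it exists, holds at least 1+extraReplicas
// references, and its recorded blacklist does not overlap the current one.
func usable(e *cacheEntry, extraReplicas uint64, blacklisted map[string]bool) bool {
	if e == nil {
		return false // cache miss
	}
	if uint64(len(e.references)) < 1+extraReplicas {
		return false // not enough replicas cached
	}
	for _, id := range e.blacklistedServerIDs {
		if blacklisted[id] {
			return false // fetched under a different blacklist; may be stale
		}
	}
	return true
}

func main() {
	e := &cacheEntry{references: []string{"s1", "s2"}}
	fmt.Println(usable(e, 1, map[string]bool{"s3": true})) // true: 2 refs, no overlap
	fmt.Println(usable(e, 2, nil))                         // false: needs 3 refs
}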
6 changes: 3 additions & 3 deletions virtual/activations.go
@@ -118,7 +118,7 @@ func (a *activations) invoke(
invokePayload []byte,
isTimer bool,
) (io.ReadCloser, error) {
-	if err := a.isBlacklisted(reference); err != nil {
+	if err := a.isServerIdBlacklisted(reference); err != nil {
return nil, err
}

@@ -607,7 +607,7 @@ func (a *activations) close(ctx context.Context, numWorkers int) error {
return nil
}

-func (a *activations) isBlacklisted(
+func (a *activations) isServerIdBlacklisted(
reference types.ActorReferenceVirtual,
) error {
bufIface, cacheKey := actorCacheKeyUnsafePooled(
@@ -619,7 +619,7 @@ func (a *activations) isBlacklisted(
err := fmt.Errorf(
"actor %s is blacklisted on this server", reference.ActorID())
serverID, _ := a.getServerState()
-		return NewBlacklistedActivationError(err, serverID)
+		return NewBlacklistedActivationError(err, []string{serverID})
}

return nil
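
The call above now passes a slice of server IDs. A hedged sketch of the error type this implies — only the constructor name and the ServerIDs() accessor used in environment.go are visible in this PR, so the field names and bodies below are assumptions, not NOLA's actual definition:

package virtual

// BlacklistedActivationErr is an assumed shape; environment.go type-asserts
// to this value type and calls ServerIDs() on it.
type BlacklistedActivationErr struct {
	wrapped   error
	serverIDs []string
}

func NewBlacklistedActivationError(err error, serverIDs []string) error {
	return BlacklistedActivationErr{wrapped: err, serverIDs: serverIDs}
}

func (b BlacklistedActivationErr) Error() string { return b.wrapped.Error() }

// ServerIDs reports every server that load-shed the actor, so a retry can
// blacklist all of them at once.
func (b BlacklistedActivationErr) ServerIDs() []string { return b.serverIDs }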
4 changes: 2 additions & 2 deletions virtual/client.go
@@ -45,7 +45,7 @@ func (h *httpClient) InvokeActorRemote(

req, err := http.NewRequestWithContext(
ctx, "POST",
fmt.Sprintf("http://%s/api/v1/invoke-actor-direct", reference.Address()),
fmt.Sprintf("http://%s/api/v1/invoke-actor-direct", reference.ServerState().Address()),
bytes.NewReader(marshaled))
if err != nil {
return nil, fmt.Errorf("HTTPClient: InvokeDirect: error constructing request: %w", err)
@@ -69,7 +69,7 @@ func (h *httpClient) InvokeActorRemote(
// in statusCodeToErrorWrapper will be converted back to the proper in memory
// error type if sent by a server to a client.
if wrapper, ok := statusCodeToErrorWrapper[resp.StatusCode]; ok {
-		err = wrapper(err, reference.ServerID())
+		err = wrapper(err, []string{reference.ServerID()})
}
return nil, err
}
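
Address() moves behind ServerState() here: per the "make ActorReference a struct" and "add load balancing server state to actor reference" commits, the reference now carries per-server load-balancing state. A minimal sketch of that shape — only the method names ServerID(), ServerState(), Address(), and UsedMemory() appear in this diff; everything else is an assumption:

package types

// ServerState is an assumed interface; only Address and UsedMemory are
// grounded in this PR's diff.
type ServerState interface {
	Address() string // host:port used for remote invocations
	UsedMemory() int // load signal consumed by pickServerForInvocation
}

// ActorReference is now a struct; field names below are illustrative.
type ActorReference struct {
	serverID    string
	serverState ServerState
	// ...actor identity fields (namespace, module ID, actor ID)...
}

func (a ActorReference) ServerID() string         { return a.serverID }
func (a ActorReference) ServerState() ServerState { return a.serverState }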
33 changes: 21 additions & 12 deletions virtual/environment.go
@@ -9,6 +9,7 @@ import (
"io/ioutil"
"net"
"runtime"
"sort"
"sync"
"time"

@@ -411,28 +412,28 @@ func (r *environment) InvokeActorStream(
create types.CreateIfNotExist,
) (io.ReadCloser, error) {
resp, err := r.invokeActorStreamHelper(
-		ctx, namespace, actorID, moduleID, operation, payload, create, "")
+		ctx, namespace, actorID, moduleID, operation, payload, create, []string{})
if err == nil {
return resp, nil
}

-	if IsBlacklistedActivationError(err) {
+	if isServerIdBlacklistedActivationError(err) {
// If we received an error because the target server has blacklisted activations
// of this actor, then we'll invalidate our cache to force the subsequent call
// to lookup the actor's new activation location in the registry. We'll also set
// a flag on the call to the registry to indicate that the server has blacklisted
		// that actor to ensure we get an activation on a different server since the registry
// may not know about the blacklist yet.
r.activationsCache.delete(namespace, moduleID, actorID)
-		blacklistedServerID := err.(BlacklistedActivationErr).ServerID()
+		blacklistedServerIDs := err.(BlacklistedActivationErr).ServerIDs()

r.log.Warn(
"encountered blacklisted actor, forcing activation cache refresh and retrying",
slog.String("actor_id", fmt.Sprintf("%s::%s::%s", namespace, moduleID, actorID)),
slog.String("blacklisted_server_id", blacklistedServerID))
slog.Any("blacklisted_server_ids", blacklistedServerIDs))

return r.invokeActorStreamHelper(
-			ctx, namespace, actorID, moduleID, operation, payload, create, blacklistedServerID)
+			ctx, namespace, actorID, moduleID, operation, payload, create, blacklistedServerIDs)
}

return nil, err
@@ -446,7 +447,7 @@ func (r *environment) invokeActorStreamHelper(
operation string,
payload []byte,
create types.CreateIfNotExist,
-	blacklistedServerID string,
+	blacklistedServerID []string,
) (io.ReadCloser, error) {
if r.isClosed() {
return nil, ErrEnvironmentClosed
@@ -468,8 +469,8 @@
}

references, err := r.activationsCache.ensureActivation(
-		ctx, namespace, moduleID, actorID, blacklistedServerID)
-	if err != nil {
+		ctx, namespace, moduleID, actorID, create.Options.ExtraReplicas, blacklistedServerID)
+	if err != nil {
return nil, fmt.Errorf("error ensuring actor activation: %w", err)
}
if len(references) == 0 {
@@ -764,14 +765,13 @@ func (r *environment) invokeReferences(
payload []byte,
create types.CreateIfNotExist,
) (io.ReadCloser, error) {
-	// TODO: Load balancing or some other strategy if the number of references is > 1?
-	ref := references[0]
+	ref := pickServerForInvocation(references, create)
if !r.opts.ForceRemoteProcedureCalls {
// First check the global localEnvironmentsRouter map for scenarios where we're
// potentially trying to communicate between multiple different in-memory
// instances of Environment.
localEnvironmentsRouterLock.RLock()
-		localEnv, ok := localEnvironmentsRouter[ref.Address()]
+		localEnv, ok := localEnvironmentsRouter[ref.ServerState().Address()]
localEnvironmentsRouterLock.RUnlock()
if ok {
return localEnv.InvokeActorDirectStream(
@@ -795,7 +795,7 @@
// always return dnsregistry.Localhost as the address for all actor references and
// thus ensure that tests can be written without having to also ensure that a NOLA
// server is running on the appropriate port, among other things.
-		if ref.Address() == Localhost || ref.Address() == dnsregistry.Localhost {
+		if ref.ServerState().Address() == Localhost || ref.ServerState().Address() == dnsregistry.Localhost {
return localEnv.InvokeActorDirectStream(
ctx, versionStamp, ref.ServerID(), ref.ServerVersion(), ref,
operation, payload, create)
@@ -899,3 +899,12 @@ func formatActorCacheKey(
dst = append(dst, []byte(actorID)...)
return dst
}

func pickServerForInvocation(references []types.ActorReference, create types.CreateIfNotExist) types.ActorReference {
	// TODO: implement invocation strategies in 'create' e.g. memory-balanced, cpu-balanced, multi-invoke...
	sort.Slice(references, func(i, j int) bool {
		return references[i].ServerState().UsedMemory() < references[j].ServerState().UsedMemory()
	})

return references[0]
}
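
pickServerForInvocation currently implements a single strategy: sort the replica references by used memory and invoke on the least-loaded server. A runnable illustration with stand-in types (the real types.ActorReference and ServerState live elsewhere in NOLA; the shapes below are simplified assumptions):

package main

import (
	"fmt"
	"sort"
)

// replica stands in for a types.ActorReference plus its ServerState.
type replica struct {
	address    string
	usedMemory int
}

// pickLeastLoaded mirrors pickServerForInvocation: ascending sort on used
// memory, then take the first element.
func pickLeastLoaded(replicas []replica) replica {
	sort.Slice(replicas, func(i, j int) bool {
		return replicas[i].usedMemory < replicas[j].usedMemory
	})
	return replicas[0]
}

func main() {
	replicas := []replica{
		{address: "10.0.0.2:9090", usedMemory: 512},
		{address: "10.0.0.1:9090", usedMemory: 128},
	}
	// The invocation is routed to the replica with the most free headroom.
	fmt.Println(pickLeastLoaded(replicas).address) // 10.0.0.1:9090
}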