Skip to content

Commit

Permalink
fix(redis): mitigate JedisPool depletion (#419)
Browse files Browse the repository at this point in the history
* chore(dependencies): gradle 5.4.1

* fix(redis): mitigate JedisPool depletion

Fixes redis interactions in RedisPermissionRepository that can result in
depleting the redis pool.

Specifically:
* converts smembers calls to sscan calls, and gets/returns an object
  from the pool to do so.
* removes nested calls to redisClientDelegate that were holding on
  to pool objects
* partitions lookup of users into smaller batches
* partitions role lookup

Additionally, uses InstrumentedJedisPool to enable metrics collection
on redis interactions.
  • Loading branch information
cfieber committed Jun 6, 2019
1 parent dbcd062 commit 1d0ef72
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.netflix.spinnaker.fiat.config;

import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.jedis.JedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.telemetry.InstrumentedJedisPool;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -31,8 +33,9 @@ public GenericObjectPoolConfig redisPoolConfig() {
public JedisPool jedisPool(
@Value("${redis.connection:redis://localhost:6379}") String connection,
@Value("${redis.timeout:2000}") int timeout,
GenericObjectPoolConfig redisPoolConfig) {
return createPool(redisPoolConfig, connection, timeout);
GenericObjectPoolConfig redisPoolConfig,
Registry registry) {
return createPool(redisPoolConfig, connection, timeout, registry);
}

@Bean
Expand All @@ -41,7 +44,7 @@ RedisClientDelegate redisClientDelegate(JedisPool jedisPool) {
}

private static JedisPool createPool(
GenericObjectPoolConfig redisPoolConfig, String connection, int timeout) {
GenericObjectPoolConfig redisPoolConfig, String connection, int timeout, Registry registry) {
URI redisConnection = URI.create(connection);

String host = redisConnection.getHost();
Expand All @@ -62,6 +65,9 @@ private static JedisPool createPool(
redisPoolConfig = new GenericObjectPoolConfig();
}

return new JedisPool(redisPoolConfig, host, port, timeout, password, database, null);
return new InstrumentedJedisPool(
registry,
new JedisPool(redisPoolConfig, host, port, timeout, password, database, null),
"fiat");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@
import com.fasterxml.jackson.databind.util.ArrayIterator;
import com.google.common.collect.ArrayTable;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import com.netflix.spinnaker.fiat.config.UnrestrictedResourceConfig;
import com.netflix.spinnaker.fiat.model.UserPermission;
import com.netflix.spinnaker.fiat.model.resources.Resource;
import com.netflix.spinnaker.fiat.model.resources.ResourceType;
import com.netflix.spinnaker.fiat.model.resources.Role;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
Expand All @@ -42,6 +40,8 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Response;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

/**
* This Redis-backed permission repository is structured in a way to optimized reading types of
Expand Down Expand Up @@ -216,40 +216,32 @@ public Map<String, UserPermission> getAllByRoles(List<String> anyRoles) {
return new HashMap<>();
}

return redisClientDelegate.withMultiKeyPipeline(
p -> {
List<Response<Set<String>>> responses =
anyRoles.stream().map(role -> p.smembers(roleKey(role))).collect(Collectors.toList());
p.sync();

Set<String> dedupedUsernames =
responses.stream()
.flatMap(response -> response.get().stream())
.collect(Collectors.toSet());
dedupedUsernames.add(UNRESTRICTED);

Table<String, ResourceType, Response<Map<String, String>>> responseTable =
getAllFromRedis(dedupedUsernames);
if (responseTable == null) {
return new HashMap<>(0);
}

RawUserPermission rawUnrestricted =
new RawUserPermission(responseTable.row(UNRESTRICTED));
UserPermission unrestrictedUser = getUserPermission(UNRESTRICTED, rawUnrestricted);
Set<String> adminSet = getAllAdmins();

return dedupedUsernames.stream()
.map(
userId -> {
RawUserPermission rawUser = new RawUserPermission(responseTable.row(userId));
rawUser.isAdmin = adminSet.contains(userId);
return getUserPermission(userId, rawUser);
})
.collect(
Collectors.toMap(
UserPermission::getId, permission -> permission.merge(unrestrictedUser)));
});
final Set<String> dedupedUsernames = new HashSet<>();
for (String role : new HashSet<>(anyRoles)) {
dedupedUsernames.addAll(scanSet(roleKey(role)));
}
dedupedUsernames.add(UNRESTRICTED);

Table<String, ResourceType, Response<Map<String, String>>> responseTable =
getAllFromRedis(dedupedUsernames);
if (responseTable == null) {
return new HashMap<>(0);
}

RawUserPermission rawUnrestricted = new RawUserPermission(responseTable.row(UNRESTRICTED));
UserPermission unrestrictedUser = getUserPermission(UNRESTRICTED, rawUnrestricted);
Set<String> adminSet = getAllAdmins();

return dedupedUsernames.stream()
.map(
userId -> {
RawUserPermission rawUser = new RawUserPermission(responseTable.row(userId));
rawUser.isAdmin = adminSet.contains(userId);
return getUserPermission(userId, rawUser);
})
.collect(
Collectors.toMap(
UserPermission::getId, permission -> permission.merge(unrestrictedUser)));
}

private UserPermission getUserPermission(String userId, RawUserPermission raw) {
Expand All @@ -269,10 +261,8 @@ private UserPermission getUserPermission(String userId, RawUserPermission raw) {

private Table<String, ResourceType, Response<Map<String, String>>> getAllFromRedis() {
try {
return redisClientDelegate.withCommandsClient(
jedis -> {
return getAllFromRedis(jedis.smembers(allUsersKey()));
});
Set<String> allUsers = scanSet(allUsersKey());
return getAllFromRedis(allUsers);
} catch (Exception e) {
log.error("Storage exception reading all entries.", e);
return null;
Expand All @@ -286,19 +276,20 @@ private Table<String, ResourceType, Response<Map<String, String>>> getAllFromRed
}

try {
return redisClientDelegate.withMultiKeyPipeline(
p -> {
Table<String, ResourceType, Response<Map<String, String>>> responseTable =
ArrayTable.create(userIds, new ArrayIterator<>(ResourceType.values()));

for (String userId : userIds) {
for (ResourceType r : ResourceType.values()) {
responseTable.put(userId, r, p.hgetAll(userKey(userId, r)));
final Table<String, ResourceType, Response<Map<String, String>>> responseTable =
ArrayTable.create(userIds, new ArrayIterator<>(ResourceType.values()));
for (List<String> userIdSubset : Lists.partition(new ArrayList<>(userIds), 10)) {
redisClientDelegate.withMultiKeyPipeline(
p -> {
for (String userId : userIdSubset) {
for (ResourceType r : ResourceType.values()) {
responseTable.put(userId, r, p.hgetAll(userKey(userId, r)));
}
}
}
p.sync();
return responseTable;
});
p.sync();
});
}
return responseTable;
} catch (Exception e) {
log.error("Storage exception reading all entries.", e);
}
Expand All @@ -308,34 +299,47 @@ private Table<String, ResourceType, Response<Map<String, String>>> getAllFromRed
@Override
public void remove(@NonNull String id) {
try {
redisClientDelegate.withCommandsClient(
jedis -> {
Map<String, String> userRolesById = jedis.hgetAll(userKey(id, ResourceType.ROLE));

redisClientDelegate.withMultiKeyPipeline(
p -> {
p.srem(allUsersKey(), id);
for (String roleName : userRolesById.keySet()) {
p.srem(roleKey(roleName), id);
}

for (ResourceType r : ResourceType.values()) {
p.del(userKey(id, r));
}
p.srem(adminKey(), id);
p.sync();
});
Map<String, String> userRolesById =
redisClientDelegate.withCommandsClient(
jedis -> {
return jedis.hgetAll(userKey(id, ResourceType.ROLE));
});

redisClientDelegate.withMultiKeyPipeline(
p -> {
p.srem(allUsersKey(), id);
for (String roleName : userRolesById.keySet()) {
p.srem(roleKey(roleName), id);
}

for (ResourceType r : ResourceType.values()) {
p.del(userKey(id, r));
}
p.srem(adminKey(), id);
p.sync();
});
} catch (Exception e) {
log.error("Storage exception reading " + id + " entry.", e);
}
}

private Set<String> scanSet(String key) {
final Set<String> results = new HashSet<>();
final AtomicReference<String> cursor = new AtomicReference<>(ScanParams.SCAN_POINTER_START);
do {
final ScanResult<String> result =
redisClientDelegate.withCommandsClient(
jedis -> {
return jedis.sscan(key, cursor.get());
});
results.addAll(result.getResult());
cursor.set(result.getStringCursor());
} while (!"0".equals(cursor.get()));
return results;
}

private Set<String> getAllAdmins() {
return redisClientDelegate.withCommandsClient(
jedis -> {
return jedis.smembers(adminKey());
});
return scanSet(adminKey());
}

private String allUsersKey() {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

0 comments on commit 1d0ef72

Please sign in to comment.