Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locks improvement: RWLock #85

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/dev/keva/core/aof/AOFContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public List<Command> read() throws IOException {
byte[][] objects = (byte[][]) input.readObject();
commands.add(Command.newInstance(objects, false));
} catch (EOFException e) {
log.error("Error while reading AOF command", e);
fis.close();
return commands;
} catch (ClassNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dev.keva.ioc.annotation.Component;
import dev.keva.protocol.resp.Command;
import dev.keva.store.KevaDatabase;
import dev.keva.store.lock.SpinLock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -71,14 +72,14 @@ public void executeExpire(byte[] key) {
data[0] = "delete".getBytes();
data[1] = key;
Command command = Command.newInstance(data, false);
Lock lock = database.getLock();
lock.lock();
SpinLock lock = database.getLock();
lock.exclusiveLock();
try {
aof.write(command);
database.remove(key);
clearExpiration(key);
} finally {
lock.unlock();
lock.exclusiveUnlock();
}
} else {
database.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dev.keva.core.command.mapping.CommandMapper;
import dev.keva.protocol.resp.Command;
import dev.keva.store.lock.SpinLock;
import dev.keva.util.hashbytes.BytesKey;
import dev.keva.util.hashbytes.BytesValue;
import dev.keva.protocol.resp.reply.MultiBulkReply;
Expand Down Expand Up @@ -42,8 +43,8 @@ public void discard() {
isQueuing = false;
}

public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedException {
txLock.lock();
public Reply<?> exec(ChannelHandlerContext ctx, SpinLock txLock) throws InterruptedException {
txLock.exclusiveLock();
try {
for (val watch : watchMap.entrySet()) {
val key = watch.getKey();
Expand Down Expand Up @@ -74,7 +75,7 @@ public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedE

return new MultiBulkReply(replies);
} finally {
txLock.unlock();
txLock.exclusiveUnlock();
}
}
}
123 changes: 123 additions & 0 deletions core/src/main/java/dev/keva/core/command/impl/zset/ZAdd.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package dev.keva.core.command.impl.zset;

import dev.keva.core.command.annotation.CommandImpl;
import dev.keva.core.command.annotation.Execute;
import dev.keva.core.command.annotation.Mutate;
import dev.keva.core.command.annotation.ParamLength;
import dev.keva.ioc.annotation.Autowired;
import dev.keva.ioc.annotation.Component;
import dev.keva.protocol.resp.reply.BulkReply;
import dev.keva.protocol.resp.reply.ErrorReply;
import dev.keva.protocol.resp.reply.IntegerReply;
import dev.keva.protocol.resp.reply.Reply;
import dev.keva.store.KevaDatabase;
import dev.keva.util.DoubleUtil;
import dev.keva.util.hashbytes.BytesKey;

import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;

import static dev.keva.util.Constants.FLAG_CH;
import static dev.keva.util.Constants.FLAG_GT;
import static dev.keva.util.Constants.FLAG_INCR;
import static dev.keva.util.Constants.FLAG_LT;
import static dev.keva.util.Constants.FLAG_NX;
import static dev.keva.util.Constants.FLAG_XX;

@Component
@CommandImpl("zadd")
@ParamLength(type = ParamLength.Type.AT_LEAST, value = 3)
@Mutate
public final class ZAdd {
axblueblader marked this conversation as resolved.
Show resolved Hide resolved
private static final String XX = "xx";
private static final String NX = "nx";
private static final String GT = "gt";
private static final String LT = "lt";
private static final String INCR = "incr";
private static final String CH = "ch";

private final KevaDatabase database;

@Autowired
public ZAdd(KevaDatabase database) {
this.database = database;
}

@Execute
public Reply<?> execute(byte[][] params) {
// Parse the flags, if any
boolean xx = false, nx = false, gt = false, lt = false, incr = false;
int argPos = 1, flags = 0;
String arg;
while (argPos < params.length) {
arg = new String(params[argPos], StandardCharsets.UTF_8);
if (XX.equalsIgnoreCase(arg)) {
xx = true;
flags |= FLAG_XX;
} else if (NX.equalsIgnoreCase(arg)) {
nx = true;
flags |= FLAG_NX;
} else if (GT.equalsIgnoreCase(arg)) {
gt = true;
flags |= FLAG_GT;
} else if (LT.equalsIgnoreCase(arg)) {
lt = true;
flags |= FLAG_LT;
} else if (INCR.equalsIgnoreCase(arg)) {
incr = true;
flags |= FLAG_INCR;
} else if (CH.equalsIgnoreCase(arg)) {
flags |= FLAG_CH;
} else {
break;
}
++argPos;
}

int numMembers = params.length - argPos;
if (numMembers % 2 != 0) {
return ErrorReply.SYNTAX_ERROR;
}
numMembers /= 2;

if (nx && xx) {
return ErrorReply.ZADD_NX_XX_ERROR;
}
if ((gt && nx) || (lt && nx) || (gt && lt)) {
return ErrorReply.ZADD_GT_LT_NX_ERROR;
}
if (incr && numMembers > 1) {
return ErrorReply.ZADD_INCR_ERROR;
}

// Parse the key and value
final SimpleEntry<Double, BytesKey>[] members = new SimpleEntry[numMembers];
double score;
String rawScore;
for (int memberIndex = 0; memberIndex < numMembers; ++memberIndex) {
try {
rawScore = new String(params[argPos++], StandardCharsets.UTF_8);
if (rawScore.equalsIgnoreCase("inf") || rawScore.equalsIgnoreCase("infinity")
|| rawScore.equalsIgnoreCase("+inf") || rawScore.equalsIgnoreCase("+infinity")
) {
score = Double.POSITIVE_INFINITY;
} else if (rawScore.equalsIgnoreCase("-inf") || rawScore.equalsIgnoreCase("-infinity")) {
score = Double.NEGATIVE_INFINITY;
} else {
score = Double.parseDouble(rawScore);
}
} catch (final NumberFormatException ignored) {
// return on first bad input
return ErrorReply.ZADD_SCORE_FLOAT_ERROR;
}
members[memberIndex] = new SimpleEntry<>(score, new BytesKey(params[argPos++]));
}

if (incr) {
Double result = database.zincrby(params[0], members[0].getKey(), members[0].getValue(), flags);
return result == null ? BulkReply.NIL_REPLY : new BulkReply(DoubleUtil.toString(result));
}
int result = database.zadd(params[0], members, flags);
return new IntegerReply(result);
}
}
36 changes: 36 additions & 0 deletions core/src/main/java/dev/keva/core/command/impl/zset/ZScore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.keva.core.command.impl.zset;

import dev.keva.core.command.annotation.CommandImpl;
import dev.keva.core.command.annotation.Execute;
import dev.keva.core.command.annotation.ParamLength;
import dev.keva.ioc.annotation.Autowired;
import dev.keva.ioc.annotation.Component;
import dev.keva.protocol.resp.reply.BulkReply;
import dev.keva.store.KevaDatabase;

@Component
@CommandImpl("zscore")
@ParamLength(type = ParamLength.Type.EXACT, value = 2)
public final class ZScore {
private final KevaDatabase database;

@Autowired
public ZScore(KevaDatabase database) {
this.database = database;
}

@Execute
public BulkReply execute(byte[] key, byte[] member) {
final Double result = database.zscore(key, member);
if(result == null){
return BulkReply.NIL_REPLY;
}
if (result.equals(Double.POSITIVE_INFINITY)) {
return BulkReply.POSITIVE_INFINITY_REPLY;
}
if (result.equals(Double.NEGATIVE_INFINITY)) {
return BulkReply.NEGATIVE_INFINITY_REPLY;
}
return new BulkReply(result.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Component
@Slf4j
public class CommandMapper {

private static final Set<String> EXCLUSIVE_COMMANDS = new HashSet<>(Arrays.asList(
tuhuynh27 marked this conversation as resolved.
Show resolved Hide resolved
"exec", "expire", "expireat", "restore", "flushdb"));

@Getter
private final Map<BytesKey, CommandWrapper> methods = new HashMap<>();

Expand Down Expand Up @@ -98,25 +104,40 @@ public void init() {

try {
val lock = database.getLock();
lock.lock();
boolean locked = false, exclusive = false, writeToAOF = isAoF && isMutate;
try {
if (ctx != null && isAoF && isMutate) {
try {
aof.write(command);
} catch (Exception e) {
log.error("Error writing to AOF", e);
}
}
Object[] objects = new Object[types.length];
command.toArguments(objects, types, ctx);
if (ctx != null) {
locked = true;
if (isMutate || EXCLUSIVE_COMMANDS.contains(name)) {
lock.exclusiveLock();
exclusive = true;
if (writeToAOF) {
try {
aof.write(command);
} catch (Exception e) {
log.error("Error writing to AOF", e);
}
}
} else {
lock.sharedLock();
}
}
// If not in AOF mode, then recycle(),
// else, the command will be recycled in the AOF dump
if (!kevaConfig.getAof()) {
command.recycle();
}
return (Reply<?>) method.invoke(instance, objects);
} finally {
lock.unlock();
if (locked) {
if (exclusive) {
lock.exclusiveUnlock();
} else {
lock.sharedUnlock();
}
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/dev/keva/core/server/AOFTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Server startServer(int port) throws Exception {
.persistence(false)
.aof(true)
.aofInterval(1000)
.workDirectory("./")
.workDirectory(System.getProperty("java.io.tmpdir"))
.build();
val server = KevaServer.of(config);
new Thread(() -> {
Expand Down
Loading