Skip to content

Commit

Permalink
Initial commit for RW lock
Browse files Browse the repository at this point in the history
  • Loading branch information
the123saurav committed Dec 19, 2021
1 parent 44cb670 commit f22c026
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dev.keva.ioc.annotation.Component;
import dev.keva.protocol.resp.Command;
import dev.keva.store.KevaDatabase;
import dev.keva.store.lock.SpinLock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -71,14 +72,14 @@ public void executeExpire(byte[] key) {
data[0] = "delete".getBytes();
data[1] = key;
Command command = Command.newInstance(data, false);
Lock lock = database.getLock();
lock.lock();
SpinLock lock = database.getLock();
lock.exclusiveLock();
try {
aof.write(command);
database.remove(key);
clearExpiration(key);
} finally {
lock.unlock();
lock.exclusiveUnlock();
}
} else {
database.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dev.keva.core.command.mapping.CommandMapper;
import dev.keva.protocol.resp.Command;
import dev.keva.store.lock.SpinLock;
import dev.keva.util.hashbytes.BytesKey;
import dev.keva.util.hashbytes.BytesValue;
import dev.keva.protocol.resp.reply.MultiBulkReply;
Expand Down Expand Up @@ -42,8 +43,8 @@ public void discard() {
isQueuing = false;
}

public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedException {
txLock.lock();
public Reply<?> exec(ChannelHandlerContext ctx, SpinLock txLock) throws InterruptedException {
txLock.exclusiveLock();
try {
for (val watch : watchMap.entrySet()) {
val key = watch.getKey();
Expand Down Expand Up @@ -74,7 +75,7 @@ public Reply<?> exec(ChannelHandlerContext ctx, Lock txLock) throws InterruptedE

return new MultiBulkReply(replies);
} finally {
txLock.unlock();
txLock.exclusiveUnlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Component
@Slf4j
public class CommandMapper {

private static final Set<String> EXCLUSIVE_COMMANDS = new HashSet<>(Arrays.asList(
"exec", "expire", "expireat", "restore", "flushdb"));

@Getter
private final Map<BytesKey, CommandWrapper> methods = new HashMap<>();

Expand Down Expand Up @@ -98,25 +104,40 @@ public void init() {

try {
val lock = database.getLock();
lock.lock();
boolean locked = false, exclusive = false, writeToAOF = isAoF && isMutate;
try {
if (ctx != null && isAoF && isMutate) {
try {
aof.write(command);
} catch (Exception e) {
log.error("Error writing to AOF", e);
}
}
Object[] objects = new Object[types.length];
command.toArguments(objects, types, ctx);
if (ctx != null) {
locked = true;
if (isMutate || EXCLUSIVE_COMMANDS.contains(name)) {
lock.exclusiveLock();
exclusive = true;
if (writeToAOF) {
try {
aof.write(command);
} catch (Exception e) {
log.error("Error writing to AOF", e);
}
}
} else {
lock.sharedLock();
}
}
// If not in AOF mode, then recycle(),
// else, the command will be recycled in the AOF dump
if (!kevaConfig.getAof()) {
command.recycle();
}
return (Reply<?>) method.invoke(instance, objects);
} finally {
lock.unlock();
if (locked) {
if (exclusive) {
lock.exclusiveUnlock();
} else {
lock.sharedUnlock();
}
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
Expand Down
3 changes: 2 additions & 1 deletion store/src/main/java/dev/keva/store/KevaDatabase.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package dev.keva.store;

import dev.keva.store.lock.SpinLock;
import dev.keva.util.hashbytes.BytesKey;

import java.util.AbstractMap;
import java.util.concurrent.locks.Lock;

public interface KevaDatabase {
Lock getLock();
SpinLock getLock();

void flush();

Expand Down
Loading

0 comments on commit f22c026

Please sign in to comment.