Skip to content

Commit

Permalink
Modify exec() and discard() in Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Jan 18, 2022
1 parent 9209b83 commit 5e1dfc3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 44 deletions.
3 changes: 2 additions & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ public void resetState() {
// connection.resetState();
if (isInWatch) {
connection.sendCommand(UNWATCH);
connection.getStatusCodeReply();
isInWatch = false;
connection.getStatusCodeReply();
// isInWatch = false;
}
}

Expand Down
20 changes: 14 additions & 6 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,26 @@ protected final void processPipelinedResponses() {

@Override
public final List<Object> exec() {
List<Object> ret = super.exec();
if (jedis != null) {
jedis.resetState();
List<Object> ret;
try {
ret = super.exec();
} finally {
if (jedis != null) {
jedis.resetState();
}
}
return ret;
}

@Override
public final String discard() {
String ret = super.discard();
if (jedis != null) {
jedis.resetState();
String ret;
try {
ret = super.discard();
} finally {
if (jedis != null) {
jedis.resetState();
}
}
return ret;
}
Expand Down
78 changes: 49 additions & 29 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.commands.RedisModulePipelineCommands;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.json.JsonSetParams;
import redis.clients.jedis.json.Path;
Expand All @@ -34,6 +35,7 @@ public abstract class TransactionBase extends Queable implements PipelineCommand
protected final Connection connection;
private final CommandObjects commandObjects;

private boolean broken = false;
private boolean inWatch = false;
private boolean inMulti = false;

Expand Down Expand Up @@ -93,6 +95,9 @@ public final void close() {
}

public final void clear() {
if (broken) {
return;
}
if (inMulti) {
discard();
} else if (inWatch) {
Expand All @@ -103,41 +108,56 @@ public final void clear() {
protected abstract void processPipelinedResponses();

public List<Object> exec() {
if (!inMulti) throw new IllegalStateException("EXEC without MULTI");
// ignore QUEUED or ERROR
// connection.getMany(1 + getPipelinedResponseLength());
processPipelinedResponses();
connection.sendCommand(EXEC);
inMulti = false;
inWatch = false;

List<Object> unformatted = connection.getObjectMultiBulkReply();
if (unformatted == null) {
clean();
return null;
if (!inMulti) {
throw new IllegalStateException("EXEC without MULTI");
}
List<Object> formatted = new ArrayList<>(unformatted.size());
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);

try {
processPipelinedResponses();
connection.sendCommand(EXEC);

List<Object> unformatted = connection.getObjectMultiBulkReply();
if (unformatted == null) {
clean();
return null;
}

List<Object> formatted = new ArrayList<>(unformatted.size());
for (Object o : unformatted) {
try {
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} catch (JedisConnectionException jce) {
broken = true;
throw jce;
} finally {
inMulti = false;
inWatch = false;
clean();
}
return formatted;
}

public String discard() {
if (!inMulti) throw new IllegalStateException("DISCARD without MULTI");
// ignore QUEUED or ERROR
// connection.getMany(1 + getPipelinedResponseLength());
processPipelinedResponses();
connection.sendCommand(DISCARD);
String status = connection.getStatusCodeReply(); // OK
inMulti = false;
inWatch = false;
clean();
return status;
if (!inMulti) {
throw new IllegalStateException("DISCARD without MULTI");
}

try {
processPipelinedResponses();
connection.sendCommand(DISCARD);
return connection.getStatusCodeReply();
} catch (JedisConnectionException jce) {
broken = true;
throw jce;
} finally {
inMulti = false;
inWatch = false;
clean();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static org.junit.Assert.*;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.any;
import static redis.clients.jedis.Protocol.Command.INCR;
import static redis.clients.jedis.Protocol.Command.GET;
import static redis.clients.jedis.Protocol.Command.SET;
Expand All @@ -15,11 +15,12 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import redis.clients.jedis.Connection;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisConnectionException;
Expand Down Expand Up @@ -151,14 +152,44 @@ public void discard() {
assertEquals("OK", status);
}

@Test(expected = JedisConnectionException.class)
public void discardMock() {
Connection mock = Mockito.spy(jedis.getConnection());
Mockito.doThrow(new JedisConnectionException("mock")).when(mock).getMany(anyInt());
Transaction trans = new Jedis(mock).multi();
@Test
public void discardFail() {
Transaction trans = jedis.multi();
trans.set("a", "a");
trans.set("b", "b");

try (MockedStatic<Protocol> protocol = Mockito.mockStatic(Protocol.class)) {
protocol.when(() -> Protocol.read(any())).thenThrow(JedisConnectionException.class);

trans.discard();
fail("Should get mocked JedisConnectionException.");
} catch (JedisConnectionException jce) {
// should be here
} finally {
// close() should pass
trans.close();
}
assertTrue(jedis.isBroken());
}

@Test
public void execFail() {
Transaction trans = jedis.multi();
trans.set("a", "a");
trans.set("b", "b");
trans.discard();

try (MockedStatic<Protocol> protocol = Mockito.mockStatic(Protocol.class)) {
protocol.when(() -> Protocol.read(any())).thenThrow(JedisConnectionException.class);

trans.exec();
fail("Should get mocked JedisConnectionException.");
} catch (JedisConnectionException jce) {
// should be here
} finally {
// close() should pass
trans.close();
}
assertTrue(jedis.isBroken());
}

@Test
Expand Down

0 comments on commit 5e1dfc3

Please sign in to comment.