Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add database and some other commands in pipeline #2832

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 71 additions & 2 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.json.JSONArray;

import redis.clients.jedis.args.*;
import redis.clients.jedis.commands.DatabasePipelineCommands;
import redis.clients.jedis.commands.PipelineBinaryCommands;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -25,8 +26,8 @@
import redis.clients.jedis.search.aggr.AggregationBuilder;
import redis.clients.jedis.search.aggr.AggregationResult;

public class Pipeline extends Queable implements PipelineCommands, PipelineBinaryCommands,
RedisModulePipelineCommands, Closeable {
public class Pipeline extends Queable implements PipelineCommands, PipelineBinaryCommands,
DatabasePipelineCommands, RedisModulePipelineCommands, Closeable {

protected final Connection connection;
// private final Jedis jedis;
Expand Down Expand Up @@ -3330,6 +3331,74 @@ public Response<Long> waitReplicas(int replicas, long timeout) {
return appendCommand(commandObjects.waitReplicas(replicas, timeout));
}

public Response<List<String>> time() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.TIME), BuilderFactory.STRING_LIST));
}

@Override
public Response<String> select(final int index) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.SELECT), BuilderFactory.STRING));
}

@Override
public Response<Long> dbSize() {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.DBSIZE), BuilderFactory.LONG));
}

@Override
public Response<String> swapDB(final int index1, final int index2) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.SWAPDB)
.add(index1).add(index2), BuilderFactory.STRING));
}

@Override
public Response<Long> move(String key, int dbIndex) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MOVE)
.key(key).add(dbIndex), BuilderFactory.LONG));
}

@Override
public Response<Long> move(final byte[] key, final int dbIndex) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MOVE)
.key(key).add(dbIndex), BuilderFactory.LONG));
}

@Override
public Response<Boolean> copy(String srcKey, String dstKey, int db, boolean replace) {
return appendCommand(commandObjects.copy(srcKey, dstKey, db, replace));
}

@Override
public Response<Boolean> copy(byte[] srcKey, byte[] dstKey, int db, boolean replace) {
return appendCommand(commandObjects.copy(srcKey, dstKey, db, replace));
}

@Override
public Response<String> migrate(String host, int port, byte[] key, int destinationDB, int timeout) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).key(key).add(destinationDB).add(timeout), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, String key, int destinationDB, int timeout) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).key(key).add(destinationDB).add(timeout), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, byte[]... keys) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).add(new byte[0]).add(destinationDB).add(timeout).addParams(params)
.add(Protocol.Keyword.KEYS).keys((Object[]) keys), BuilderFactory.STRING));
}

@Override
public Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, String... keys) {
return appendCommand(new CommandObject<>(commandObjects.commandArguments(Protocol.Command.MIGRATE)
.add(host).add(port).add(new byte[0]).add(destinationDB).add(timeout).addParams(params)
.add(Protocol.Keyword.KEYS).keys((Object[]) keys), BuilderFactory.STRING));
}

public Response<Object> sendCommand(ProtocolCommand cmd, String... args) {
return sendCommand(new CommandArguments(cmd).addObjects((Object[]) args));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis.clients.jedis.commands;

import redis.clients.jedis.Response;
import redis.clients.jedis.params.MigrateParams;

public interface DatabasePipelineCommands {

Response<String> select(int index);

Response<Long> dbSize();

Response<String> swapDB(int index1, int index2);

Response<Long> move(String key, int dbIndex);

Response<Long> move(byte[] key, int dbIndex);

Response<Boolean> copy(String srcKey, String dstKey, int db, boolean replace);

Response<Boolean> copy(byte[] srcKey, byte[] dstKey, int db, boolean replace);

Response<String> migrate(String host, int port, byte[] key, int destinationDB, int timeout);

Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, byte[]... keys);

Response<String> migrate(String host, int port, String key, int destinationDB, int timeout);

Response<String> migrate(String host, int port, int destinationDB, int timeout, MigrateParams params, String... keys);

}
14 changes: 7 additions & 7 deletions src/test/java/redis/clients/jedis/PipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ private void verifyHasBothValues(byte[] firstKey, byte[] secondKey, byte[] value
assertTrue(Arrays.equals(firstKey, value1) || Arrays.equals(firstKey, value2));
assertTrue(Arrays.equals(secondKey, value1) || Arrays.equals(secondKey, value2));
}
//
// @Test
// public void pipelineSelect() {
// Pipeline p = jedis.pipelined();
// p.select(1);
// p.sync();
// }

@Test
public void pipelineSelect() {
Pipeline p = jedis.pipelined();
p.select(1);
p.sync();
}

@Test
public void pipelineResponseWithoutData() {
Expand Down