Skip to content

Commit

Permalink
modify that next index will be processed when bulk and pipe opeartion…
Browse files Browse the repository at this point in the history
… moved by switchover
  • Loading branch information
whchoi83 committed Jan 29, 2016
1 parent f0f9561 commit 0de1c2e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 47 deletions.
38 changes: 22 additions & 16 deletions src/main/java/net/spy/memcached/collection/CollectionBulkStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package net.spy.memcached.collection;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;

import net.spy.memcached.CachedData;
Expand All @@ -39,6 +38,16 @@ public abstract class CollectionBulkStore<T> extends CollectionObject {

protected CollectionAttributes attribute;

protected int nextOpIndex = 0;

/**
* set next index of operation
* that will be processed after when operation moved by switchover
*/
public void setNextOperatedIndex(int i) {
this.nextOpIndex = i;
}

public abstract ByteBuffer getAsciiCommand();

public abstract ByteBuffer getBinaryCommand();
Expand Down Expand Up @@ -96,10 +105,9 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<String> iterator = keyList.iterator();

while (iterator.hasNext()) {
String key = iterator.next();
int kSize = this.keyList.size();
for (int i = this.nextOpIndex; i < kSize; i++) {
String key = keyList.get(i);
byte[] value = cachedData.getData();

setArguments(
Expand All @@ -119,7 +127,7 @@ public ByteBuffer getAsciiCommand() {
.getMaxCount() != null) ? attribute
.getMaxCount()
: CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < kSize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
}
Expand Down Expand Up @@ -164,10 +172,9 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<String> iterator = keyList.iterator();

while (iterator.hasNext()) {
String key = iterator.next();
int kSize = this.keyList.size();
for (int i = this.nextOpIndex; i < kSize; i++) {
String key = keyList.get(i);
byte[] value = cachedData.getData();

setArguments(
Expand All @@ -185,7 +192,7 @@ public ByteBuffer getAsciiCommand() {
.getMaxCount() != null) ? attribute
.getMaxCount()
: CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < kSize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
}
Expand Down Expand Up @@ -232,10 +239,9 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<String> iterator = keyList.iterator();

while (iterator.hasNext()) {
String key = iterator.next();
int kSize = keyList.size();
for (int i = this.nextOpIndex; i < kSize; i++) {
String key = this.keyList.get(i);
byte[] value = cachedData.getData();

setArguments(
Expand All @@ -254,7 +260,7 @@ public ByteBuffer getAsciiCommand() {
.getMaxCount() != null) ? attribute
.getMaxCount()
: CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < kSize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -39,6 +38,16 @@ public abstract class CollectionPipedStore<T> extends CollectionObject {

protected CollectionAttributes attribute;

protected int nextOpIndex = 0;

/**
* set next index of operation
* that will be processed after when operation moved by switchover
*/
public void setNextOperatedIndex(int i) {
this.nextOpIndex = i;
}

public abstract ByteBuffer getAsciiCommand();
public abstract ByteBuffer getBinaryCommand();

Expand Down Expand Up @@ -66,7 +75,7 @@ public ByteBuffer getAsciiCommand() {
int capacity = 0;

// decode values
Collection<byte[]> encodedList = new ArrayList<byte[]>(list.size());
List<byte[]> encodedList = new ArrayList<byte[]>(list.size());
CachedData cd = null;
for (T each : list) {
cd = tc.encode(each);
Expand All @@ -84,14 +93,14 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<byte[]> iterator = encodedList.iterator();
while (iterator.hasNext()) {
byte[] each = iterator.next();
int eSize = encodedList.size();
for (int i = this.nextOpIndex; i < eSize; i++) {
byte[] each = encodedList.get(i);
setArguments(bb, COMMAND, key, index, each.length,
(createKeyIfNotExists) ? "create" : "", (createKeyIfNotExists) ? cd.getFlags() : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getExpireTime() != null) ? attribute.getExpireTime() : CollectionAttributes.DEFAULT_EXPIRETIME : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getMaxCount() != null) ? attribute.getMaxCount() : CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < eSize - 1) ? PIPE : "");
bb.put(each);
bb.put(CRLF);
}
Expand Down Expand Up @@ -129,7 +138,7 @@ public ByteBuffer getAsciiCommand() {
int capacity = 0;

// decode values
Collection<byte[]> encodedList = new ArrayList<byte[]>(set.size());
List<byte[]> encodedList = new ArrayList<byte[]>(set.size());
CachedData cd = null;
for (T each : set) {
cd = tc.encode(each);
Expand All @@ -147,15 +156,15 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<byte[]> iterator = encodedList.iterator();
while (iterator.hasNext()) {
byte[] each = iterator.next();
int eSize = encodedList.size();
for (int i = this.nextOpIndex; i < eSize; i++) {
byte[] each = encodedList.get(i);

setArguments(bb, COMMAND, key, each.length,
(createKeyIfNotExists) ? "create" : "", (createKeyIfNotExists) ? cd.getFlags() : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getExpireTime() != null) ? attribute.getExpireTime() : CollectionAttributes.DEFAULT_EXPIRETIME : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getMaxCount() != null) ? attribute.getMaxCount() : CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < eSize - 1) ? PIPE : "");
bb.put(each);
bb.put(CRLF);
}
Expand Down Expand Up @@ -212,17 +221,17 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
i = 0;
Iterator<Long> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
Long bkey = iterator.next();
byte[] value = decodedList.get(i++);
int keySize = map.keySet().size();
List<Long> keyList = new ArrayList<Long>(map.keySet());
for (i = this.nextOpIndex; i < keySize; i++) {
Long bkey = keyList.get(i);
byte[] value = decodedList.get(i);

setArguments(bb, COMMAND, key, bkey, value.length,
(createKeyIfNotExists) ? "create" : "", (createKeyIfNotExists) ? cd.getFlags() : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getExpireTime() != null) ? attribute.getExpireTime() : CollectionAttributes.DEFAULT_EXPIRETIME : "",
(createKeyIfNotExists) ? (attribute != null && attribute.getMaxCount() != null) ? attribute.getMaxCount() : CollectionAttributes.DEFAULT_MAXCOUNT : "",
(iterator.hasNext()) ? PIPE : "");
(i < keySize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
}
Expand Down Expand Up @@ -284,11 +293,10 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
i = 0;
Iterator<Element<T>> iterator = elements.iterator();
while (iterator.hasNext()) {
Element<T> element = iterator.next();
byte[] value = decodedList.get(i++);
int eSize = elements.size();
for (i = this.nextOpIndex; i < eSize; i++) {
Element<T> element = elements.get(i);
byte[] value = decodedList.get(i);

setArguments(
bb,
Expand All @@ -315,7 +323,7 @@ public ByteBuffer getAsciiCommand() {
(createKeyIfNotExists) ? (attribute != null && attribute
.getReadable() != null && !attribute.getReadable()) ?
"unreadable" : "" : "",
(iterator.hasNext()) ? PIPE : "");
(i < eSize - 1) ? PIPE : "");
bb.put(value);
bb.put(CRLF);
}
Expand Down
22 changes: 15 additions & 7 deletions src/main/java/net/spy/memcached/collection/SetPipedExist.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import net.spy.memcached.CachedData;
Expand All @@ -38,6 +36,16 @@ public class SetPipedExist<T> extends CollectionObject {
private final Transcoder<T> tc;
private int itemCount;

protected int nextOpIndex = 0;

/**
* set next index of operation
* that will be processed after when operation moved by switchover
*/
public void setNextOperatedIndex(int i) {
this.nextOpIndex = i;
}

public List<T> getValues() {
return this.values;
}
Expand All @@ -57,7 +65,7 @@ public ByteBuffer getAsciiCommand() {
int capacity = 0;

// decode values
Collection<byte[]> encodedList = new ArrayList<byte[]>(values.size());
List<byte[]> encodedList = new ArrayList<byte[]>(values.size());
CachedData cd = null;
for (T each : values) {
cd = tc.encode(each);
Expand All @@ -75,12 +83,12 @@ public ByteBuffer getAsciiCommand() {
ByteBuffer bb = ByteBuffer.allocate(capacity);

// create ascii operation string
Iterator<byte[]> iterator = encodedList.iterator();
while (iterator.hasNext()) {
byte[] each = iterator.next();
int eSize = encodedList.size();
for (int i = this.nextOpIndex; i < eSize; i++) {
byte[] each = encodedList.get(i);

setArguments(bb, COMMAND, key, each.length,
(iterator.hasNext()) ? PIPE : "");
(i < eSize - 1) ? PIPE : "");
bb.put(each);
bb.put(CRLF);
}
Expand Down

0 comments on commit 0de1c2e

Please sign in to comment.