Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,9 @@ public void setConsistencyLevel(ObTableConsistencyLevel consistencyLevel) {
public void setCredential(ObBytesString credential) {
this.credential = credential;
}

public void setTableId(long tableId) {
this.lsOperation.setTableId(tableId);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

Expand All @@ -31,10 +32,19 @@ public class ObTableLSOperation extends AbstractPayload {

private List<ObTableTabletOp> tabletOperations = new ArrayList<ObTableTabletOp>();
private long lsId = INVALID_LS_ID; // i64
private String tableName;

private long tableId = Constants.OB_INVALID_ID;;

// common column names for all single operation
// todo: not used currently, will support soon
private List<String> rowKeyNames = new ArrayList<>();
private List<String> propertiesNames = new ArrayList<>();

private ObTableLSOpFlag optionFlag = new ObTableLSOpFlag();

private static final int LS_ID_SIZE = 8;
private static final long INVALID_LS_ID = -1;
private ObTableLSOpFlag optionFlag = new ObTableLSOpFlag();

/*
OB_UNIS_DEF_SERIALIZE(ObTableLSOp,
Expand All @@ -58,8 +68,38 @@ public byte[] encode() {
System.arraycopy(Serialization.encodeI64(lsId), 0, bytes, idx, 8);
idx += 8;

// 2. encode table name
int len = Serialization.getNeedBytes(tableName);
System.arraycopy(Serialization.encodeVString(tableName), 0, bytes, idx, len);
idx += len;

// 3. encode table id
len = Serialization.getNeedBytes(tableId);
System.arraycopy(Serialization.encodeVi64(tableId), 0, bytes, idx, len);
idx += len;

// 4. encode rowKey names
len = Serialization.getNeedBytes(rowKeyNames.size());
System.arraycopy(Serialization.encodeVi64(rowKeyNames.size()), 0, bytes, idx, len);
idx += len;
for (String rowKeyName : rowKeyNames) {
len = Serialization.getNeedBytes(rowKeyName);
System.arraycopy(Serialization.encodeVString(rowKeyName), 0, bytes, idx, len);
idx += len;
}

// 5. encode properties names
len = Serialization.getNeedBytes(propertiesNames.size());
System.arraycopy(Serialization.encodeVi64(propertiesNames.size()), 0, bytes, idx, len);
idx += len;
for (String propertyName : propertiesNames) {
len = Serialization.getNeedBytes(propertyName);
System.arraycopy(Serialization.encodeVString(propertyName), 0, bytes, idx, len);
idx += len;
}

// 2. encode option flag
int len = Serialization.getNeedBytes(optionFlag.getValue());
len = Serialization.getNeedBytes(optionFlag.getValue());
System.arraycopy(Serialization.encodeVi64(optionFlag.getValue()), 0, bytes, idx, len);
idx += len;

Expand Down Expand Up @@ -87,12 +127,33 @@ public Object decode(ByteBuf buf) {
// 1. decode others
this.lsId = Serialization.decodeI64(buf);

// 2. decode flags
// 2. decode table name
this.tableName = Serialization.decodeVString(buf);

// 3. decode table id
this.tableId = Serialization.decodeVi64(buf);

// 4. decode rowKey names
int len = (int) Serialization.decodeVi64(buf);
for (int i = 0; i < len; i++) {
String rowkeyName = Serialization.decodeVString(buf);
rowKeyNames.add(rowkeyName);
}


// 5. decode properties names
len = (int) Serialization.decodeVi64(buf);
for (int i = 0; i < len; i++) {
String propertyName = Serialization.decodeVString(buf);
propertiesNames.add(propertyName);
}

// 6. decode flags
long flagValue = Serialization.decodeVi64(buf);
optionFlag.setValue(flagValue);

// 3. decode Operation
int len = (int) Serialization.decodeVi64(buf);
// 7. decode Operation
len = (int) Serialization.decodeVi64(buf);
tabletOperations = new ArrayList<ObTableTabletOp>(len);
for (int i = 0; i < len; i++) {
ObTableTabletOp tabletOperation = new ObTableTabletOp();
Expand All @@ -114,7 +175,18 @@ public long getPayloadContentSize() {
payloadContentSize += operation.getPayloadSize();
}

return payloadContentSize + LS_ID_SIZE + Serialization.getNeedBytes(optionFlag.getValue());
payloadContentSize += Serialization.getNeedBytes(rowKeyNames.size());
for (String rowKeyName : rowKeyNames) {
payloadContentSize += Serialization.getNeedBytes(rowKeyName);
}

payloadContentSize += Serialization.getNeedBytes(propertiesNames.size());
for (String propertyName : propertiesNames) {
payloadContentSize += Serialization.getNeedBytes(propertyName);
}

return payloadContentSize + LS_ID_SIZE + Serialization.getNeedBytes(optionFlag.getValue())
+ Serialization.getNeedBytes(tableName) + Serialization.getNeedBytes(tableId);
}

/*
Expand Down Expand Up @@ -156,4 +228,12 @@ public void setIsSameType(boolean isSameType) {
optionFlag.setFlagIsSameType(isSameType);
}

public long getTableId() {
return tableId;
}

public void setTableId(long tableId) {
this.tableId = tableId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ public enum ObTableOperationType {
INSERT_OR_UPDATE(4), // INSERT or UPDATE, columns not in arguments will remain unchanged
REPLACE(5), // DELETE & INSERT, columns not in arguments will change to default value
INCREMENT(6), // the column must be can be cast to long. if exist increase, else insert
APPEND(7);// append column value
APPEND(7),// append column value
SCAN(8), // query
TTL(9), // observer internal type, not used by client
CHECK_AND_INSERT_UP(10),
INVALID(11);




private int value;
private static Map<Integer, ObTableOperationType> map = new HashMap<Integer, ObTableOperationType>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader;
import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength;
import java.util.ArrayList;
import java.util.List;

public class ObTableSingleOp extends AbstractPayload {
private ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate();
private ObTableSingleOpType singleOpType;
private ObTableOperationType singleOpType;
private ObTableSingleOpFlag singleOpFlag = new ObTableSingleOpFlag();
private ObTableSingleOpQuery query = new ObTableSingleOpQuery();
private List<ObTableSingleOpEntity> entities = new ArrayList<>();

/*
* Encode.
Expand All @@ -42,15 +43,30 @@ public byte[] encode() {
idx = encodeHeader(bytes, idx);

// 1. encode op type
byte opTypeVal = singleOpType.getValue();
byte opTypeVal = singleOpType.getByteValue();
System.arraycopy(Serialization.encodeI8(opTypeVal), 0, bytes, idx, 1);
idx += 1;

// 1. encode query and mutate/op
int len = (int) queryAndMutate.getPayloadSize();
System.arraycopy(queryAndMutate.encode(), 0, bytes, idx, len);
long flag = singleOpFlag.getValue();
int len = Serialization.getNeedBytes(flag);
System.arraycopy(Serialization.encodeVi64(flag), 0, bytes, idx, len);
idx += len;

// 2. encode single op query
len = (int) query.getPayloadSize();
System.arraycopy(query.encode(), 0, bytes, idx, len);
idx += len;

// 3. encode entities
len = Serialization.getNeedBytes(entities.size());
System.arraycopy(Serialization.encodeVi64(entities.size()), 0, bytes, idx, len);
idx += len;
for (ObTableSingleOpEntity entity : entities) {
len = (int) entity.getPayloadSize();
System.arraycopy(entity.encode(), 0, bytes, idx, len);
idx += len;
}

return bytes;
}

Expand All @@ -61,12 +77,16 @@ public byte[] encode() {
public Object decode(ByteBuf buf) {
super.decode(buf);

this.queryAndMutate = new ObTableQueryAndMutate();
this.queryAndMutate.decode(buf);

byte opTypeVal = Serialization.decodeI8(buf);
this.singleOpType = ObTableOperationType.valueOf(Serialization.decodeI8(buf.readByte()));
this.singleOpFlag.setValue(Serialization.decodeVi64(buf));
this.query.decode(buf);
int len = (int) Serialization.decodeVi64(buf);
for (int i = 0; i < len; i++) {
ObTableSingleOpEntity entity = new ObTableSingleOpEntity();
entity.decode(buf);
entities.add(entity);
}

this.singleOpType.setValue(opTypeVal);
return this;
}

Expand All @@ -76,53 +96,54 @@ public Object decode(ByteBuf buf) {
@Override
public long getPayloadContentSize() {

long opTypeLen = Serialization.getNeedBytes(singleOpType.getValue());
return queryAndMutate.getPayloadSize() + opTypeLen;
long payloadContentSize = Serialization.getNeedBytes(singleOpType.getByteValue());
payloadContentSize += Serialization.getNeedBytes(singleOpFlag.getValue());
payloadContentSize += query.getPayloadSize();
payloadContentSize += Serialization.getNeedBytes(entities.size());
for (ObTableSingleOpEntity entity : entities) {
payloadContentSize += entity.getPayloadSize();
}
return payloadContentSize;
}

public List<ObNewRange> getScanRange() {
return query.getScanRanges();
}

/*
* Get table query.
*/
public ObTableQuery getTableQuery() {
return queryAndMutate.getTableQuery();
public void addScanRange(ObNewRange range)
{
this.addScanRange(range);
}

/*
* Set table query.
*/
public void setTableQuery(ObTableQuery tableQuery) {
this.queryAndMutate.setTableQuery(tableQuery);
public void setIsCheckNoExists(boolean isCheckNoExists) {
singleOpFlag.setIsCheckNotExists(isCheckNoExists);
}

/*
* Get mutations.
*/
public ObTableBatchOperation getMutations() {
return queryAndMutate.getMutations();
public ObTableOperationType getSingleOpType() {
return singleOpType;
}

/*
* Set mutations.
*/
public void setMutations(ObTableBatchOperation mutations) {
this.queryAndMutate.setMutations(mutations);
public void setSingleOpType(ObTableOperationType singleOpType) {
this.singleOpType = singleOpType;
}

public void setIsCheckAndExecute(boolean isCheckAndExecute) {
queryAndMutate.setIsCheckAndExecute(isCheckAndExecute);
public ObTableSingleOpQuery getQuery() {
return query;
}

public void setIsCheckNoExists(boolean isCheckNoExists) {
queryAndMutate.setIsCheckNoExists(isCheckNoExists);
public void setQuery(ObTableSingleOpQuery query) {
this.query = query;
}

public ObTableSingleOpType getSingleOpType() {
return singleOpType;
public List<ObTableSingleOpEntity> getEntities() {
return entities;
}

public void setSingleOpType(ObTableSingleOpType singleOpType) {
this.singleOpType = singleOpType;
public void setEntities(List<ObTableSingleOpEntity> entities) {
this.entities = entities;
}

public void addEntity(ObTableSingleOpEntity entity) {
this.entities.add(entity);
}
}
Loading