reduce memory copy in table Serialize
wxingda committed Apr 26, 2024
1 parent b9d2454 commit 17b6de8
Showing 9 changed files with 16 additions and 149 deletions.
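The recurring change below is in the Serialize methods: MemoryInfo.Serialize and Table.Serialize stop copying the FlatBuffers output into a caller-supplied buffer and instead return builder.FinishedBytes() directly (Doc.Serialize already did this and merely sheds leftover commented-out copy code). A minimal sketch of the before/after pattern, using a bare FlatBuffers builder instead of the generated gamma_api schema code; the helper names here are illustrative only:

package main

import (
	"fmt"

	flatbuffers "github.com/google/flatbuffers/go"
)

// Old pattern: build, then copy the finished bytes into *out and return the length.
func serializeWithCopy(out *[]byte) int {
	builder := flatbuffers.NewBuilder(0)
	builder.StartObject(0) // empty table standing in for the real schema
	builder.Finish(builder.EndObject())
	n := len(builder.FinishedBytes())
	*out = make([]byte, n)
	copy(*out, builder.FinishedBytes()) // extra allocation + copy on every call
	return n
}

// New pattern: hand back the builder's finished buffer directly, no copy.
// The returned slice aliases the builder's internal memory, so the builder
// must not be reused or reset while the bytes are still in use.
func serializeNoCopy() []byte {
	builder := flatbuffers.NewBuilder(0)
	builder.StartObject(0)
	builder.Finish(builder.EndObject())
	return builder.FinishedBytes()
}

func main() {
	var old []byte
	fmt.Println(serializeWithCopy(&old), len(serializeNoCopy()))
}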
11 changes: 4 additions & 7 deletions internal/engine/sdk/go/examples/test.go
@@ -272,16 +272,13 @@ func Search() {

	response := &vearchpb.SearchResponse{}
	reqByte := gamma.SearchRequestSerialize(request)
-	code, respByte := gamma.Search(opt.Engine, reqByte)
+	respByte, status := gamma.Search(opt.Engine, reqByte)

-	if respByte != nil {
-		gamma.DeSerialize(respByte, response)
-	}
-
-	if code != 0 {
-		fmt.Printf("code %v\n", code)
+	if status.Code != 0 {
+		fmt.Printf("code %v\n", status.Code)
		return
	}
+	gamma.DeSerialize(respByte, response)

	fmt.Printf("%v\n", *(response.Results[0]))
}
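The example now reads the result from the Status value returned by gamma.Search rather than a bare int code, and defers DeSerialize until the status check has passed.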
5 changes: 1 addition & 4 deletions internal/engine/sdk/go/gamma/doc.go
@@ -36,8 +36,7 @@ func (doc *Doc) Serialize() []byte {
		i++
	}

-	var fields []flatbuffers.UOffsetT
-	fields = make([]flatbuffers.UOffsetT, len(doc.Fields))
+	fields := make([]flatbuffers.UOffsetT, len(doc.Fields))
	for i := 0; i < len(doc.Fields); i++ {
		gamma_api.FieldStart(builder)
		gamma_api.FieldAddName(builder, names[i])
@@ -57,8 +56,6 @@ func (doc *Doc) Serialize() []byte {
	builder.Finish(builder.EndObject())

	return builder.FinishedBytes()
-	// bufferLen := len(builder.FinishedBytes())
-	// return bufferLen
}

func (doc *Doc) DeSerialize(buffer []byte) {
5 changes: 2 additions & 3 deletions internal/engine/sdk/go/gamma/gamma.go
@@ -35,9 +35,8 @@ func Close(engine unsafe.Pointer) int {
}

func CreateTable(engine unsafe.Pointer, table *Table) *Status {
-	var buffer []byte
-	table.Serialize(&buffer)
-	cstatus := C.CreateTable(engine, (*C.char)(unsafe.Pointer(&buffer[0])), C.int(len(buffer)))
+	tableBytes := table.Serialize()
+	cstatus := C.CreateTable(engine, (*C.char)(unsafe.Pointer(&tableBytes[0])), C.int(len(tableBytes)))

	status := &Status{
		Code: int32(cstatus.code),
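CreateTable hands the FlatBuffers slice to C by passing the address of its first byte plus the length; because the engine only reads the buffer for the duration of the call, no defensive copy is needed on the Go side. A self-contained sketch of that call shape, with a hypothetical C function consume standing in for C.CreateTable (and assuming, as the code above does, that the slice is non-empty):

package main

/*
// Hypothetical C consumer standing in for C.CreateTable: it only reads the
// buffer while the call is in flight and keeps no pointer to it afterwards.
static int consume(const char *buf, int len) { return len; }
*/
import "C"

import (
	"fmt"
	"unsafe"
)

func main() {
	payload := []byte("serialized table bytes")
	// Pointer to the first byte + length, the same shape as the CreateTable call above.
	n := C.consume((*C.char)(unsafe.Pointer(&payload[0])), C.int(len(payload)))
	fmt.Println(int(n))
}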
7 changes: 2 additions & 5 deletions internal/engine/sdk/go/gamma/memory_info.go
@@ -22,7 +22,7 @@ type MemoryInfo struct {
	memoryInfo *gamma_api.MemoryInfo
}

-func (status *MemoryInfo) Serialize(buffer *[]byte) int {
+func (status *MemoryInfo) Serialize() []byte {
	builder := flatbuffers.NewBuilder(0)

	gamma_api.MemoryInfoStart(builder)
@@ -32,10 +32,7 @@ func (status *MemoryInfo) Serialize(buffer *[]byte) int {
	gamma_api.MemoryInfoAddFieldRangeMem(builder, status.FieldRangeMem)
	gamma_api.MemoryInfoAddBitmapMem(builder, status.BitmapMem)
	builder.Finish(builder.EndObject())
-	bufferLen := len(builder.FinishedBytes())
-	*buffer = make([]byte, bufferLen)
-	copy(*buffer, builder.FinishedBytes())
-	return bufferLen
+	return builder.FinishedBytes()
}

func (status *MemoryInfo) DeSerialize(buffer []byte) {
117 changes: 0 additions & 117 deletions internal/engine/sdk/go/gamma/request.go
@@ -49,122 +49,6 @@ type Request struct {
	MultiVectorRank int32
	L2Sqrt          bool
	Ranker          string
-
-	request *gamma_api.Request
}
-
-func (request *Request) Serialize(buffer *[]byte) int {
-	builder := flatbuffers.NewBuilder(0)
-	indexParams := builder.CreateString(request.IndexParams)
-
-	var fields, vectorQuerys, rangeFilters, termFilters []flatbuffers.UOffsetT
-	fields = make([]flatbuffers.UOffsetT, len(request.Fields))
-	vectorQuerys = make([]flatbuffers.UOffsetT, len(request.VecFields))
-	rangeFilters = make([]flatbuffers.UOffsetT, len(request.RangeFilters))
-	termFilters = make([]flatbuffers.UOffsetT, len(request.TermFilters))
-
-	for i := 0; i < len(request.Fields); i++ {
-		fields[i] = builder.CreateString(request.Fields[i])
-	}
-
-	for i := 0; i < len(request.VecFields); i++ {
-		name := builder.CreateString(request.VecFields[i].Name)
-		gamma_api.VectorQueryStartValueVector(builder, len(request.VecFields[i].Value))
-		for j := len(request.VecFields[i].Value) - 1; j >= 0; j-- {
-			builder.PrependByte(request.VecFields[i].Value[j])
-		}
-		value := builder.EndVector(len(request.VecFields[i].Value))
-		gamma_api.VectorQueryStart(builder)
-		gamma_api.VectorQueryAddName(builder, name)
-		gamma_api.VectorQueryAddValue(builder, value)
-		gamma_api.VectorQueryAddMinScore(builder, request.VecFields[i].MinScore)
-		gamma_api.VectorQueryAddMaxScore(builder, request.VecFields[i].MaxScore)
-		gamma_api.VectorQueryAddIndexType(builder, builder.CreateString(request.VecFields[i].IndexType))
-		vectorQuerys[i] = gamma_api.VectorQueryEnd(builder)
-	}
-
-	for i := 0; i < len(request.RangeFilters); i++ {
-		field := builder.CreateString(request.RangeFilters[i].Field)
-		gamma_api.RangeFilterStartLowerValueVector(builder, len(request.RangeFilters[i].LowerValue))
-		for j := len(request.RangeFilters[i].LowerValue) - 1; j >= 0; j-- {
-			builder.PrependByte(request.RangeFilters[i].LowerValue[j])
-		}
-		lowerValue := builder.EndVector(len(request.RangeFilters[i].LowerValue))
-
-		gamma_api.RangeFilterStartUpperValueVector(builder, len(request.RangeFilters[i].UpperValue))
-		for j := len(request.RangeFilters[i].UpperValue) - 1; j >= 0; j-- {
-			builder.PrependByte(request.RangeFilters[i].UpperValue[j])
-		}
-		upperValue := builder.EndVector(len(request.RangeFilters[i].UpperValue))
-
-		gamma_api.RangeFilterStart(builder)
-		gamma_api.RangeFilterAddField(builder, field)
-		gamma_api.RangeFilterAddLowerValue(builder, lowerValue)
-		gamma_api.RangeFilterAddUpperValue(builder, upperValue)
-		gamma_api.RangeFilterAddIncludeLower(builder, request.RangeFilters[i].IncludeLower)
-		gamma_api.RangeFilterAddIncludeUpper(builder, request.RangeFilters[i].IncludeUpper)
-		rangeFilters[i] = gamma_api.RangeFilterEnd(builder)
-	}
-
-	for i := 0; i < len(request.TermFilters); i++ {
-		field := builder.CreateString(request.TermFilters[i].Field)
-		gamma_api.TermFilterStartValueVector(builder, len(request.TermFilters[i].Value))
-		for j := len(request.TermFilters[i].Value) - 1; j >= 0; j-- {
-			builder.PrependByte(request.TermFilters[i].Value[j])
-		}
-		value := builder.EndVector(len(request.TermFilters[i].Value))
-		gamma_api.TermFilterStart(builder)
-		gamma_api.TermFilterAddField(builder, field)
-		gamma_api.TermFilterAddValue(builder, value)
-		gamma_api.TermFilterAddIsUnion(builder, request.TermFilters[i].IsUnion)
-		termFilters[i] = gamma_api.TermFilterEnd(builder)
-	}
-
-	gamma_api.RequestStartFieldsVector(builder, len(request.Fields))
-	for i := 0; i < len(request.Fields); i++ {
-		builder.PrependUOffsetT(fields[i])
-	}
-	f := builder.EndVector(len(request.Fields))
-
-	gamma_api.RequestStartVecFieldsVector(builder, len(request.VecFields))
-	for i := 0; i < len(request.VecFields); i++ {
-		builder.PrependUOffsetT(vectorQuerys[i])
-	}
-	v := builder.EndVector(len(request.VecFields))
-
-	gamma_api.RequestStartRangeFiltersVector(builder, len(request.RangeFilters))
-	for i := 0; i < len(request.RangeFilters); i++ {
-		builder.PrependUOffsetT(rangeFilters[i])
-	}
-	r := builder.EndVector(len(request.RangeFilters))
-
-	gamma_api.RequestStartTermFiltersVector(builder, len(request.TermFilters))
-	for i := 0; i < len(request.TermFilters); i++ {
-		builder.PrependUOffsetT(termFilters[i])
-	}
-	t := builder.EndVector(len(request.TermFilters))
-
-	ranker := builder.CreateString(request.Ranker)
-
-	gamma_api.RequestStart(builder)
-	gamma_api.RequestAddReqNum(builder, request.ReqNum)
-	gamma_api.RequestAddTopn(builder, request.TopN)
-	gamma_api.RequestAddBruteForceSearch(builder, request.BruteForceSearch)
-	gamma_api.RequestAddFields(builder, f)
-	gamma_api.RequestAddVecFields(builder, v)
-	gamma_api.RequestAddRangeFilters(builder, r)
-	gamma_api.RequestAddTermFilters(builder, t)
-	gamma_api.RequestAddIndexParams(builder, indexParams)
-	gamma_api.RequestAddMultiVectorRank(builder, request.MultiVectorRank)
-	gamma_api.RequestAddL2Sqrt(builder, request.L2Sqrt)
-	gamma_api.RequestAddRanker(builder, ranker)
-
-	builder.Finish(builder.EndObject())
-
-	bufferLen := len(builder.FinishedBytes())
-	*buffer = make([]byte, bufferLen)
-	copy(*buffer, builder.FinishedBytes())
-	return bufferLen
-}

func SearchRequestSerialize(request *vearchpb.SearchRequest) []byte {
@@ -281,7 +165,6 @@ func SearchRequestSerialize(request *vearchpb.SearchRequest) []byte {
func QueryRequestSerialize(request *vearchpb.QueryRequest) []byte {
	builder := flatbuffers.NewBuilder(0)
	var fields, vectorQuerys, rangeFilters, termFilters []flatbuffers.UOffsetT
-	fields = make([]flatbuffers.UOffsetT, len(request.Fields))
	vectorQuerys = make([]flatbuffers.UOffsetT, 0)
	fields = make([]flatbuffers.UOffsetT, len(request.Fields))
	rangeFilters = make([]flatbuffers.UOffsetT, len(request.RangeFilters))
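The Serialize method on the SDK-side Request struct is deleted outright, along with the unexported gamma_api handle it kept; search and query requests are serialized by SearchRequestSerialize and QueryRequestSerialize, which build the FlatBuffer straight from the vearchpb messages. QueryRequestSerialize also drops a duplicated fields = make(...) line.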
7 changes: 2 additions & 5 deletions internal/engine/sdk/go/gamma/table.go
@@ -50,7 +50,7 @@ type Table struct {
	table *gamma_api.Table
}

-func (table *Table) Serialize(out *[]byte) int {
+func (table *Table) Serialize() []byte {
	builder := flatbuffers.NewBuilder(0)
	name := builder.CreateString(table.Name)

@@ -115,10 +115,7 @@ func (table *Table) Serialize(out *[]byte) int {
	gamma_api.TableAddIndexType(builder, indexType)
	gamma_api.TableAddIndexParams(builder, indexParams)
	builder.Finish(builder.EndObject())
-	outLen := len(builder.FinishedBytes())
-	*out = make([]byte, outLen)
-	copy(*out, builder.FinishedBytes())
-	return outLen
+	return builder.FinishedBytes()
}

func (table *Table) DeSerialize(buffer []byte) {
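This is the change the commit title points at: Table.Serialize now returns the builder's finished buffer, and combined with the CreateTable change in gamma.go, the table bytes flow from the FlatBuffers builder into the C engine call without an intermediate allocation and copy.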
6 changes: 2 additions & 4 deletions internal/engine/table/table.cc
@@ -270,14 +270,12 @@ int Table::GetKeyByDocid(int docid, std::string &key) {
int Table::Add(const std::string &key,
               const std::unordered_map<std::string, struct Field> &fields,
               int docid) {
-  if (key.size() == 0) {
+  if (key.empty()) {
    LOG(ERROR) << name_ << " add item error : _id is null!";
    return -3;
  }

-  char vChar[sizeof(docid)];
-  memcpy(vChar, &docid, sizeof(docid));
-  std::string v = std::string(vChar, sizeof(docid));
+  std::string v(reinterpret_cast<char *>(&docid), sizeof(docid));
  item_to_docid_->Put(key, v);

  for (size_t i = 0; i < attrs_.size(); i++) {
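On the C++ side, the docid bytes are wrapped in a std::string constructed directly from the integer's address, dropping the intermediate stack buffer and memcpy; key.empty() replaces the equivalent key.size() == 0 check.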
Empty file modified internal/engine/table/table.h 100755 → 100644
7 changes: 3 additions & 4 deletions internal/ps/handler_document.go
@@ -277,7 +277,7 @@ func deleteDocs(ctx context.Context, store PartitionStore, items []*vearchpb.Item

func bulk(ctx context.Context, store PartitionStore, items []*vearchpb.Item) {
	wg := sync.WaitGroup{}
-	gammaArray := make([][]byte, len(items))
+	docBytes := make([][]byte, len(items))
	for i, item := range items {
		wg.Add(1)
		go func(item *vearchpb.Item, n int) {
@@ -288,14 +288,13 @@ func bulk(ctx context.Context, store PartitionStore, items []*vearchpb.Item) {
				}
			}()
			docGamma := &gamma.Doc{Fields: item.Doc.Fields}
-			docBytes := docGamma.Serialize()
-			gammaArray[n] = docBytes
+			docBytes[n] = docGamma.Serialize()
			item.Doc.Fields = nil
			item.Err = vearchpb.NewError(vearchpb.ErrorEnum_SUCCESS, nil).GetError()
		}(item, i)
	}
	wg.Wait()
-	docCmd := &vearchpb.DocCmd{Type: vearchpb.OpType_BULK, Docs: gammaArray}
+	docCmd := &vearchpb.DocCmd{Type: vearchpb.OpType_BULK, Docs: docBytes}

	err := store.Write(ctx, docCmd)
	vErr := vearchpb.NewError(vearchpb.ErrorEnum_INTERNAL_ERROR, err)
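bulk serializes each document in its own goroutine and writes the result into a distinct index of the shared docBytes slice, so the slice needs no extra locking; the WaitGroup alone orders the writes before the final read. A minimal, self-contained sketch of that fan-out pattern, with the FlatBuffers serialization replaced by a plain byte conversion:

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"doc-a", "doc-b", "doc-c"}
	docBytes := make([][]byte, len(items))

	var wg sync.WaitGroup
	for i, it := range items {
		wg.Add(1)
		go func(it string, n int) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so no mutex is
			// needed around docBytes.
			docBytes[n] = []byte(it) // stand-in for docGamma.Serialize()
		}(it, i)
	}
	wg.Wait()

	fmt.Println(len(docBytes), string(docBytes[0]))
}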
