Skip to content

Commit

Permalink
[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort …
Browse files Browse the repository at this point in the history
…temp row

Pick up the no-sort fields in the row and pack them as bytes array and skip parsing them during merge sort to reduce CPU consumption

This closes apache#1792
  • Loading branch information
xuchuanyin authored and jackylk committed Feb 8, 2018
1 parent cd7eed6 commit de92ea9
Show file tree
Hide file tree
Showing 27 changed files with 1,131 additions and 1,239 deletions.
Expand Up @@ -82,18 +82,26 @@ private static int calculateTotalBytes(byte[][] byteBufferArr) {
}

/**
* Method to get the required dictionary dimension value from the 3-parted row
*
* @param index position of the dictionary dimension to read
* @param row write-step row object array
* @return the dictionary value at the given index
*/
*/
public static Integer getDimension(int index, Object[] row) {

Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];

public static int getDictDimension(int index, Object[] row) {
  // dictionary dimensions are packed as a primitive int[] slot in the 3-parted row
  return ((int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION])[index];
}

/**
* Method to get the required non-dictionary or complex column value from the 3-parted row
* @param index position of the non-dictionary/complex column to read
* @param row write-step row object array
* @return the column bytes at the given index
*/
public static byte[] getNoDictOrComplex(int index, Object[] row) {
  // no-dictionary and complex columns share a single byte[][] slot in the 3-parted row
  byte[][] noDictAndComplexDims = (byte[][]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
  return noDictAndComplexDims[index];
}

/**
Expand All @@ -108,60 +116,11 @@ public static Object getMeasure(int index, Object[] row) {
return measures[index];
}

/**
 * Method to get the no-dictionary part of the row as a single byte array.
 *
 * @param row write-step row object array
 * @return the packed no-dictionary bytes stored in the row
 */
public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
  Object noDictionaryPart = row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
  return (byte[]) noDictionaryPart;
}

/**
 * Method to assemble the 3-parted output row from its individual parts.
 *
 * @param out output row object array to fill in place
 * @param dimArray dictionary dimension values
 * @param byteBufferArr no-dictionary and complex column bytes
 * @param measureArray measure values
 */
public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
    Object[] measureArray) {
  out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
  out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
  out[WriteStepRowUtil.MEASURE] = measureArray;
}

/**
* This method will extract the single dimension from the complete high card dims byte[].
* The format of the byte[] will be: TotalLength, CompleteStartOffsets, Data
*
* @param highCardArr packed high-cardinality dimensions byte array
* @param index index of the dimension to extract
* @param highCardinalityCount total number of high-cardinality dimensions
* @param outBuffer buffer whose position and limit are set to the extracted dimension's range
*/
public static void extractSingleHighCardDims(byte[] highCardArr, int index,
    int highCardinalityCount, ByteBuffer outBuffer) {
  boolean isLastDimension = (index == highCardinalityCount - 1);
  // Start offsets are stored from byte 2 onwards, one 2-byte short per dimension.
  // For the last dimension only its own start offset is read (its end is implied
  // by the array length); otherwise the next dimension's start offset is read too
  // so the length can be derived from the difference.
  ByteBuffer offsetBuffer =
      ByteBuffer.wrap(highCardArr, (index * 2) + 2, isLastDimension ? 2 : 4);
  short startOffset = offsetBuffer.getShort();
  short endOffset = isLastDimension ? (short) highCardArr.length : offsetBuffer.getShort();
  int length = endOffset - startOffset;
  // expose exactly the requested dimension's bytes via position/limit
  outBuffer.position(startOffset);
  outBuffer.limit(outBuffer.position() + length);
}
}
Expand Up @@ -383,7 +383,6 @@ object CarbonDataStoreCreator {
.getInstance.createCache(CacheType.REVERSE_DICTIONARY)

for (i <- set.indices) {
// val dim = getDimension(dims, i).get
val columnIdentifier: ColumnIdentifier =
new ColumnIdentifier(dims.get(i).getColumnId, null, null)

Expand Down
Expand Up @@ -35,7 +35,7 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
Expand Down Expand Up @@ -152,7 +152,7 @@ object DataLoadProcessorStepOnSpark {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(conf)
val sortStepRowUtil = new SortStepRowUtil(sortParameters)
val sortStepRowHandler = new SortStepRowHandler(sortParameters)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
Expand All @@ -162,7 +162,7 @@ object DataLoadProcessorStepOnSpark {

override def next(): CarbonRow = {
val row =
new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
rowCounter.add(1)
row
}
Expand Down
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.row;

import java.nio.ByteBuffer;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.DataTypeUtil;

/**
* During sort procedure, each row will be written to sort temp file in this logic format.
* an intermediate sort temp row consists 3 parts:
* dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, measure)
*/
public class IntermediateSortTempRow {
  // dictionary dimensions that participate in sort
  private int[] dictSortDims;
  // no-dictionary dimensions that participate in sort
  private byte[][] noDictSortDims;
  // packed bytes holding every no-sort field: dictNoSort, noDictNoSort, measures
  private byte[] noSortDimsAndMeasures;

  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
      byte[] noSortDimsAndMeasures) {
    this.dictSortDims = dictSortDims;
    this.noDictSortDims = noDictSortDims;
    this.noSortDimsAndMeasures = noSortDimsAndMeasures;
  }

  public int[] getDictSortDims() {
    return dictSortDims;
  }

  public byte[][] getNoDictSortDims() {
    return noDictSortDims;
  }

  public byte[] getNoSortDimsAndMeasures() {
    return noSortDimsAndMeasures;
  }

  /**
   * deserialize from bytes array to get the no sort fields
   * @param outDictNoSort stores the dict & no-sort fields
   * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
   * @param outMeasures stores the measure fields
   * @param dataTypes data type for the measure
   */
  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
      Object[] outMeasures, DataType[] dataTypes) {
    ByteBuffer buffer = ByteBuffer.wrap(noSortDimsAndMeasures);
    // dict & no-sort dimensions: stored as plain ints
    for (int i = 0; i < outDictNoSort.length; i++) {
      outDictNoSort[i] = buffer.getInt();
    }
    // no-dict & no-sort dimensions (including complex): short length prefix + content
    for (int i = 0; i < outNoDictNoSort.length; i++) {
      byte[] content = new byte[buffer.getShort()];
      buffer.get(content);
      outNoDictNoSort[i] = content;
    }
    // measures: a 1-byte null marker, then the typed content when non-null
    for (int i = 0; i < outMeasures.length; i++) {
      if ((byte) 0 == buffer.get()) {
        outMeasures[i] = null;
      } else {
        outMeasures[i] = readMeasureValue(buffer, dataTypes[i]);
      }
    }
  }

  // reads one non-null measure value of the given type from the buffer
  private static Object readMeasureValue(ByteBuffer buffer, DataType dataType) {
    if (DataTypes.BOOLEAN == dataType) {
      return (byte) 1 == buffer.get();
    } else if (DataTypes.SHORT == dataType) {
      return buffer.getShort();
    } else if (DataTypes.INT == dataType) {
      return buffer.getInt();
    } else if (DataTypes.LONG == dataType) {
      return buffer.getLong();
    } else if (DataTypes.DOUBLE == dataType) {
      return buffer.getDouble();
    } else if (DataTypes.isDecimal(dataType)) {
      // decimals carry their own short length prefix
      byte[] decimalBytes = new byte[buffer.getShort()];
      buffer.get(decimalBytes);
      return DataTypeUtil.byteToBigDecimal(decimalBytes);
    } else {
      throw new IllegalArgumentException("Unsupported data type: " + dataType);
    }
  }
}

0 comments on commit de92ea9

Please sign in to comment.