Addressed review comments from Jacques
mehant authored and StevenMPhillips committed Oct 31, 2013
1 parent 6c78890 commit 30ada5d
Showing 5 changed files with 199 additions and 130 deletions.
@@ -74,7 +74,7 @@ public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) th
/* For every child operator create a trace operator as its parent */
for (int i = 0; i < newChildren.size(); i++)
{
String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
String traceTag = newChildren.get(i).toString() + Integer.toString(traceTagCount++);

/* Trace operator */
Trace traceOp = new Trace(newChildren.get(i), traceTag);
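The fix above matters because newChildren.toString() stringifies the whole child list, so every trace operator created in the loop received a tag derived from the list rather than from the child it wraps. A rough illustration, with hypothetical operator names:

/* before: traceTag was "[Scan, Filter]0" for the first child and "[Scan, Filter]1" for the second */
/* after:  traceTag is "Scan0" for the first child and "Filter1" for the second */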
@@ -24,7 +24,10 @@
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Formatter;

import com.google.common.collect.Iterators;
import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -43,6 +46,19 @@

import io.netty.buffer.ByteBuf;

/* TraceRecordBatch contains value vectors which are exactly the same
* as the incoming record batch's value vectors. If the incoming
* record batch has a selection vector (type 2) then TraceRecordBatch
* will also contain a selection vector.
*
* The purpose of this record batch is to dump the data associated with
* all the value vectors and the selection vector to disk.
*
* This record batch does not modify any data or schema; it simply
* consumes the incoming record batch's data, dumps it to disk and
* passes the same set of value vectors (and selection vector) to its
* parent record batch.
*/
public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
@@ -55,11 +71,33 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Location where the log should be dumped */
private final String logLocation;

/* File descriptors needed to be able to dump to log file */
private FileOutputStream fos;
private FileChannel fc;

public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
{
super(pop, context, incoming);
this.traceTag = pop.traceTag;
logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);

String fileName = getFileName();

/* Create the log file we will dump to and initialize the file descriptors */
try
{
File file = new File(fileName);

/* create the file */
file.createNewFile();

fos = new FileOutputStream(file, true);
fc = fos.getChannel();

} catch (IOException e)
{
logger.error("Unable to create file: " + fileName);
}
}

@Override
@@ -75,6 +113,13 @@ public int getRecordCount()
* This function is invoked for every record batch and simply
* dumps the buffers associated with all the value vectors in
* the record batch to a log file.
*
* The function is divided into three main parts:
* 1. Get all the buffers (ByteBuf's) associated with the incoming
* record batch's value vectors and selection vector
* 2. Dump these buffers to the log file (performed by writeToFile())
* 3. Construct the record batch with these buffers to look exactly like
* the incoming record batch (performed by reconstructRecordBatch())
*/
@Override
protected void doWork()
@@ -85,112 +130,171 @@ protected void doWork()
/* Get the array of buffers from the incoming record batch */
WritableBatch batch = incoming.getWritableBatch();

BufferAllocator allocator = context.getAllocator();
ByteBuf[] incomingBuffers = batch.getBuffers();
RecordBatchDef batchDef = batch.getDef();

/* Total length of buffers across all value vectors */
int totalBufferLength = 0;
/* ByteBuf associated with the selection vector */
ByteBuf svBuf = null;

String fileName = getFileName();
/* Number of records in the selection vector */
int svCount = 0;

try
if (svMode == SelectionVectorMode.TWO_BYTE)
{
File file = new File(fileName);
SelectionVector2 sv2 = incoming.getSelectionVector2();
svCount = sv2.getCount();
svBuf = sv2.getBuffer();
}

if (!file.exists())
file.createNewFile();
/* Write the ByteBufs for the value vectors and the selection vector to
* disk. totalBufferLength is the sum of the sizes of the ByteBufs
* across all value vectors.
*/
int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);

FileOutputStream fos = new FileOutputStream(file, true);
/* Reconstruct the record batch from the ByteBuf's */
reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
}
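
/* For reference, a sketch of the on-disk layout this produces per record
* batch, assuming the write order in writeToFile() below (an editor's
* illustration, not authoritative):
*
* [varint length][RecordBatchDef] <- batchDef.writeDelimitedTo(fos)
* [selection vector: svCount * SelectionVector2.RECORD_SIZE bytes, TWO_BYTE mode only]
* [value vector buffer 0][value vector buffer 1]...
*/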

@Override
protected void setupNewSchema() throws SchemaChangeException
{
/* Trace operator does not deal with hyper vectors yet */
if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
throw new SchemaChangeException("Trace operator does not work with hyper vectors");

/* We have a new schema; clear our existing container so we can
* load the new value vectors
*/
container.clear();

/* Add all the value vectors to the container */
for(VectorWrapper<?> vv : incoming)
{
TransferPair tp = vv.getValueVector().getTransferPair();
container.add(tp.getTo());
}
}

@Override
public SelectionVector2 getSelectionVector2() {
return sv;
}

private String getFileName()
{
/* From the context, get the query id, major fragment id and
* minor fragment id. These are used to construct the name of the
* file to which we dump the incoming buffer data.
*/
FragmentHandle handle = incoming.getContext().getHandle();

String qid = QueryIdHelper.getQueryId(handle.getQueryId());

int majorFragmentId = handle.getMajorFragmentId();
int minorFragmentId = handle.getMinorFragmentId();

String fileName = String.format("%s/%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);

return fileName;
}
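
/* An illustration with hypothetical values: for a dump directory of
* /tmp/drill/trace, major fragment id 1 and minor fragment id 0, this
* produces a path of the form /tmp/drill/trace/<query id>_1_0_<trace tag>
*/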

private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
{
String fileName = getFileName();
int totalBufferLength = 0;

try
{
/* Write the metadata to the file */
batchDef.writeDelimitedTo(fos);

FileChannel fc = fos.getChannel();

/* If we have a selection vector, dump it to file first */
if (svMode == SelectionVectorMode.TWO_BYTE)
if (svBuf != null)
{
SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
int recordCount = incomingSV2.getCount();
int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;

ByteBuf buf = incomingSV2.getBuffer();

/* For writing to the selection vectors we use
* setChar() method which does not modify the
* reader and writer index. To copy the entire buffer
* without having to get each byte individually we need
* to set the writer index
*/
buf.writerIndex(sv2Size);

/* dump the selection vector to log */
dumpByteBuf(fc, buf);

if (sv == null)
sv = new SelectionVector2(allocator);
svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);

sv.setRecordCount(recordCount);

/* create our selection vector from the
* incoming selection vector's buffer
*/
sv.setBuffer(buf);

buf.release();
fc.write(svBuf.nioBuffers());
}

/* For each buffer dump it to log and compute total length */
for (ByteBuf buf : incomingBuffers)
/* Dump the array of ByteBuf's associated with the value vectors */
for (ByteBuf buf : vvBufs)
{
/* dump the buffer into the file channel */
dumpByteBuf(fc, buf);

/* Reset reader index on the ByteBuf so we can read it again */
buf.resetReaderIndex();
fc.write(buf.nioBuffers());

/* compute the total length of the buffers; this will be used
* when we create a compound buffer
*/
totalBufferLength += buf.readableBytes();
}

fc.close();
fos.close();
fc.force(true);
fos.flush();

} catch (IOException e)
{
logger.error("Unable to write buffer to file: " + fileName);
}

/* allocate memory for the compound buffer, compound buffer
* will contain the data from all the buffers across all the
* value vectors
*/
ByteBuf byteBuf = allocator.buffer(totalBufferLength);
return totalBufferLength;
}
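
/* A minimal sketch of how the dump written by writeToFile() could be
* read back (an editor's illustration, not part of this commit). It
* assumes the layout described above: a delimited RecordBatchDef, then
* the raw selection vector bytes (if any), then one buffer per value
* vector whose length comes from the FieldMetadata entries.
*/
private void readTraceDump(String fileName) throws IOException
{
java.io.FileInputStream fis = new java.io.FileInputStream(fileName);

/* Read back the metadata written by batchDef.writeDelimitedTo(fos) */
RecordBatchDef def = RecordBatchDef.parseDelimitedFrom(fis);

/* A complete reader would skip the selection vector bytes here when
* the batch was dumped in TWO_BYTE mode; detecting that is outside
* this sketch
*/

for (FieldMetadata fmd : def.getFieldList())
{
byte[] vvBytes = new byte[fmd.getBufferLength()];

/* read() may return fewer bytes than requested; a robust reader
* would loop until vvBytes is filled
*/
fis.read(vvBytes);

/* wrap vvBytes in a ByteBuf and hand it to ValueVector.load() */
}

fis.close();
}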

/* Copy data from each buffer into the compound buffer */
for (int i = 0; i < incomingBuffers.length; i++)
private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
ByteBuf[] vvBufs, int totalBufferLength,
ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
{
if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
{
byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
}
CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);

/* Copy data from each buffer into the compound buffer */
for (int i = 0; i < vvBufs.length; i++)
{
cbb.addComponent(vvBufs[i]);
}

List<FieldMetadata> fields = batchDef.getFieldList();
List<FieldMetadata> fields = batchDef.getFieldList();

int bufferOffset = 0;
int bufferOffset = 0;

/* For each value vector slice up the appropriate size from
* the compound buffer and load it into the value vector
/* For each value vector slice up the appropriate size from
* the compound buffer and load it into the value vector
*/
int vectorIndex = 0;

for(VectorWrapper<?> vv : container)
{
FieldMetadata fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
vectorIndex++;
bufferOffset += fmd.getBufferLength();
}
}

/* Set the selection vector for the record batch if the
* incoming batch had a selection vector
*/
int vectorIndex = 0;
for(VectorWrapper<?> vv : container)
if (svMode == SelectionVectorMode.TWO_BYTE)
{
FieldMetadata fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
vectorIndex++;
bufferOffset += fmd.getBufferLength();
if (sv == null)
sv = new SelectionVector2(context.getAllocator());

sv.setRecordCount(svCount);

/* create our selection vector from the
* incoming selection vector's buffer
*/
sv.setBuffer(svBuf);

svBuf.release();
}

container.buildSchema(svMode);
@@ -204,56 +308,21 @@
}

@Override
protected void setupNewSchema() throws SchemaChangeException
protected void cleanup()
{
/* Trace operator does not deal with hyper vectors yet */
if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
throw new SchemaChangeException("Trace operator does not work with hyper vectors");

/* we have a new schema, clear our existing container to
* load the new value vectors
*/
container.clear();
/* Release the selection vector */
if (sv != null)
sv.clear();

/* Add all the value vectors in the container */
for(VectorWrapper<?> vv : incoming)
/* Close the file descriptors */
try
{
TransferPair tp = vv.getValueVector().getTransferPair();
container.add(tp.getTo());
fos.close();
fc.close();
} catch (IOException e)
{
logger.error("Unable to close file descriptors for file: " + getFileName());
}
}

@Override
public SelectionVector2 getSelectionVector2() {
return sv;
}

private String getFileName()
{
/* From the context, get the query id, major fragment id,
* minor fragment id. This will be used as the file name
* to which we will dump the incoming buffer data
*/
FragmentHandle handle = incoming.getContext().getHandle();

String qid = QueryIdHelper.getQueryId(handle.getQueryId());

int majorFragmentId = handle.getMajorFragmentId();
int minorFragmentId = handle.getMinorFragmentId();

return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
}

private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
{
int bufferLength = buf.readableBytes();

byte[] byteArray = new byte[bufferLength];

/* Transfer bytes to a byte array */
buf.readBytes(byteArray);

/* Drop the byte array into the file channel */
fc.write(ByteBuffer.wrap(byteArray));
}
}
@@ -65,7 +65,7 @@ public IterOutcome next() {
throw new UnsupportedOperationException();
}
}

protected abstract void setupNewSchema() throws SchemaChangeException;
protected abstract void doWork();
}
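
The two abstract hooks above define the template that subclasses such as TraceRecordBatch fill in: setupNewSchema() reacts to a schema change and doWork() runs once per record batch. Below is a minimal sketch of a conforming subclass (the class name is hypothetical; it assumes the base class exposes container and incoming as protected members, as the TraceRecordBatch code above suggests):

public class PassThroughRecordBatch extends AbstractSingleRecordBatch<Trace>
{
public PassThroughRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
{
super(pop, context, incoming);
}

@Override
protected void setupNewSchema() throws SchemaChangeException
{
/* mirror the incoming value vectors via transfer pairs */
container.clear();

for (VectorWrapper<?> vv : incoming)
{
container.add(vv.getValueVector().getTransferPair().getTo());
}
}

@Override
protected void doWork()
{
/* per-batch work goes here; the incoming batch's data has already
* been delivered when this is invoked
*/
}

@Override
public int getRecordCount()
{
return incoming.getRecordCount();
}
}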
