Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@
*/
package io.pixelsdb.pixels.cache;

import io.pixelsdb.pixels.cache.mq.MappedBusMessage;
import io.pixelsdb.pixels.cache.mq.Message;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;

/**
* ColumnletId can be used as the cache miss message.
* @author guodong
* @author hank
*/
public class ColumnletId
implements MappedBusMessage
implements Message
{
public long blockId;
private final static int SIZE = 2 * Short.BYTES + Long.BYTES;

/**
* Issue 115:
* Currently, blockId is not assigned and use externally.
* TODO: assign it with valid value which can be used to lookup the file/object in storage.
*/
private long blockId = 0;
public short rowGroupId;
public short columnId;
public boolean direct;
Expand All @@ -59,9 +67,9 @@ public ColumnletId()
@Override
public void write(MemoryMappedFile mem, long pos)
{
// mem.putLong(0, blockId);
// mem.putShort(8, rowGroupId);
// mem.putShort(12, columnId);
mem.setLong(pos, blockId);
mem.setShort(pos + Long.BYTES, rowGroupId);
mem.setShort(pos + Long.BYTES + Short.BYTES, columnId);
}

/**
Expand All @@ -73,20 +81,18 @@ public void write(MemoryMappedFile mem, long pos)
@Override
public void read(MemoryMappedFile mem, long pos)
{
mem.getLong(0);
mem.getShort(8);
mem.getShort(12);
blockId = mem.getLong(pos);
rowGroupId = mem.getShort(pos + Long.BYTES);
columnId = mem.getShort(pos + Long.BYTES + Short.BYTES);
}

/**
* Returns the message type.
*
* @return the message type
* @return the size of message in bytes.
*/
@Override
public int type()
public int size()
{
return 0;
return SIZE;
}

@Override
Expand Down Expand Up @@ -116,6 +122,16 @@ public String toString()
.toString();
}

@Override
public String print(MemoryMappedFile mem, long pos)
{
return toStringHelper(this)
.add("block id", mem.getLong(pos))
.add("row group id", mem.getShort(pos + Long.BYTES))
.add("column id", mem.getShort(pos + Long.BYTES + Short.BYTES))
.toString();
}

@Override
public int hashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
*/

/*
* This file is derived from MappedBus, with the attribution notice:
* This file is derived from MemoryMappedFile in MappedBus,
* with the attribution notice:
*
* Copyright 2015 Caplogic AB.
* Licensed under the Apache License, Version 2.0.
* This class was inspired from an entry in Bryce Nyeggen's blog.
*
* We changed the visibility of some methods from protect to public.
* We changed the visibility of some methods from protect to public,
* and added direct (i.e. zero-copy) memory access.
*/
package io.pixelsdb.pixels.cache;

Expand All @@ -43,7 +45,6 @@
import java.nio.channels.FileChannel;

/**
* By hank:
* This class has been tested.
* It can read and write memory mapped file larger than 2GB.
* When the backing file is located under /dev/shm/, it works as a shared memory,
Expand Down Expand Up @@ -163,6 +164,11 @@ public void unmap()
unmmap.invoke(null, addr, this.size);
}

public void clear()
{
unsafe.setMemory(addr, size, (byte)0);
}

public static ByteOrder getOrder()
{
return order;
Expand Down Expand Up @@ -250,36 +256,28 @@ public long getLongVolatile(long pos)
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putByte(long pos, byte val)
public void setByte(long pos, byte val)
{
unsafe.putByte(pos + addr, val);
}

public void putBytes(long pos, byte[] val)
{
for (byte v : val)
{
unsafe.putByte(pos++ + addr, v);
}
}

/**
* Writes a byte (volatile) to the specified position.
*
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putByteVolatile(long pos, byte val)
public void setByteVolatile(long pos, byte val)
{
unsafe.putByteVolatile(null, pos + addr, val);
}

public void putShort(long pos, short val)
public void setShort(long pos, short val)
{
unsafe.putShort(pos + addr, val);
}

public void putShortVolatile(long pos, short val)
public void setShortVolatile(long pos, short val)
{
unsafe.putShortVolatile(null, pos + addr, val);
}
Expand All @@ -290,7 +288,7 @@ public void putShortVolatile(long pos, short val)
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putInt(long pos, int val)
public void setInt(long pos, int val)
{
unsafe.putInt(pos + addr, val);
}
Expand All @@ -301,7 +299,7 @@ public void putInt(long pos, int val)
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putIntVolatile(long pos, int val)
public void setIntVolatile(long pos, int val)
{
unsafe.putIntVolatile(null, pos + addr, val);
}
Expand All @@ -312,7 +310,7 @@ public void putIntVolatile(long pos, int val)
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putLong(long pos, long val)
public void setLong(long pos, long val)
{
unsafe.putLong(pos + addr, val);
}
Expand All @@ -323,7 +321,7 @@ public void putLong(long pos, long val)
* @param pos the position in the memory mapped file
* @param val the value to write
*/
public void putLongVolatile(long pos, long val)
public void setLongVolatile(long pos, long val)
{
unsafe.putLongVolatile(null, pos + addr, val);
}
Expand Down Expand Up @@ -362,13 +360,24 @@ public ByteBuffer getDirectByteBuffer(long pos, int length)
* @param pos the position in the memory mapped file
* @param data the output buffer
* @param offset the offset in the buffer of the first byte to write
* @param length the length of the data
* @param length the length of the data to write
*/
public void setBytes(long pos, byte[] data, int offset, int length)
{
unsafe.copyMemory(data, BYTE_ARRAY_OFFSET + offset, null, pos + addr, length);
}

/**
* Writes a buffer of data.
*
* @param pos the position in the memory mapped file
* @param data the output buffer
*/
public void setBytes(long pos, byte[] data)
{
unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, null, pos + addr, data.length);
}

public void copyMemory(long srcPos, long destPos, long length)
{
unsafe.copyMemory(srcPos, destPos, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public static void initialize(MemoryMappedFile indexFile, MemoryMappedFile cache

private static void setMagic(MemoryMappedFile file)
{
file.putBytes(0, Constants.MAGIC.getBytes(StandardCharsets.UTF_8));
file.setBytes(0, Constants.MAGIC.getBytes(StandardCharsets.UTF_8));
}

public static String getMagic(MemoryMappedFile file)
Expand All @@ -157,13 +157,13 @@ public static boolean checkMagic(MemoryMappedFile file)

private static void clearIndexRWAndCount(MemoryMappedFile indexFile)
{
indexFile.putIntVolatile(6, 0);
indexFile.setIntVolatile(6, 0);
}

public static void beginIndexWrite(MemoryMappedFile indexFile) throws InterruptedException
{
// Set the rw flag.
indexFile.putByteVolatile(6, (byte) 1);
indexFile.setByteVolatile(6, (byte) 1);
final int sleepMs = 10;
int waitMs = 0;
while ((indexFile.getIntVolatile(6) & READER_COUNT_MASK) > 0)
Expand All @@ -180,15 +180,15 @@ public static void beginIndexWrite(MemoryMappedFile indexFile) throws Interrupte
if (waitMs > CACHE_READ_LEASE_MS)
{
// clear reader count to continue writing.
indexFile.putIntVolatile(6, ZERO_READER_COUNT_WITH_RW_FLAG);
indexFile.setIntVolatile(6, ZERO_READER_COUNT_WITH_RW_FLAG);
break;
}
}
}

public static void endIndexWrite(MemoryMappedFile indexFile)
{
indexFile.putByteVolatile(6, (byte) 0);
indexFile.setByteVolatile(6, (byte) 0);
}

/**
Expand Down Expand Up @@ -248,7 +248,7 @@ public static boolean endIndexRead(MemoryMappedFile indexFile, long lease)

public static void setIndexVersion(MemoryMappedFile indexFile, int version)
{
indexFile.putIntVolatile(10, version);
indexFile.setIntVolatile(10, version);
}

public static int getIndexVersion(MemoryMappedFile indexFile)
Expand Down Expand Up @@ -369,7 +369,7 @@ public static void flushRadix(MemoryMappedFile indexFile, PixelsRadix radix)

public static void setCacheStatus(MemoryMappedFile cacheFile, short status)
{
cacheFile.putShortVolatile(6, status);
cacheFile.setShortVolatile(6, status);
}

public static short getCacheStatus(MemoryMappedFile cacheFile)
Expand All @@ -379,7 +379,7 @@ public static short getCacheStatus(MemoryMappedFile cacheFile)

public static void setCacheSize(MemoryMappedFile cacheFile, long size)
{
cacheFile.putLongVolatile(8, size);
cacheFile.setLongVolatile(8, size);
}

public static long getCacheSize(MemoryMappedFile cacheFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public int updateIncremental (int version, Layout layout)
public void write(PixelsCacheKey key, byte[] value)
{
PixelsCacheIdx cacheIdx = new PixelsCacheIdx(cacheOffset, value.length);
cacheFile.putBytes(cacheOffset, value);
cacheFile.setBytes(cacheOffset, value);
cacheOffset += value.length;
radix.put(key.blockId, key.rowGroupId, key.columnId, cacheIdx);
}
Expand Down Expand Up @@ -387,7 +387,7 @@ private int internalUpdateAll(int version, Layout layout, String[] files)
radix.put(pixelsPhysicalReader.getCurrentBlockId(), rowGroupId, columnId,
new PixelsCacheIdx(currCacheOffset, physicalLen));
byte[] columnlet = pixelsPhysicalReader.read(physicalOffset, physicalLen);
cacheFile.putBytes(currCacheOffset, columnlet);
cacheFile.setBytes(currCacheOffset, columnlet);
logger.debug(
"Cache write: " + file + "-" + rowGroupId + "-" + columnId + ", offset: " + currCacheOffset + ", length: " + columnlet.length);
currCacheOffset += physicalLen;
Expand Down Expand Up @@ -557,7 +557,7 @@ private int internalUpdateIncremental(int version, Layout layout, String[] files
//radix.put(pixelsPhysicalReader.getCurrentBlockId(), rowGroupId, columnId,
// new PixelsCacheIdx(newCacheOffset, physicalLen));
byte[] columnlet = pixelsPhysicalReader.read(physicalOffset, physicalLen);
cacheFile.putBytes(newCacheOffset, columnlet);
cacheFile.setBytes(newCacheOffset, columnlet);
logger.debug(
"Cache write: " + file + "-" + rowGroupId + "-" + columnId + ", offset: " + newCacheOffset + ", length: " + columnlet.length);
newCacheOffset += physicalLen;
Expand Down Expand Up @@ -766,7 +766,7 @@ private boolean flushNode(RadixNode node)
header = header | isKeyMask;
}
header = header | node.getChildren().size();
indexFile.putInt(currentIndexOffset, header); // header
indexFile.setInt(currentIndexOffset, header); // header
currentIndexOffset += 4;
for (Byte key : node.getChildren().keySet())
{ // children
Expand All @@ -784,14 +784,14 @@ private boolean flushNode(RadixNode node)
byte[] nodeBytes = new byte[node.getChildren().size() * 8];
nodeBuffer.flip();
nodeBuffer.get(nodeBytes);
indexFile.putBytes(currentIndexOffset, nodeBytes); // children
indexFile.setBytes(currentIndexOffset, nodeBytes); // children
currentIndexOffset += nodeBytes.length;
indexFile.putBytes(currentIndexOffset, node.getEdge()); // edge
indexFile.setBytes(currentIndexOffset, node.getEdge()); // edge
currentIndexOffset += node.getEdge().length;
if (node.isKey())
{ // value
node.getValue().getBytes(cacheIdxBuffer);
indexFile.putBytes(currentIndexOffset, cacheIdxBuffer.array());
indexFile.setBytes(currentIndexOffset, cacheIdxBuffer.array());
currentIndexOffset += 12;
}
return true;
Expand Down
Loading