Skip to content

Commit

Permalink
Add uncompressed size to the shard metadata
Browse files (browse the repository at this point in the history)
  • Loading branch information
nileema committed May 16, 2015
1 parent 954ee41 commit 331536c
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 40 deletions.
Expand Up @@ -197,7 +197,7 @@ private static void insertShardsAndIndex(long tableId, List<ColumnInfo> columns,
{
try (IndexInserter indexInserter = new IndexInserter(handle.getConnection(), tableId, columns)) {
for (ShardInfo shard : shards) {
long shardId = dao.insertShard(shard.getShardUuid(), tableId, shard.getRowCount(), shard.getDataSize());
long shardId = dao.insertShard(shard.getShardUuid(), tableId, shard.getRowCount(), shard.getCompressedSize(), shard.getUncompressedSize());
Set<Integer> shardNodes = shard.getNodeIdentifiers().stream()
.map(nodeIds::get)
.collect(toSet());
Expand Down
Expand Up @@ -32,24 +32,28 @@ public class ShardInfo
private final Set<String> nodeIdentifiers;
private final List<ColumnStats> columnStats;
private final long rowCount;
private final long dataSize;
private final long compressedSize;
private final long uncompressedSize;

@JsonCreator
public ShardInfo(
@JsonProperty("shardUuid") UUID shardUuid,
@JsonProperty("nodeIdentifiers") Set<String> nodeIdentifiers,
@JsonProperty("columnStats") List<ColumnStats> columnStats,
@JsonProperty("rowCount") long rowCount,
@JsonProperty("dataSize") long dataSize)
@JsonProperty("compressedSize") long compressedSize,
@JsonProperty("uncompressedSize") long uncompressedSize)
{
this.shardUuid = checkNotNull(shardUuid, "shardUuid is null");
this.nodeIdentifiers = ImmutableSet.copyOf(checkNotNull(nodeIdentifiers, "nodeIdentifiers is null"));
this.columnStats = ImmutableList.copyOf(checkNotNull(columnStats, "columnStats is null"));

checkArgument(rowCount >= 0, "rowCount must be positive");
checkArgument(dataSize >= 0, "dataSize must be positive");
checkArgument(compressedSize >= 0, "compressedSize must be positive");
checkArgument(uncompressedSize >= 0, "uncompressedSize must be positive");
this.rowCount = rowCount;
this.dataSize = dataSize;
this.compressedSize = compressedSize;
this.uncompressedSize = uncompressedSize;
}

@JsonProperty
Expand Down Expand Up @@ -77,9 +81,15 @@ public long getRowCount()
}

@JsonProperty
public long getDataSize()
public long getCompressedSize()
{
return dataSize;
return compressedSize;
}

/**
 * Returns the uncompressed size recorded for this shard.
 *
 * <p>The value is the one passed to the constructor, which rejects negative
 * values, and it is exposed as a JSON property alongside the compressed size.
 * NOTE(review): the unit is presumably bytes, matching the compressed size
 * taken from the staging file length — confirm against the writer.
 *
 * @return the uncompressed size, never negative
 */
@JsonProperty
public long getUncompressedSize()
{
return uncompressedSize;
}

@Override
Expand All @@ -90,7 +100,8 @@ public String toString()
.add("nodeIdentifiers", nodeIdentifiers)
.add("columnStats", columnStats)
.add("rowCount", rowCount)
.add("dataSize", dataSize)
.add("compressedSize", compressedSize)
.add("uncompressedSize", uncompressedSize)
.toString();
}
}
Expand Up @@ -46,7 +46,8 @@ public interface ShardManagerDao
" table_id BIGINT NOT NULL,\n" +
" create_time DATETIME NOT NULL,\n" +
" row_count BIGINT NOT NULL,\n" +
" data_size BIGINT NOT NULL,\n" +
" compressed_size BIGINT NOT NULL,\n" +
" uncompressed_size BIGINT NOT NULL,\n" +
" UNIQUE (shard_uuid)\n" +
")")
void createTableShards();
Expand All @@ -69,14 +70,15 @@ public interface ShardManagerDao
@SqlUpdate("INSERT INTO nodes (node_identifier) VALUES (:nodeIdentifier)")
void insertNode(@Bind("nodeIdentifier") String nodeIdentifier);

@SqlUpdate("INSERT INTO shards (shard_uuid, table_id, create_time, row_count, data_size)\n" +
"VALUES (:shardUuid, :tableId, CURRENT_TIMESTAMP, :rowCount, :dataSize)")
@SqlUpdate("INSERT INTO shards (shard_uuid, table_id, create_time, row_count, compressed_size, uncompressed_size)\n" +
"VALUES (:shardUuid, :tableId, CURRENT_TIMESTAMP, :rowCount, :compressedSize, :uncompressedSize)")
@GetGeneratedKeys
long insertShard(
@Bind("shardUuid") UUID shardUuid,
@Bind("tableId") long tableId,
@Bind("rowCount") long rowCount,
@Bind("dataSize") long dataSize);
@Bind("compressedSize") long compressedSize,
@Bind("uncompressedSize") long uncompressedSize);

@SqlUpdate("INSERT INTO shard_nodes (shard_id, node_id)\n" +
"VALUES (:shardId, :nodeId)\n")
Expand Down Expand Up @@ -106,7 +108,7 @@ long insertShard(
"WHERE n.node_identifier = :nodeIdentifier")
Set<UUID> getNodeShards(@Bind("nodeIdentifier") String nodeIdentifier);

@SqlQuery("SELECT s.shard_id, s.shard_uuid, s.row_count, s.data_size\n" +
@SqlQuery("SELECT s.shard_id, s.shard_uuid, s.row_count, s.compressed_size, s.uncompressed_size\n" +
"FROM shards s\n" +
"JOIN shard_nodes sn ON (s.shard_id = sn.shard_id)\n" +
"JOIN nodes n ON (sn.node_id = n.node_id)\n" +
Expand Down
Expand Up @@ -26,25 +26,27 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.airlift.units.DataSize.Unit.BYTE;

public class ShardMetadata
{
private final long shardId;
private final UUID shardUuid;
private final long rowCount;
private final long dataSize;
private final long compressedSize;
private final long uncompressedSize;

public ShardMetadata(long shardId, UUID shardUuid, long rowCount, long dataSize)
public ShardMetadata(long shardId, UUID shardUuid, long rowCount, long compressedSize, long uncompressedSize)
{
checkArgument(shardId > 0, "shardId must be > 0");
checkArgument(rowCount >= 0, "rowCount must be >= 0");
checkArgument(dataSize >= 0, "dataSize must be >= 0");
checkArgument(compressedSize >= 0, "compressedSize must be >= 0");
checkArgument(uncompressedSize >= 0, "uncompressedSize must be >= 0");

this.shardId = shardId;
this.shardUuid = checkNotNull(shardUuid, "shardUuid is null");
this.rowCount = rowCount;
this.dataSize = dataSize;
this.compressedSize = compressedSize;
this.uncompressedSize = uncompressedSize;
}

public UUID getShardUuid()
Expand All @@ -62,9 +64,14 @@ public long getRowCount()
return rowCount;
}

public long getDataSize()
public long getCompressedSize()
{
return dataSize;
return compressedSize;
}

/**
 * Returns the uncompressed size of the shard in bytes.
 *
 * <p>The value is the one passed to the constructor, which rejects negative
 * values; the row mapper reads it from the {@code uncompressed_size} column.
 *
 * @return the uncompressed size in bytes, never negative
 */
public long getUncompressedSize()
{
return uncompressedSize;
}

@Override
Expand All @@ -74,7 +81,8 @@ public String toString()
.add("shardId", shardId)
.add("shardUuid", shardUuid)
.add("rowCount", rowCount)
.add("dataSize", new DataSize(dataSize, BYTE).convertToMostSuccinctDataSize())
.add("compressedSize", DataSize.succinctBytes(compressedSize))
.add("uncompressedSize", DataSize.succinctBytes(uncompressedSize))
.toString();
}

Expand All @@ -93,13 +101,14 @@ public boolean equals(Object other)
return Objects.equals(this.shardId, that.shardId) &&
Objects.equals(this.shardUuid, that.shardUuid) &&
Objects.equals(this.rowCount, that.rowCount) &&
Objects.equals(this.dataSize, that.dataSize);
Objects.equals(this.compressedSize, that.compressedSize) &&
Objects.equals(this.uncompressedSize, that.uncompressedSize);
}

@Override
public int hashCode()
{
return Objects.hash(shardId, shardUuid, rowCount, dataSize);
return Objects.hash(shardId, shardUuid, rowCount, compressedSize, uncompressedSize);
}

public static class Mapper
Expand All @@ -113,7 +122,8 @@ public ShardMetadata map(int index, ResultSet r, StatementContext ctx)
r.getLong("shard_id"),
uuidFromBytes(r.getBytes("shard_uuid")),
r.getLong("row_count"),
r.getLong("data_size"));
r.getLong("compressed_size"),
r.getLong("uncompressed_size"));
}
}
}
Expand Up @@ -341,9 +341,9 @@ public void flush()
List<ColumnStats> columns = computeShardStats(stagingFile, columnIds, columnTypes);
Set<String> nodes = ImmutableSet.of(nodeId);
long rowCount = writer.getRowCount();
long dataSize = stagingFile.length(); // compressed size
long compressedSize = stagingFile.length();

shards.add(new ShardInfo(shardUuid, nodes, columns, rowCount, dataSize));
shards.add(new ShardInfo(shardUuid, nodes, columns, rowCount, compressedSize, writer.getUncompressedSize()));

writer = null;
shardUuid = null;
Expand Down
Expand Up @@ -441,7 +441,7 @@ static ShardInfo shardInfo(UUID shardUuid, String nodeIdentifier)

private static ShardInfo shardInfo(UUID shardUuid, String nodeId, List<ColumnStats> columnStats)
{
return new ShardInfo(shardUuid, ImmutableSet.of(nodeId), columnStats, 0, 0);
return new ShardInfo(shardUuid, ImmutableSet.of(nodeId), columnStats, 0, 0, 0);
}

private static Set<ShardNodes> toShardNodes(List<ShardInfo> shards)
Expand Down
Expand Up @@ -107,18 +107,19 @@ public void testNodeInsert()
@Test
public void testInsertShard()
{
long shardId = dao.insertShard(UUID.randomUUID(), 5, 13, 42);
long shardId = dao.insertShard(UUID.randomUUID(), 5, 13, 42, 84);

List<Map<String, Object>> shards = handle.select(
"SELECT table_id , row_count, data_size FROM shards WHERE shard_id = ?",
"SELECT table_id , row_count, compressed_size, uncompressed_size FROM shards WHERE shard_id = ?",
shardId);

assertEquals(shards.size(), 1);
Map<String, Object> shard = shards.get(0);

assertEquals(shard.get("table_id"), 5L);
assertEquals(shard.get("row_count"), 13L);
assertEquals(shard.get("data_size"), 42L);
assertEquals(shard.get("compressed_size"), 42L);
assertEquals(shard.get("uncompressed_size"), 84L);
}

@Test
Expand All @@ -130,7 +131,7 @@ public void testInsertShardNodeUsingShardUuid()

long tableId = 1;
UUID shard = UUID.randomUUID();
dao.insertShard(shard, tableId, 0, 0);
dao.insertShard(shard, tableId, 0, 0, 0);

dao.insertShardNode(shard, nodeId);

Expand Down Expand Up @@ -162,10 +163,10 @@ public void testNodeShards()

long tableId = 1;

long shardId1 = dao.insertShard(shardUuid1, tableId, 0, 0);
long shardId2 = dao.insertShard(shardUuid2, tableId, 0, 0);
long shardId3 = dao.insertShard(shardUuid3, tableId, 0, 0);
long shardId4 = dao.insertShard(shardUuid4, tableId, 0, 0);
long shardId1 = dao.insertShard(shardUuid1, tableId, 0, 0, 0);
long shardId2 = dao.insertShard(shardUuid2, tableId, 0, 0, 0);
long shardId3 = dao.insertShard(shardUuid3, tableId, 0, 0, 0);
long shardId4 = dao.insertShard(shardUuid4, tableId, 0, 0, 0);

assertEquals(dao.getShards(tableId), ImmutableList.of(shardUuid1, shardUuid2, shardUuid3, shardUuid4));

Expand Down Expand Up @@ -218,10 +219,10 @@ public void testShardSelection()
UUID shardUuid3 = UUID.randomUUID();
UUID shardUuid4 = UUID.randomUUID();

long shardId1 = dao.insertShard(shardUuid1, tableId, 0, 0);
long shardId2 = dao.insertShard(shardUuid2, tableId, 0, 0);
long shardId3 = dao.insertShard(shardUuid3, tableId, 0, 0);
long shardId4 = dao.insertShard(shardUuid4, tableId, 0, 0);
long shardId1 = dao.insertShard(shardUuid1, tableId, 0, 0, 0);
long shardId2 = dao.insertShard(shardUuid2, tableId, 0, 0, 0);
long shardId3 = dao.insertShard(shardUuid3, tableId, 0, 0, 0);
long shardId4 = dao.insertShard(shardUuid4, tableId, 0, 0, 0);

List<UUID> shards = dao.getShards(tableId);
assertEquals(shards.size(), 4);
Expand Down
Expand Up @@ -174,7 +174,7 @@ public void testWriter()
File backupFile = storageService.getBackupFile(shardUuid);

assertEquals(shardInfo.getRowCount(), 2);
assertEquals(shardInfo.getDataSize(), file.length());
assertEquals(shardInfo.getCompressedSize(), file.length());

// verify primary and backup shard exist
assertFile(file, "primary shard");
Expand Down

0 comments on commit 331536c

Please sign in to comment.