Skip to content

Commit

Permalink
issue #29 - fixes after soak test with 2bn row table
Browse files Browse the repository at this point in the history
  • Loading branch information
bluestreak01 committed Feb 14, 2015
1 parent 3d0bebf commit 672f6ab
Show file tree
Hide file tree
Showing 14 changed files with 434 additions and 380 deletions.
49 changes: 28 additions & 21 deletions nfsdb-core/src/main/java/com/nfsdb/JournalKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public JournalKey(String id) {
this.modelClass = null;
}

public JournalKey(String id, Class<T> modelClass, String location, int recordHint) {
this.id = id;
this.modelClass = modelClass;
this.location = location;
this.recordHint = recordHint;
}

public JournalKey(Class<T> clazz) {
this.modelClass = clazz;
this.id = clazz.getName();
Expand Down Expand Up @@ -117,18 +124,31 @@ public static JournalKey<Object> fromBuffer(ByteBuffer buffer) {
return new JournalKey<>(new String(clazz, Files.UTF_8), location == null ? null : new String(location), partitionType, recordHint, ordered);
}

public String getId() {
return id;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JournalKey)) return false;
JournalKey that = (JournalKey) o;
return ordered == that.ordered && recordHint == that.recordHint && !(id != null ? !id.equals(that.id) : that.id != null) && !(location != null ? !location.equals(that.location) : that.location != null) && partitionType == that.partitionType;

}

public Class<T> getModelClass() {
return modelClass;
public int getBufferSize() {
return 4 + id.getBytes(Files.UTF_8).length + 4 + 2 * (location == null ? 0 : location.length()) + 1 + 1 + 4;
}

public String getId() {
return id;
}

public String getLocation() {
return location;
}

public Class<T> getModelClass() {
return modelClass;
}

public PartitionType getPartitionType() {
return partitionType;
}
Expand All @@ -137,19 +157,6 @@ public int getRecordHint() {
return recordHint;
}

public boolean isOrdered() {
return ordered;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JournalKey)) return false;
JournalKey that = (JournalKey) o;
return ordered == that.ordered && recordHint == that.recordHint && !(id != null ? !id.equals(that.id) : that.id != null) && !(location != null ? !location.equals(that.location) : that.location != null) && partitionType == that.partitionType;

}

@Override
public int hashCode() {
int result = id != null ? id.hashCode() : 0;
Expand All @@ -162,6 +169,10 @@ public int hashCode() {

//////////////////////// REPLICATION CODE //////////////////////

public boolean isOrdered() {
return ordered;
}

@Override
public String toString() {
return "JournalKey{" +
Expand All @@ -173,10 +184,6 @@ public String toString() {
'}';
}

public int getBufferSize() {
return 4 + id.getBytes(Files.UTF_8).length + 4 + 2 * (location == null ? 0 : location.length()) + 1 + 1 + 4;
}

public void write(ByteBuffer buffer) {
// id
buffer.putInt(id.length());
Expand Down
138 changes: 70 additions & 68 deletions nfsdb-core/src/main/java/com/nfsdb/Partition.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@ public class Partition<T> implements Iterable<T>, Closeable {
}

public void applyTx(long txLimit, long[] indexTxAddresses) {
access();
if (this.txLimit != txLimit) {
this.txLimit = txLimit;
for (int i = 0, indexProxiesSize = indexProxies.size(); i < indexProxiesSize; i++) {
Expand Down Expand Up @@ -416,6 +417,7 @@ public String toString() {
}

public void updateIndexes(long oldSize, long newSize) {
access();
if (oldSize < newSize) {
try {
for (int i1 = 0, sz = indexProxies.size(); i1 < sz; i1++) {
Expand Down Expand Up @@ -500,25 +502,6 @@ void append(T obj) throws JournalException {
}
}

private void appendBin(T obj, int i, ColumnMetadata meta) {
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, meta.offset);
if (buf == null || buf.remaining() == 0) {
((VariableColumn) columns[i]).putNull();
} else {
((VariableColumn) columns[i]).putBin(buf);
}
}

private void checkColumnIndex(int i) {
if (columns == null) {
throw new JournalRuntimeException("Partition is closed: %s", this);
}

if (i < 0 || i >= columns.length) {
throw new JournalRuntimeException("Invalid column index: %d in %s", i, this);
}
}

void clearTx() {
applyTx(Journal.TX_LIMIT_EVAL, null);
}
Expand All @@ -530,24 +513,6 @@ void commit() throws JournalException {
}
}

@SuppressWarnings("unchecked")
private void createSymbolIndexProxies(long[] indexTxAddresses) {
indexProxies.clear();
if (sparseIndexProxies == null || sparseIndexProxies.length != columnCount) {
sparseIndexProxies = new SymbolIndexProxy[columnCount];
}

for (int i = 0; i < columnCount; i++) {
if (columnMetadata[i].indexed) {
SymbolIndexProxy<T> proxy = new SymbolIndexProxy<>(this, i, indexTxAddresses == null ? 0 : indexTxAddresses[i]);
indexProxies.add(proxy);
sparseIndexProxies[i] = proxy;
} else {
sparseIndexProxies[i] = null;
}
}
}

void expireOpenIndices() {
long expiry = System.currentTimeMillis() - journal.getMetadata().getOpenFileTTL();
for (int i = 0, indexProxiesSize = indexProxies.size(); i < indexProxiesSize; i++) {
Expand All @@ -574,18 +539,80 @@ void force() throws JournalException {
}
}

private FixedColumn getFixedWidthColumn(int i) {
checkColumnIndex(i);
return (FixedColumn) columns[i];
}

void getIndexPointers(long[] pointers) throws JournalException {
for (int i = 0, indexProxiesSize = indexProxies.size(); i < indexProxiesSize; i++) {
SymbolIndexProxy<T> proxy = indexProxies.get(i);
pointers[proxy.getColumnIndex()] = proxy.getIndex().getTxAddress();
}
}

void setPartitionDir(File partitionDir, long[] indexTxAddresses) {
boolean create = partitionDir != null && !partitionDir.equals(this.partitionDir);
this.partitionDir = partitionDir;
if (create) {
createSymbolIndexProxies(indexTxAddresses);
}
}

void truncate(long newSize) throws JournalException {
if (isOpen() && size() > newSize) {
for (int i = 0, sz = indexProxies.size(); i < sz; i++) {
SymbolIndexProxy<T> proxy = indexProxies.get(i);
proxy.getIndex().truncate(newSize);
}
for (int i = 0; i < columns.length; i++) {
if (columns[i] != null) {
columns[i].truncate(newSize);
}
}

commitColumns();
clearTx();
}
}

private void appendBin(T obj, int i, ColumnMetadata meta) {
ByteBuffer buf = (ByteBuffer) Unsafe.getUnsafe().getObject(obj, meta.offset);
if (buf == null || buf.remaining() == 0) {
((VariableColumn) columns[i]).putNull();
} else {
((VariableColumn) columns[i]).putBin(buf);
}
}

private void checkColumnIndex(int i) {
if (columns == null) {
throw new JournalRuntimeException("Partition is closed: %s", this);
}

if (i < 0 || i >= columns.length) {
throw new JournalRuntimeException("Invalid column index: %d in %s", i, this);
}
}

@SuppressWarnings("unchecked")
private void createSymbolIndexProxies(long[] indexTxAddresses) {
indexProxies.clear();
if (sparseIndexProxies == null || sparseIndexProxies.length != columnCount) {
sparseIndexProxies = new SymbolIndexProxy[columnCount];
}

for (int i = 0; i < columnCount; i++) {
if (columnMetadata[i].indexed) {
SymbolIndexProxy<T> proxy = new SymbolIndexProxy<>(this, i, indexTxAddresses == null ? 0 : indexTxAddresses[i]);
indexProxies.add(proxy);
sparseIndexProxies[i] = proxy;
} else {
sparseIndexProxies[i] = null;
}
}
}

private FixedColumn getFixedWidthColumn(int i) {
checkColumnIndex(i);
return (FixedColumn) columns[i];
}

private void open(int idx) throws JournalException {

switch (columnMetadata[idx].type) {
Expand Down Expand Up @@ -622,29 +649,4 @@ private void readBin(long localRowID, T obj, int i, ColumnMetadata m) {
buf.flip();
}
}

void setPartitionDir(File partitionDir, long[] indexTxAddresses) {
boolean create = partitionDir != null && !partitionDir.equals(this.partitionDir);
this.partitionDir = partitionDir;
if (create) {
createSymbolIndexProxies(indexTxAddresses);
}
}

void truncate(long newSize) throws JournalException {
if (isOpen() && size() > newSize) {
for (int i = 0, sz = indexProxies.size(); i < sz; i++) {
SymbolIndexProxy<T> proxy = indexProxies.get(i);
proxy.getIndex().truncate(newSize);
}
for (int i = 0; i < columns.length; i++) {
if (columns[i] != null) {
columns[i].truncate(newSize);
}
}

commitColumns();
clearTx();
}
}
}

0 comments on commit 672f6ab

Please sign in to comment.