Skip to content

Commit

Permalink
(issue #29) cluster controller
Browse files Browse the repository at this point in the history
  • Loading branch information
bluestreak01 committed Jan 21, 2015
1 parent ad54d77 commit 45cba78
Show file tree
Hide file tree
Showing 44 changed files with 1,336 additions and 425 deletions.
12 changes: 11 additions & 1 deletion nfsdb-core/src/main/java/com/nfsdb/Journal.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -696,4 +696,14 @@ void refreshInternal() throws JournalException {
configureIrregularPartition();
}
}

@Override
public boolean equals(Object o) {
return this == o || !(o == null || getClass() != o.getClass()) && key.equals(((Journal) o).key);
}

@Override
public int hashCode() {
return key.hashCode();
}
}
132 changes: 132 additions & 0 deletions nfsdb-core/src/main/java/com/nfsdb/collections/IntIntHashMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nfsdb.collections;

import java.util.Arrays;


public class IntIntHashMap {

private static final int FREE = -1;
private final double loadFactor;
private int[] values;
private int[] keys;
private int free;

public IntIntHashMap() {
this(11);
}

public IntIntHashMap(int initialCapacity) {
this(initialCapacity, 0.5f);
}

@SuppressWarnings("unchecked")
public IntIntHashMap(int initialCapacity, double loadFactor) {
int capacity = Math.max(initialCapacity, (int) (initialCapacity / loadFactor));
this.loadFactor = loadFactor;
values = new int[capacity];
keys = new int[capacity];
free = initialCapacity;
clear();
}

@SuppressWarnings({"unchecked"})
protected void rehash() {

int newCapacity = Primes.next(values.length << 1);

free = (int) (newCapacity * loadFactor);

int[] oldValues = values;
int[] oldKeys = keys;
this.keys = new int[newCapacity];
this.values = new int[newCapacity];
Arrays.fill(values, 0, values.length, FREE);

for (int i = oldKeys.length; i-- > 0; ) {
if (oldValues[i] != FREE) {
insertKey(oldKeys[i], oldValues[i]);
}
}
}

public int get(int key) {
int index = (key & 0x7fffffff) % keys.length;
if (values[index] == FREE || keys[index] == key) {
return values[index];
}
return probe(key, index);
}

private int probe(int key, int index) {
do {
index = (index + 1) % keys.length;
if (values[index] == FREE || keys[index] == key) {
return values[index];
}
} while (true);
}

public int put(int key, int value) {
int old = insertKey(key, value);
if (free == 0) {
rehash();
}
return old;
}

private int insertKey(int key, int value) {
int index = (key & 0x7fffffff) % keys.length;
if (values[index] == FREE) {
keys[index] = key;
values[index] = value;
free--;
return FREE;
}

if (keys[index] == key) {
int r = values[index];
values[index] = value;
return r;
}

return probeInsert(key, index, value);
}

private int probeInsert(int key, int index, int value) {
do {
index = (index + 1) % keys.length;
if (values[index] == FREE) {
keys[index] = key;
values[index] = value;
free--;
return FREE;
}

if (key == keys[index]) {
int r = values[index];
values[index] = value;
return r;
}
} while (true);
}

public void clear() {
Arrays.fill(values, FREE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package com.nfsdb.collections;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.Arrays;
import java.util.Iterator;


public class ObjIntHashMap<V> {
public class ObjIntHashMap<V> implements Iterable<ObjIntHashMap.Entry<V>> {

private static final Object FREE = new Object();
private final int noKeyValue;
private final double loadFactor;
private final EntryIterator iterator = new EntryIterator();
private V[] keys;
private int[] values;
private int free;
Expand Down Expand Up @@ -183,4 +187,41 @@ public void clear() {
public int size() {
return capacity - free;
}

@Override
public Iterator<Entry<V>> iterator() {
iterator.index = 0;
return iterator;
}

public static class Entry<V> {
public V key;
public int value;
}

public class EntryIterator extends AbstractImmutableIterator<Entry<V>> {

private final Entry<V> entry = new Entry<>();
private int index;

@Override
public boolean hasNext() {
return index < values.length && (keys[index] != FREE || scan());
}

private boolean scan() {
while (index < values.length && keys[index] == FREE) {
index++;
}
return index < values.length;
}

@SuppressFBWarnings({"IT_NO_SUCH_ELEMENT"})
@Override
public Entry<V> next() {
entry.key = keys[index];
entry.value = values[index++];
return entry;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PartitionCleaner {
private boolean started = false;

public PartitionCleaner(JournalWriter writer, String name) {
this.executor = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("jj-cleaner-" + name, true));
this.executor = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("nfsdb-journal-cleaner-" + name, true));
this.batchEventProcessor = new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), h = new PartitionCleanerEventHandler(writer));
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class TimerCache {
private final long updateFrequency;
private final ExecutorService service = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("jj-timer-cache", true));
private final ExecutorService service = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("nfsdb-timer-cache", true));
private volatile long millis = System.currentTimeMillis();

public TimerCache() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014. Vlad Ilyushchenko
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,6 @@

public class ClusterLossException extends JournalNetworkException {
public ClusterLossException() {
super("Cluster already has a master. LOST.");
super("Cluster loss");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ public <T> Journal<T> reader(Class<T> clazz) throws JournalException {
return reader(new JournalKey<>(clazz));
}

@Override
public <T> Journal<T> reader(JournalKey<T> key) throws JournalException {
return new Journal<>(getOrCreateMetadata(key), key, getTimerCache());
}

@Override
public <T> JournalBulkReader<T> bulkReader(Class<T> clazz, String location) throws JournalException {
return bulkReader(new JournalKey<>(clazz, location));
Expand All @@ -83,11 +78,6 @@ public JournalBulkReader bulkReader(String location) throws JournalException {
return bulkReader(new JournalKey<>(location));
}

@Override
public <T> JournalBulkReader<T> bulkReader(JournalKey<T> key) throws JournalException {
return new JournalBulkReader<>(getOrCreateMetadata(key), key, getTimerCache());
}

public JournalConfiguration getConfiguration() {
return configuration;
}
Expand All @@ -103,7 +93,7 @@ protected TimerCache getTimerCache() {
return timerCache;
}

private <T> JournalMetadata<T> getOrCreateMetadata(JournalKey<T> key) throws JournalException {
protected <T> JournalMetadata<T> getOrCreateMetadata(JournalKey<T> key) throws JournalException {
JournalMetadata<T> metadata = configuration.createMetadata(key);
File location = new File(metadata.getLocation());
if (!location.exists()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nfsdb.factory;

import com.nfsdb.JournalBulkWriter;
import com.nfsdb.JournalKey;
import com.nfsdb.JournalWriter;
import com.nfsdb.PartitionType;
import com.nfsdb.concurrent.TimerCache;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.factory.configuration.JournalConfiguration;

public abstract class AbstractJournalWriterFactory extends AbstractJournalReaderFactory implements JournalWriterFactory {

public AbstractJournalWriterFactory(JournalConfiguration configuration) {
super(configuration);
}

public AbstractJournalWriterFactory(JournalConfiguration configuration, TimerCache timerCache) {
super(configuration, timerCache);
}

@Override
public <T> JournalWriter<T> writer(Class<T> clazz) throws JournalException {
return writer(new JournalKey<>(clazz));
}

@Override
public <T> JournalWriter<T> writer(Class<T> clazz, String location) throws JournalException {
return writer(new JournalKey<>(clazz, location));
}

@Override
public <T> JournalWriter<T> writer(Class<T> clazz, String location, int recordHint) throws JournalException {
return writer(new JournalKey<>(clazz, location, PartitionType.DEFAULT, recordHint));
}

@Override
public JournalWriter writer(String location) throws JournalException {
return writer(new JournalKey<>(location));
}

@Override
public <T> JournalBulkWriter<T> bulkWriter(JournalKey<T> key) throws JournalException {
return new JournalBulkWriter<>(getConfiguration().createMetadata(key), key, getTimerCache());
}

@Override
public <T> JournalBulkWriter<T> bulkWriter(Class<T> clazz) throws JournalException {
return bulkWriter(new JournalKey<>(clazz));
}

@Override
public <T> JournalBulkWriter<T> bulkWriter(Class<T> clazz, String location) throws JournalException {
return bulkWriter(new JournalKey<>(clazz, location));
}

@Override
public <T> JournalBulkWriter<T> bulkWriter(Class<T> clazz, String location, int recordHint) throws JournalException {
return bulkWriter(new JournalKey<>(clazz, location, PartitionType.DEFAULT, recordHint));
}
}

0 comments on commit 45cba78

Please sign in to comment.