Skip to content

Commit

Permalink
Move globalfs launchers to globalfs project, more test hacking
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Bulakh committed Nov 19, 2018
1 parent 94d5b13 commit c2fe6be
Show file tree
Hide file tree
Showing 26 changed files with 400 additions and 173 deletions.
24 changes: 22 additions & 2 deletions boot/src/main/java/io/datakernel/config/Config.java
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2015-2018 SoftIndex LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.datakernel.config;

import io.datakernel.annotation.Nullable;
Expand Down Expand Up @@ -308,9 +324,13 @@ static Config lazyConfig(Supplier<Config> configSupplier) {
return new Config() {
private Config actualConfig;

private synchronized Config ensureConfig() {
private Config ensureConfig() {
if (actualConfig == null) {
actualConfig = configSupplier.get();
synchronized (this) {
if (actualConfig == null) {
actualConfig = configSupplier.get();
}
}
}
return actualConfig;
}
Expand Down
18 changes: 14 additions & 4 deletions boot/src/main/java/io/datakernel/config/ConfigConverter.java
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2015 SoftIndex LLC.
* Copyright (C) 2015-2018 SoftIndex LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@
package io.datakernel.config;

import io.datakernel.annotation.Nullable;
import io.datakernel.exception.ParseException;
import io.datakernel.util.ParserFunction;

import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -36,20 +38,28 @@ public interface ConfigConverter<T> {
* @param <V> return type
* @return converter that knows how to get V value from T value saved in config
*/
default <V> ConfigConverter<V> transform(Function<T, V> to, Function<V, T> from) {
default <V> ConfigConverter<V> transform(ParserFunction<T, V> to, Function<V, T> from) {
ConfigConverter<T> thisConverter = this;
return new ConfigConverter<V>() {
@Override
@Nullable
public V get(Config config, @Nullable V defaultValue) {
T value = thisConverter.get(config, defaultValue == null ? null : from.apply(defaultValue));
return value != null ? to.apply(value) : null;
try {
return value != null ? to.parse(value) : null;
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}

@Override
@Nullable
public V get(Config config) {
return to.apply(thisConverter.get(config));
try {
return to.parse(thisConverter.get(config));
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}
};
}
Expand Down
5 changes: 5 additions & 0 deletions global/pom.xml
Expand Up @@ -38,6 +38,11 @@
<artifactId>datakernel-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.datakernel</groupId>
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.datakernel.launchers.globalfs;
package io.global.fs.launchers;

import com.google.inject.AbstractModule;
import com.google.inject.Inject;
Expand All @@ -40,8 +40,6 @@
import static com.google.inject.util.Modules.override;
import static io.datakernel.config.Config.ofProperties;
import static io.datakernel.config.ConfigConverters.ofInetSocketAddress;
import static io.datakernel.launchers.Initializers.ofEventloop;
import static io.datakernel.launchers.Initializers.ofHttpServer;
import static java.lang.Boolean.parseBoolean;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -73,7 +71,7 @@ private Collection<com.google.inject.Module> getBaseModules() {
@Singleton
Eventloop provide(Config config, OptionalDependency<ThrottlingController> maybeThrottlingController) {
return Eventloop.create()
.initialize(ofEventloop(config.getChild("eventloop")))
.initialize(Initializers.ofEventloop(config.getChild("eventloop")))
.initialize(eventloop -> maybeThrottlingController.ifPresent(eventloop::withInspector));
}

Expand All @@ -86,7 +84,7 @@ DiscoveryService provide() {
@Provides
@Singleton
AsyncHttpServer provide(Eventloop eventloop, DiscoveryServlet servlet, Config config) {
return AsyncHttpServer.create(eventloop, servlet).initialize(ofHttpServer(config.getChild("http")));
return AsyncHttpServer.create(eventloop, servlet).initialize(Initializers.ofHttpServer(config.getChild("http")));
}
}
);
Expand Down
Expand Up @@ -14,10 +14,8 @@
* limitations under the License.
*/

package io.datakernel.launchers.globalfs;
package io.global.fs.launchers;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
Expand All @@ -44,7 +42,6 @@
import io.global.fs.http.RemoteFsServlet;
import io.global.fs.local.GlobalFsDriver;
import io.global.fs.local.GlobalFsGatewayAdapter;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Collection;
Expand All @@ -54,13 +51,10 @@

import static com.google.inject.util.Modules.override;
import static io.datakernel.config.Config.ofProperties;
import static io.datakernel.config.ConfigConverters.ofInetSocketAddress;
import static io.datakernel.config.ConfigConverters.ofPath;
import static io.datakernel.launchers.Initializers.ofEventloop;
import static io.datakernel.launchers.Initializers.ofHttpServer;
import static io.datakernel.launchers.globalfs.GlobalFsConfigConverters.ofCheckpointPositionStrategy;
import static io.datakernel.launchers.globalfs.GlobalFsConfigConverters.ofPrivKey;
import static io.datakernel.config.ConfigConverters.*;
import static io.datakernel.util.CollectionUtils.list;
import static io.global.fs.launchers.GlobalFsConfigConverters.ofCheckpointPositionStrategy;
import static io.global.fs.launchers.GlobalFsConfigConverters.ofPrivKey;
import static java.lang.Boolean.parseBoolean;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -104,9 +98,9 @@ private Collection<com.google.inject.Module> getBaseModules() {
// storage path for this node
.with("storage", "/tmp/TESTS/server" + server)
// this node manages Alice and Bob repos both named 'testFs'
.with("managedRepos",
/* alice(p) = */"cb78f3ac392aa96ec7a1ba3d1848423097cb5d892638ab297149ea03e9b7ba7d:10d6096aaff36c5b11d5abf063e0499e68e63270ef70d6dc18f0c47566ffdac5/testFs," +
/* bob(p) = */"aed50797fe8950ea25745c5cee391156905033ee4e3f5a2df418f687df78a7f1:784ca80eaa2fc2f643052a7469ec23fa2f72dd9ce248044e34ae986d7ce9ef8d/testFs")
.with("managedPubKeys",
/* alice(p) = */"cb78f3ac392aa96ec7a1ba3d1848423097cb5d892638ab297149ea03e9b7ba7d:10d6096aaff36c5b11d5abf063e0499e68e63270ef70d6dc18f0c47566ffdac5," +
/* bob(p) = */"aed50797fe8950ea25745c5cee391156905033ee4e3f5a2df418f687df78a7f1:784ca80eaa2fc2f643052a7469ec23fa2f72dd9ce248044e34ae986d7ce9ef8d")

// very short latency margin so it will actually do the task each time we call it *testing*
.with("fetching.latencyMargin", "1 second")
Expand Down Expand Up @@ -139,7 +133,10 @@ private Collection<com.google.inject.Module> getBaseModules() {
@Singleton
Eventloop provide(Config config, OptionalDependency<ThrottlingController> maybeThrottlingController) {
return Eventloop.create()
.initialize(ofEventloop(config.getChild("eventloop")))
.initialize(eventloop -> eventloop
.withFatalErrorHandler(config.get(ofFatalErrorHandler(), "fatalErrorHandler", eventloop.getFatalErrorHandler()))
.withIdleInterval(config.get(ofDuration(), "idleInterval", eventloop.getIdleInterval()))
.withThreadPriority(config.get(ofInteger(), "threadPriority", eventloop.getThreadPriority())))
.initialize(eventloop -> maybeThrottlingController.ifPresent(eventloop::withInspector));
}

Expand Down Expand Up @@ -176,15 +173,15 @@ GlobalFsGatewayAdapter provide(Config config, GlobalFsNode node) {
@Singleton
AsyncHttpServer provide(Eventloop eventloop, GlobalFsNodeServlet servlet, Config config) {
return AsyncHttpServer.create(eventloop, servlet)
.initialize(ofHttpServer(config.getChild("globalfs.http")));
.initialize(Initializers.ofHttpServer(config.getChild("globalfs.http")));
}

@Provides
@Named("gateway")
@Singleton
AsyncHttpServer provide(Eventloop eventloop, RemoteFsServlet servlet, Config config) {
return AsyncHttpServer.create(eventloop, servlet)
.initialize(ofHttpServer(config.getChild("globalfs.gateway.http")));
.initialize(Initializers.ofHttpServer(config.getChild("globalfs.gateway.http")));
}
}
);
Expand All @@ -203,8 +200,6 @@ protected Collection<com.google.inject.Module> getOverrideModules() {
}

public static void main(String[] args) throws Exception {
Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
logger.setLevel(Level.TRACE);
new GatewayGlobalFsNodeLauncher().launch(parseBoolean(System.getProperty(EAGER_SINGLETONS_MODE)), args);
}
}
Expand Up @@ -14,22 +14,16 @@
* limitations under the License.
*/

package io.datakernel.launchers.globalfs;
package io.global.fs.launchers;

import io.datakernel.annotation.Nullable;
import io.datakernel.config.Config;
import io.datakernel.config.ConfigConverter;
import io.datakernel.config.SimpleConfigConverter;
import io.datakernel.exception.ParseException;
import io.datakernel.util.ParserFunction;
import io.global.common.*;
import io.global.fs.api.CheckpointPosStrategy;
import io.global.ot.api.RepoID;

import java.util.function.Function;

import static io.datakernel.config.ConfigConverters.ofInetSocketAddress;
import static io.datakernel.config.ConfigConverters.ofMemSizeAsLong;
import static io.datakernel.config.ConfigConverters.*;
import static io.global.fs.api.CheckpointPosStrategy.*;

public final class GlobalFsConfigConverters {
Expand All @@ -41,43 +35,24 @@ public static ConfigConverter<RawServerId> ofRawServerId() {
return ofInetSocketAddress().transform(RawServerId::new, RawServerId::getInetSocketAddress);
}

private static <T> ConfigConverter<T> ofStringIdentity(ParserFunction<String, T> from, Function<T, String> to) {
return new SimpleConfigConverter<T>() {
@Override
protected T fromString(String string) {
try {
return from.parse(string);
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}

@Override
protected String toString(T value) {
return to.apply(value);
}
};

}

public static ConfigConverter<RepoID> ofRepoID() {
return ofStringIdentity(RepoID::fromString, RepoID::asString);
return ofString().transform(RepoID::fromString, RepoID::asString);
}

public static ConfigConverter<PubKey> ofPubKey() {
return ofStringIdentity(PubKey::fromString, PubKey::asString);
return ofString().transform(PubKey::fromString, PubKey::asString);
}

public static ConfigConverter<PrivKey> ofPrivKey() {
return ofStringIdentity(PrivKey::fromString, PrivKey::asString);
return ofString().transform(PrivKey::fromString, PrivKey::asString);
}

public static ConfigConverter<SimKey> ofSimKey() {
return ofStringIdentity(SimKey::fromString, SimKey::asString);
return ofString().transform(SimKey::fromString, SimKey::asString);
}

public static ConfigConverter<Hash> ofHash() {
return ofStringIdentity(Hash::parseString, Hash::asString);
return ofString().transform(Hash::parseString, Hash::asString);
}

public static ConfigConverter<CheckpointPosStrategy> ofCheckpointPositionStrategy() {
Expand Down

0 comments on commit c2fe6be

Please sign in to comment.