Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AF-1532 Replaced Chronicle Queue with chronicle Map to expose the log of Maven to the UI #2252

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -80,6 +80,11 @@
<artifactId>kie-wb-common-compiler-testutil</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-map</artifactId>
<version>${version.net.openhft.chronicle-map}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -545,6 +550,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-map</artifactId>
</dependency>

<!-- Test-->
<dependency>
<groupId>org.kie.workbench.services</groupId>
Expand Down
Expand Up @@ -26,19 +26,7 @@
<dependencies>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-core</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bytes</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-wire</artifactId>
<artifactId>chronicle-map</artifactId>
</dependency>
<dependency>
<groupId>org.kie.workbench.services</groupId>
Expand Down
Expand Up @@ -30,6 +30,11 @@
<artifactId>kie-wb-common-compiler-testutil</artifactId>
<version>${version.org.kie}</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-map</artifactId>
<version>3.16.4</version>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to use a managed version here.

</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand All @@ -41,21 +46,10 @@
<groupId>org.kie.workbench.services</groupId>
<artifactId>kie-wb-common-compiler-offprocess-classpath</artifactId>
</dependency>

<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-core</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bytes</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-wire</artifactId>
<artifactId>chronicle-map</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
Expand Up @@ -15,121 +15,25 @@
*/
package org.kie.workbench.common.services.backend.compiler.offprocess.impl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import org.kie.workbench.common.services.backend.compiler.CompilationResponse;
import org.kie.workbench.common.services.backend.compiler.impl.DefaultKieCompilationResponse;
import org.kie.workbench.common.services.backend.compiler.impl.DefaultKieCompilationResponseOffProcess;
import org.kie.workbench.common.services.backend.compiler.impl.kie.KieCompilationResponse;
import org.kie.workbench.common.services.backend.compiler.offprocess.ClientIPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/***
* Client to access the result of the build executed in a separated process
*/

public class ClientIPCImpl implements ClientIPC {

private ResponseSharedMap map;
private QueueProvider provider;
private MapProvider mapProvider;
private Logger logger = LoggerFactory.getLogger(ClientIPCImpl.class);

public ClientIPCImpl(ResponseSharedMap map, QueueProvider provider) {
this.map = map;
this.provider = provider;
public ClientIPCImpl(MapProvider provider) {
this.mapProvider = provider;
}

public KieCompilationResponse getResponse(String uuid) {
if(isLoaded(uuid)) {
return (KieCompilationResponse)map.getResponse(uuid);
}else {
return new DefaultKieCompilationResponse(false, "");
}
}

private boolean isLoaded(String uuid) {
ExcerptTailer tailer = provider.getQueue().createTailer();
DefaultKieCompilationResponseOffProcess res = readThisDocument(tailer);
DefaultKieCompilationResponse kres = new DefaultKieCompilationResponse(res);
if (uuid.equals(kres.getRequestUUID())) {
if (!map.contains(kres.getRequestUUID())) {
map.addResponse(uuid, kres);
return true;
}
} else {
//we loop in the queue to find our Response by UUID, from the tail of the queue backward
tailer.toEnd();
tailer.direction(TailerDirection.BACKWARD);
res = loopOverQueue(tailer,uuid, 0l);
}

kres = new DefaultKieCompilationResponse(res);
if (!map.contains(kres.getRequestUUID())) {
map.addResponse(uuid, new DefaultKieCompilationResponse(res));
return true;
} else {
return false;
}
}


private DefaultKieCompilationResponseOffProcess loopOverQueue(ExcerptTailer tailer, String uuid, long previousIndex) {
long currentIndex = tailer.index();
if(logger.isDebugEnabled()) {
logger.debug("current index on loopOverQueue:{}", currentIndex);
}
DefaultKieCompilationResponseOffProcess res = readThisDocument(tailer);
if(uuid.equals(res.getRequestUUID())){
return res;
}else{
if(currentIndex == previousIndex){
// No more elements in the queue
return new DefaultKieCompilationResponseOffProcess(false, "");
}
return loopOverQueue(tailer, uuid, currentIndex);
}
}

private DefaultKieCompilationResponseOffProcess readThisDocument(ExcerptTailer tailer) {
if(logger.isDebugEnabled()) {
logger.debug("current index on readThisDocument:{}", tailer.index());
}
DefaultKieCompilationResponseOffProcess res = null;
try (DocumentContext dc = tailer.readingDocument()) {
if (dc.isPresent()) {
if(logger.isDebugEnabled()) {
logger.debug("Document Context index:{}", dc.index());
}
Wire wire = dc.wire();
Bytes bytes = wire.bytes();
if (!bytes.isEmpty()) {
try {
Object obj = deserialize(bytes.toByteArray());
res = (DefaultKieCompilationResponseOffProcess) obj;
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
}
if(res == null){
res = new DefaultKieCompilationResponseOffProcess(false, "");
}
return res;
}

private static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) {
try (ObjectInputStream o = new ObjectInputStream(b)) {
return o.readObject();
}
}
CompilationResponse res = mapProvider.getResponse(uuid);
return res != null ? (KieCompilationResponse) res : new DefaultKieCompilationResponse(false,
"");
}
}
Expand Up @@ -35,54 +35,51 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Coordinator of the build executed in a separate process and the cleint to read the result
*/
public class CompilerIPCCoordinatorImpl implements CompilerIPCCoordinator {

private Logger logger = LoggerFactory.getLogger(CompilerIPCCoordinatorImpl.class);
private static final String placeholder = "<maven_repo>";
private static final String mavenModuleName = "kie-wb-common-compiler-offprocess-core";
private static final String classpathFile = "offprocess.classpath.template";
private Logger logger = LoggerFactory.getLogger(CompilerIPCCoordinatorImpl.class);
private String javaHome;
private String javaBin;
private String classpathTemplate;
private ResponseSharedMap responseMap;
private ClientIPC clientIPC;
private QueueProvider provider;
private String queueName;
private String mapName;
private String kieVersion;

public CompilerIPCCoordinatorImpl(QueueProvider provider) {
private MapProvider mapProvider;

public CompilerIPCCoordinatorImpl(MapProvider mapProvider) {
this.kieVersion = getKieVersion();
this.queueName = provider.getAbsolutePath();
this.provider = provider;
responseMap = new ResponseSharedMap();
clientIPC = new ClientIPCImpl(responseMap, provider);
this.mapName = mapProvider.getName();
this.mapProvider = mapProvider;
clientIPC = new ClientIPCImpl(mapProvider);
javaHome = System.getProperty("java.home");
javaBin = javaHome + File.separator + "bin" + File.separator + "java";
try {
classpathTemplate = IOUtils.toString(getClass().getClassLoader().getResourceAsStream(classpathFile), StandardCharsets.UTF_8);
classpathTemplate = IOUtils.toString(getClass().getClassLoader().getResourceAsStream(classpathFile),
StandardCharsets.UTF_8);
} catch (IOException e) {
logger.error(e.getMessage(), e);
logger.error(e.getMessage(),
e);
}
}

@Override
public CompilationResponse compile(CompilationRequest req) {
return internalBuild(req.getMavenRepo(),
req.getInfo().getPrjPath().toAbsolutePath().toString(),
getAlternateSettings(req.getOriginalArgs()), req.getRequestUUID());
getAlternateSettings(req.getOriginalArgs()),
req.getRequestUUID());
}


private String getKieVersion(){
private String getKieVersion() {
ConfigurationPropertiesStrategy prop = new ConfigurationPropertiesStrategy();
Map<ConfigurationKey, String> conf = prop.loadConfiguration();
return conf.get(ConfigurationKey.KIE_VERSION);
}


private String getAlternateSettings(String[] args) {
for (String arg : args) {
if (arg.startsWith(MavenCLIArgs.ALTERNATE_USER_SETTINGS)) {
Expand All @@ -92,17 +89,28 @@ private String getAlternateSettings(String[] args) {
return "";
}

private CompilationResponse internalBuild(String mavenRepo, String projectPath, String alternateSettingsAbsPath, String uuid) {
String classpath = classpathTemplate.replace(placeholder, mavenRepo);
private CompilationResponse internalBuild(String mavenRepo,
String projectPath,
String alternateSettingsAbsPath,
String uuid) {
String classpath = classpathTemplate.replace(placeholder,
mavenRepo);
try {
invokeServerBuild(mavenRepo, projectPath, uuid, classpath, alternateSettingsAbsPath, queueName);
if(logger.isDebugEnabled()) {
invokeServerBuild(mavenRepo,
projectPath,
uuid,
classpath,
alternateSettingsAbsPath,
mapName);
if (logger.isDebugEnabled()) {
logger.debug("invokeServerBuild completed");
}
return getCompilationResponse(uuid);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new DefaultKieCompilationResponse(false, "");
logger.error(e.getMessage(),
e);
return new DefaultKieCompilationResponse(false,
"");
}
}

Expand All @@ -111,27 +119,36 @@ private CompilationResponse getCompilationResponse(String uuid) {
if (res != null) {
return res;
} else {
return new DefaultKieCompilationResponse(true, "");
return new DefaultKieCompilationResponse(true,
"");
}
}

private void invokeServerBuild(String mavenRepo, String projectPath, String uuid, String classpath, String alternateSettingsAbsPath, String queueName) throws Exception {
private void invokeServerBuild(String mavenRepo,
String projectPath,
String uuid,
String classpath,
String alternateSettingsAbsPath,
String mapName) throws Exception {
String[] commandArrayServer =
{
javaBin,
"-cp",
getClasspathIncludedCurrentModuleDep(mavenRepo, classpath),
getClasspathIncludedCurrentModuleDep(mavenRepo,
classpath),
"-Dorg.uberfire.nio.git.daemon.enabled=false",
"-Dorg.uberfire.nio.ssh.daemon.enabled=false",
ServerIPCImpl.class.getCanonicalName(),
uuid,
projectPath,
mavenRepo,
alternateSettingsAbsPath,
queueName
"/tmp/" + mapName
};

if (logger.isDebugEnabled()) {
logger.debug("************************** \n Invoking server in a separate process with args: \n{} \n{} \n{} \n{} \n{} \n{} \n{} \n{} \n**************************", commandArrayServer);
logger.debug("************************** \n Invoking server in a separate process with args: \n{} \n{} \n{} \n{} \n{} \n{} \n{} \n{} \n**************************",
commandArrayServer);
}
ProcessBuilder serverPb = new ProcessBuilder(commandArrayServer);
serverPb.directory(new File(projectPath));
Expand All @@ -140,7 +157,8 @@ private void invokeServerBuild(String mavenRepo, String projectPath, String uuid
writeStdOut(serverPb);
}

private String getClasspathIncludedCurrentModuleDep(String mavenRepo, String classpath){
private String getClasspathIncludedCurrentModuleDep(String mavenRepo,
String classpath) {
StringBuilder sb = new StringBuilder();
this.getClass().getPackage();
sb.append(mavenRepo).
Expand All @@ -153,7 +171,7 @@ private String getClasspathIncludedCurrentModuleDep(String mavenRepo, String cla
append(File.separator).append(mavenModuleName).
append("-").append(kieVersion).append(".jar").append(":").
append(classpath);
return sb.toString();
return sb.toString();
}

private void writeStdOut(ProcessBuilder builder) throws Exception {
Expand Down