Skip to content

Commit

Permalink
SAMZA-1428: Fix scala 2.10 compilation issue with java 8 interface st…
Browse files Browse the repository at this point in the history
…atic methods

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Boris Shkolnik <boryas@apache.org>

Closes apache#300 from jmakes/samza-1428
  • Loading branch information
Jacob Maes committed Sep 21, 2017
1 parent 3773ec6 commit 987180a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.zk.ZkCoordinationUtilsFactory;

public class JobCoordinatorConfig extends MapConfig {
Expand Down Expand Up @@ -49,6 +51,13 @@ public String getJobCoordinationUtilsFactoryClassName() {
return className;
}

public CoordinationUtilsFactory getCoordinationUtilsFactory() {
// load the class
String coordinationUtilsFactoryClass = getJobCoordinationUtilsFactoryClassName();

return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
}

public String getJobCoordinatorFactoryClassName() {
String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,13 @@
package org.apache.samza.coordinator;

import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.util.*;


/**
* factory to instantiate a c{@link CoordinationUtils} service
*/
public interface CoordinationUtilsFactory {

public static CoordinationUtilsFactory getCoordinationUtilsFactory(Config config) {
// load the class
JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
String coordinationUtilsFactoryClass = jcConfig.getJobCoordinationUtilsFactoryClassName();

return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
}

/**
* get a unique service instance
* @param groupId - unique id to identify the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.coordinator.DistributedLockWithState;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.job.ApplicationStatus;
Expand Down Expand Up @@ -217,9 +217,10 @@ public void waitForFinish() {
LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
// Move the scope of coordination utils within stream creation to address long idle connection problem.
// Refer SAMZA-1385 for more details
JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
CoordinationUtils coordinationUtils =
CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
if (coordinationUtils == null) {
LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
// each application process will try creating the streams, which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
Expand All @@ -44,6 +45,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand All @@ -53,11 +55,10 @@
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import static org.powermock.api.mockito.PowerMockito.mockStatic;


@RunWith(PowerMockRunner.class)
@PrepareForTest(CoordinationUtilsFactory.class)
@PrepareForTest(LocalApplicationRunner.class)
public class TestLocalApplicationRunner {

private static final String PLAN_JSON =
Expand Down Expand Up @@ -105,9 +106,10 @@ public String getPlanAsJson()
};
when(planner.plan(anyObject())).thenReturn(plan);

mockStatic(CoordinationUtilsFactory.class);
CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);

LocalApplicationRunner spy = spy(runner);
try {
Expand Down Expand Up @@ -164,8 +166,9 @@ public String getPlanAsJson()

CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
mockStatic(CoordinationUtilsFactory.class);
when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);

DistributedLockWithState lock = mock(DistributedLockWithState.class);
when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
Expand Down

0 comments on commit 987180a

Please sign in to comment.