Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for multiple Azkaban Host URL #342

Merged
merged 1 commit into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/AutoTuner.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public void run() {
BaselineComputeUtil baselineComputeUtil = new BaselineComputeUtil();
FitnessComputeUtil fitnessComputeUtil = new FitnessComputeUtil();
ParamGenerator paramGenerator = new PSOParamGenerator();
JobCompleteDetector jobCompleteDetector = new AzkabanJobCompleteDetector();
while (!Thread.currentThread().isInterrupted()) {
try {
baselineComputeUtil.computeBaseline();
JobCompleteDetector jobCompleteDetector = new AzkabanJobCompleteDetector();
jobCompleteDetector.updateCompletedExecutions();
fitnessComputeUtil.updateFitness();
paramGenerator.getParams();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;

import com.linkedin.drelephant.AutoTuner;
import com.linkedin.drelephant.clients.WorkflowClient;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.util.InfoExtractor;

Expand All @@ -29,37 +35,57 @@
* This class is azkaban scheduler util for getting job status.
*/
public class AzkabanJobStatusUtil {
private AzkabanWorkflowClient _workflowClient;

private static final Logger logger = Logger.getLogger(AzkabanJobStatusUtil.class);
private HashMap<String, AzkabanWorkflowClient> workflowClients = new HashMap<String, AzkabanWorkflowClient>();
private String scheduler = "azkaban";
private static String USERNAME = "username";
private static String PRIVATE_KEY = "private_key";
private static String PASSWORD = "password";
private static final long TOKEN_UPDATE_INTERVAL = AutoTuner.ONE_MIN * 60 * 1;

/**
* Constructor of the class
* @param url
*/
public AzkabanJobStatusUtil(String url) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class can be marked as a singleton?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is not thread safe, whoever is using this class need to make sure that it runs only one request at a time. So we need to first make it thread safe or make it a utility kind of class before making it singleton.

Copy link
Contributor

@akshayrai akshayrai Feb 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not happy with keeping the login state in a utility method. Ideally, the login should be done in a separate class (workflowClient itself) and then you can call this utility method by passing the workflowClient object to it. This way the login can be reused for other utilities and the utility methods also remain stateless and easily testable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently also login method is called form outside. As we discussed we will create a task to refactor this and take it up with in one month.

_workflowClient = (AzkabanWorkflowClient) InfoExtractor.getWorkflowClientInstance(scheduler, url);
SchedulerConfigurationData schedulerData = InfoExtractor.getSchedulerData(scheduler);

if (schedulerData == null) {
throw new RuntimeException(String.format("Cannot find scheduler %s for url %s", scheduler, url));
public AzkabanWorkflowClient getWorkflowClient(String url) throws MalformedURLException {
String hostAddress = "https://" + new URL(url).getAuthority();
AzkabanWorkflowClient workflowClient = null;
if (workflowClients.containsKey(hostAddress)) {
logger.debug("WorkflowClient Exist " + url + " Host Address is " + hostAddress);
workflowClient = workflowClients.get(hostAddress);
} else {
logger.debug("WorkflowClient Does not Exist " + url + " Host Address is " + hostAddress);
workflowClient = (AzkabanWorkflowClient) InfoExtractor.getWorkflowClientInstance(scheduler, url);
workflowClients.put(hostAddress, workflowClient);
}
doLogin(workflowClient);
return workflowClient;
}

if (!schedulerData.getParamMap().containsKey(USERNAME)) {
throw new RuntimeException(String.format("Cannot find username for login"));
}
public WorkflowClient doLogin(AzkabanWorkflowClient workflowClient) {
Long _currentTime = System.currentTimeMillis();
if (_currentTime - workflowClient.getSessionUpdatedTime() > TOKEN_UPDATE_INTERVAL) {
logger.info("Creating a new session with Azkaban");

String username = schedulerData.getParamMap().get(USERNAME);
SchedulerConfigurationData schedulerData = InfoExtractor.getSchedulerData(scheduler);

if (schedulerData.getParamMap().containsKey(PRIVATE_KEY)) {
_workflowClient.login(username, new File(schedulerData.getParamMap().get(PRIVATE_KEY)));
} else if (schedulerData.getParamMap().containsKey(PASSWORD)) {
_workflowClient.login(username, schedulerData.getParamMap().get(PASSWORD));
} else {
throw new RuntimeException("Neither private key nor password was specified");
if (schedulerData == null) {
throw new RuntimeException(String.format("Cannot find scheduler %s for url %s", scheduler));
}

if (!schedulerData.getParamMap().containsKey(USERNAME)) {
throw new RuntimeException(String.format("Cannot find username for login"));
}

String username = schedulerData.getParamMap().get(USERNAME);

if (schedulerData.getParamMap().containsKey(PRIVATE_KEY)) {
workflowClient.login(username, new File(schedulerData.getParamMap().get(PRIVATE_KEY)));
} else if (schedulerData.getParamMap().containsKey(PASSWORD)) {
workflowClient.login(username, schedulerData.getParamMap().get(PASSWORD));
} else {
throw new RuntimeException("Neither private key nor password was specified");
}
workflowClient.setSessionUpdatedTime(_currentTime);
}
return workflowClient;
}

/**
Expand All @@ -70,7 +96,8 @@ public AzkabanJobStatusUtil(String url) {
* @throws URISyntaxException
*/
public Map<String, String> getJobsFromFlow(String execUrl) throws MalformedURLException, URISyntaxException {
_workflowClient.setURL(execUrl);
return _workflowClient.getJobsFromFlow();
AzkabanWorkflowClient workflowClient = getWorkflowClient(execUrl);
workflowClient.setURL(execUrl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required? Doesn't the previous line already return the updated client for the url.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, first line is only returning the workflowClient, next line is setting the url.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The url seems to be already set when you call getWorkflowClient(execUrl). The explicit setURL in the next line seems redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are talking about this code getWorkflowClientInstance(scheduler, url), then this is only getting called when first time WorkflowClient is created, after that it will always pick up from the map.

return workflowClient.getJobsFromFlow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class AzkabanWorkflowClient implements WorkflowClient {
private String _sessionId;
private String _username;
private String _password;
private long _sessionUpdatedTime = 0;

private String AZKABAN_LOG_OFFSET = "0";
private String AZKABAN_LOG_LENGTH_LIMIT = "9999999"; // limit the log limit to 10 mb
Expand Down Expand Up @@ -175,6 +176,14 @@ public void login(String username, File _privateKey) {
login(username, decodedPwd);
}

public long getSessionUpdatedTime() {
return _sessionUpdatedTime;
}

public void setSessionUpdatedTime(long sessionUpdatedTime) {
_sessionUpdatedTime = sessionUpdatedTime;
}

/**
* Authenticates Dr. Elephant in Azkaban and sets the sessionId
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ protected List<TuningJobExecution> getCompletedExecutions(List<TuningJobExecutio

if (_azkabanJobStatusUtil == null) {
logger.info("Initializing AzkabanJobStatusUtil");
_azkabanJobStatusUtil = new AzkabanJobStatusUtil(jobExecution.flowExecution.flowExecId);
_azkabanJobStatusUtil = new AzkabanJobStatusUtil();
}

try {
Thread.sleep(2000);
Map<String, String> jobStatus = _azkabanJobStatusUtil.getJobsFromFlow(jobExecution.flowExecution.flowExecId);
if (jobStatus != null) {
for (Map.Entry<String, String> job : jobStatus.entrySet()) {
Expand Down