Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable email notifier to send emails for health check failures #319

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,50 @@ public void observe(List<ClusterStats> clustersStats)

private void notifyUnhealthyCluster(ClusterStats clusterStats)
{
notifier.sendNotification(format("%s - Cluster unhealthy", clusterStats.clusterId()), clusterStats.toString());
String clusterId = clusterStats.clusterId();
String subject = format("Cluster name '%s' is unhealthy", clusterId);
String content = buildContent(clusterStats);
notifier.sendNotification(subject, content);
}

private void notifyForTooManyQueuedQueries(ClusterStats clusterStats)
{
notifier.sendNotification(format("%s - Too many queued queries", clusterStats.clusterId()), clusterStats.toString());
String clusterId = clusterStats.clusterId();
String subject = format("Cluster name '%s' has too many queued queries", clusterId);
String content = buildContent(clusterStats);
notifier.sendNotification(subject, content);
}

private void notifyForNoWorkers(ClusterStats clusterStats)
{
notifier.sendNotification(format("%s - Number of workers", clusterStats.clusterId()), clusterStats.toString());
String clusterId = clusterStats.clusterId();
String subject = format("Cluster name '%s' has no workers running", clusterId);
String content = buildContent(clusterStats);
notifier.sendNotification(subject, content);
}

private String buildContent(ClusterStats clusterStats)
{
return format("""
Please check below information for the cluster:
Cluster Id : %s
Cluster Health : %s
Routing Group : %s
Number of Worker Nodes : %s
Running Queries : %s
Queued Queries : %s
User Queued Count : %s
Proxy To : %s
External URL : %s
""",
clusterStats.clusterId(),
clusterStats.healthy(),
clusterStats.routingGroup(),
clusterStats.numWorkerNodes(),
clusterStats.runningQueryCount(),
clusterStats.queuedQueryCount(),
clusterStats.userQueuedCount(),
clusterStats.proxyTo(),
clusterStats.externalUrl());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class NotifierConfiguration
private String smtpPassword;
private String sender;
private List<String> recipients;
private String gatewayInstance = "Trino";
private String customContent;
private boolean enabled;

public NotifierConfiguration() {}

Expand Down Expand Up @@ -107,4 +110,34 @@ public void setRecipients(List<String> recipients)
{
this.recipients = recipients;
}

public String getGatewayInstance()
{
return gatewayInstance;
}

public void setGatewayInstance(String gatewayInstance)
{
this.gatewayInstance = gatewayInstance;
}

public String getCustomContent()
{
return customContent;
}

public void setCustomContent(String customContent)
{
this.customContent = customContent;
}

public boolean isEnabled()
{
return enabled;
}

public void setEnabled(boolean enabled)
{
this.enabled = enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsObserver;
import io.trino.gateway.ha.clustermonitor.HealthCheckObserver;
import io.trino.gateway.ha.clustermonitor.HealthChecker;
import io.trino.gateway.ha.clustermonitor.TrinoClusterStatsObserver;
import io.trino.gateway.ha.config.HaGatewayConfiguration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.config.NotifierConfiguration;
import io.trino.gateway.ha.notifier.EmailNotifier;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.RoutingManager;

Expand All @@ -50,10 +53,14 @@ public List<TrinoClusterStatsObserver> getClusterStatsObservers(
RoutingManager mgr,
BackendStateManager backendStateManager)
{
return ImmutableList.<TrinoClusterStatsObserver>builder()
.add(new HealthCheckObserver(mgr))
.add(new ClusterStatsObserver(backendStateManager))
.build();
NotifierConfiguration notifierConfiguration = getConfiguration().getNotifier();
ImmutableList.Builder<TrinoClusterStatsObserver> observerBuilder = ImmutableList.builder();
observerBuilder.add(new HealthCheckObserver(mgr));
observerBuilder.add(new ClusterStatsObserver(backendStateManager));
if (notifierConfiguration.isEnabled()) {
observerBuilder.add(new HealthChecker(new EmailNotifier(notifierConfiguration)));
}
return observerBuilder.build();
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.Properties;

import static java.lang.String.format;
import static java.util.Objects.requireNonNullElse;

public class EmailNotifier
implements Notifier
{
Expand Down Expand Up @@ -54,10 +57,11 @@ public EmailNotifier(NotifierConfiguration notifierConfiguration)
@Override
public void sendNotification(String subject, String content)
{
content = requireNonNullElse(notifierConfiguration.getCustomContent(), content);
sendNotification(
notifierConfiguration.getSender(),
notifierConfiguration.getRecipients(),
"Trino Error: " + subject,
format("%s error: %s", notifierConfiguration.getGatewayInstance(), subject),
content);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import io.trino.gateway.ha.notifier.Notifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.mockito.Mockito;

import java.util.Collections;

import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestHealthChecker
{
@Test
public void testNotifyUnhealthyCluster()
{
Notifier notifier = Mockito.mock(Notifier.class);
HealthChecker healthChecker = new HealthChecker(notifier);
ClusterStats clusterStats = mock(ClusterStats.class);
when(clusterStats.healthy()).thenReturn(false);
when(clusterStats.clusterId()).thenReturn("testCluster");

healthChecker.observe(Collections.singletonList(clusterStats));

verify(notifier, times(1)).sendNotification(eq("Cluster name 'testCluster' is unhealthy"), anyString());
}

@Test
public void testNotifyForTooManyQueuedQueries()
{
Notifier notifier = Mockito.mock(Notifier.class);
HealthChecker healthChecker = new HealthChecker(notifier);
ClusterStats clusterStats = mock(ClusterStats.class);
when(clusterStats.healthy()).thenReturn(true);
when(clusterStats.clusterId()).thenReturn("testCluster");
when(clusterStats.queuedQueryCount()).thenReturn(101);

healthChecker.observe(Collections.singletonList(clusterStats));

verify(notifier, times(1)).sendNotification(eq("Cluster name 'testCluster' has too many queued queries"), anyString());
}

@Test
public void testNotifyForNoWorkers()
{
Notifier notifier = Mockito.mock(Notifier.class);
HealthChecker healthChecker = new HealthChecker(notifier);
ClusterStats clusterStats = mock(ClusterStats.class);
when(clusterStats.healthy()).thenReturn(true);
when(clusterStats.clusterId()).thenReturn("testCluster");
when(clusterStats.numWorkerNodes()).thenReturn(0);

healthChecker.observe(Collections.singletonList(clusterStats));

verify(notifier, times(1)).sendNotification(eq("Cluster name 'testCluster' has no workers running"), anyString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.notifier;

import io.trino.gateway.ha.config.NotifierConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.List;

import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestEmailNotifier
{
@Test
public void testSendNotificationCustomContent()
{
NotifierConfiguration notifierConfiguration = new NotifierConfiguration();
List<String> recipients = Arrays.asList("recipient1@example.com", "recipient2@example.com");
notifierConfiguration.setRecipients(recipients);
notifierConfiguration.setCustomContent("Custom content");

EmailNotifier emailNotifier = new EmailNotifier(notifierConfiguration);

emailNotifier.sendNotification("Test Subject", "Test Content");
}

@Test
public void testSendNotificationNoCustomContent()
{
NotifierConfiguration notifierConfiguration = new NotifierConfiguration();
List<String> recipients = Arrays.asList("recipient1@example.com", "recipient2@example.com");
notifierConfiguration.setRecipients(recipients);
notifierConfiguration.setCustomContent(null);

EmailNotifier emailNotifier = new EmailNotifier(notifierConfiguration);

emailNotifier.sendNotification("Test Subject", "Test Content");
}

@Test
public void testSendEmailNotification()
{
NotifierConfiguration notifierConfiguration = new NotifierConfiguration();
List<String> recipients = Arrays.asList("recipient1@example.com", "recipient2@example.com");
notifierConfiguration.setRecipients(recipients);

EmailNotifier emailNotifier = Mockito.mock(EmailNotifier.class);

emailNotifier.sendNotification(anyString(), anyList(), anyString(), anyString());

verify(emailNotifier, times(1)).sendNotification(
anyString(), anyList(), anyString(), anyString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.module;

import io.dropwizard.core.setup.Environment;
import io.trino.gateway.ha.clustermonitor.HealthChecker;
import io.trino.gateway.ha.clustermonitor.TrinoClusterStatsObserver;
import io.trino.gateway.ha.config.HaGatewayConfiguration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.config.NotifierConfiguration;
import io.trino.gateway.ha.module.ClusterStateListenerModule;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.RoutingManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestClusterStateListenerModule
{
@Test
public void testClusterStateListenerModule()
{
HaGatewayConfiguration config = mock(HaGatewayConfiguration.class);
MonitorConfiguration monitorConfig = mock(MonitorConfiguration.class);
NotifierConfiguration notifierConfiguration = mock(NotifierConfiguration.class);
when(config.getMonitor()).thenReturn(monitorConfig);
when(config.getNotifier()).thenReturn(notifierConfiguration);
when(notifierConfiguration.isEnabled()).thenReturn(true);

RoutingManager routingManager = mock(RoutingManager.class);
BackendStateManager backendStateManager = mock(BackendStateManager.class);

ClusterStateListenerModule module = new ClusterStateListenerModule(config, mock(Environment.class));

List<TrinoClusterStatsObserver> observers = module.getClusterStatsObservers(routingManager, backendStateManager);

assertThat(observers).hasSize(3);
assertThat(observers.get(2)).isInstanceOf(HealthChecker.class);
assertThat(module.getMonitorConfiguration()).isEqualTo(monitorConfig);
}
}