// DestinationMigrationCoordinator.kt
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.alerting.util.destinationmigration

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.unit.TimeValue
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool
import java.util.concurrent.CancellationException
import kotlin.coroutines.CoroutineContext
/**
 * Coordinates the one-time migration of legacy destination data to the notification plugin.
 *
 * Registered as both a [ClusterStateListener] and a cluster-service [LifecycleListener].
 * On each cluster state change it (re)schedules or cancels a background migration job so
 * that only the elected cluster manager node runs the migration, and stops rescheduling
 * once [DestinationMigrationUtilService.finishFlag] reports completion.
 */
class DestinationMigrationCoordinator(
    private val client: Client,
    private val clusterService: ClusterService,
    private val threadPool: ThreadPool,
    private val scheduledJobIndices: ScheduledJobIndices
) : ClusterStateListener, CoroutineScope, LifecycleListener() {

    private val logger = LogManager.getLogger(javaClass)

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + CoroutineName("DestinationMigrationCoordinator")

    // Handle to the recurring migration task; null until first scheduled.
    private var scheduledMigration: Scheduler.Cancellable? = null

    // Best-effort re-entrancy guard for clusterChanged(). NOTE(review): @Volatile only
    // guarantees visibility — the check-then-set below is not atomic, so this is safe
    // only if cluster state listeners are invoked serially; confirm against core.
    @Volatile
    private var runningLock = false

    init {
        clusterService.addListener(this)
        clusterService.addLifecycleListener(this)
    }

    override fun clusterChanged(event: ClusterChangedEvent) {
        // Fires on every cluster state update; log at debug to avoid spamming the
        // logs on busy clusters (was info — pure noise at that level).
        logger.debug("Detected cluster change event for destination migration")
        if (DestinationMigrationUtilService.finishFlag) {
            logger.info("Reset destination migration process.")
            scheduledMigration?.cancel()
            DestinationMigrationUtilService.finishFlag = false
        }
        if (
            event.localNodeClusterManager() &&
            !runningLock &&
            // Only schedule when no live (non-cancelled) migration task exists.
            scheduledMigration?.isCancelled != false
        ) {
            try {
                runningLock = true
                initMigrateDestinations()
            } finally {
                runningLock = false
            }
        } else if (!event.localNodeClusterManager()) {
            // Lost (or never held) the cluster manager role: this node must not migrate.
            scheduledMigration?.cancel()
        }
    }

    /**
     * Schedules the background destination migration job on the MANAGEMENT thread pool
     * with a fixed 1-minute delay, unless a precondition rules it out: the alerting
     * config index must exist, this node must be the elected cluster manager, and the
     * migration must not already be complete.
     */
    private fun initMigrateDestinations() {
        if (!scheduledJobIndices.scheduledJobIndexExists()) {
            logger.debug("Alerting config index is not initialized")
            scheduledMigration?.cancel()
            return
        }
        // NOTE(review): uses isLocalNodeElectedMaster here but localNodeClusterManager()
        // in clusterChanged() — these are the deprecated/new names of the same check;
        // unify once the minimum supported core version allows it.
        if (!clusterService.state().nodes().isLocalNodeElectedMaster) {
            scheduledMigration?.cancel()
            return
        }
        if (DestinationMigrationUtilService.finishFlag) {
            logger.info("Destination migration is already complete, cancelling migration process.")
            scheduledMigration?.cancel()
            return
        }
        val scheduledJob = Runnable {
            launch {
                try {
                    if (DestinationMigrationUtilService.finishFlag) {
                        logger.info("Cancel background destination migration process.")
                        scheduledMigration?.cancel()
                        // BUG FIX: previously fell through and ran one extra migration
                        // pass even though the finish flag was already set.
                        return@launch
                    }
                    logger.info("Performing migration of destination data.")
                    DestinationMigrationUtilService.migrateDestinations(client as NodeClient)
                } catch (e: CancellationException) {
                    // Never swallow coroutine cancellation; rethrow so structured
                    // concurrency can tear the job down cleanly.
                    throw e
                } catch (e: Exception) {
                    logger.error("Failed to migrate destination data", e)
                }
            }
        }
        scheduledMigration = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), ThreadPool.Names.MANAGEMENT)
    }
}