Skip to content

Commit

Permalink
reading assignment from heartbeat and wiring WebsocketMessageConsumer…
Browse files Browse the repository at this point in the history
…Service to initstate
  • Loading branch information
ahmed-ali-55 committed Nov 20, 2023
1 parent 602b3ad commit bb63217
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ import net.leanix.vsm.gitlab.broker.connector.adapter.feign.data.GitlabUser
interface GitlabClientProvider {
fun getCurrentUser(): GitlabUser
fun getGroupByFullPath(fullPath: String): GitlabGroup?

fun getVersion(): String
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import net.leanix.vsm.gitlab.broker.connector.adapter.feign.data.DeleteServiceRe
import net.leanix.vsm.gitlab.broker.connector.adapter.feign.data.DoraRequest
import net.leanix.vsm.gitlab.broker.connector.adapter.feign.data.ServiceRequest
import net.leanix.vsm.gitlab.broker.connector.adapter.feign.data.UpdateRunStateRequest
import net.leanix.vsm.gitlab.broker.connector.application.DummyRequest
import net.leanix.vsm.gitlab.broker.connector.domain.GitLabAssignment
import net.leanix.vsm.gitlab.broker.connector.domain.GitLabHeartbeatResponse
import net.leanix.vsm.gitlab.broker.shared.Constants.EVENT_TYPE_HEADER
Expand Down Expand Up @@ -38,6 +39,12 @@ interface VsmClient {
@RequestBody serviceRequest: ServiceRequest,
)

@PostMapping("/v2/services")
fun saveServiceV2(
@RequestHeader(name = EVENT_TYPE_HEADER) eventType: String,
@RequestBody dummyRequest: DummyRequest,
): String?

@PostMapping("/services/bulk")
fun bulkSaveServices(
@RequestHeader(name = EVENT_TYPE_HEADER) eventType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net.leanix.vsm.gitlab.broker.connector.adapter.graphql
import com.expediagroup.graphql.client.spring.GraphQLWebClient
import com.expediagroup.graphql.client.types.GraphQLClientRequest
import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.annotation.PostConstruct
import kotlinx.coroutines.runBlocking
import net.leanix.vsm.gitlab.broker.shared.properties.GitLabOnPremProperties
import org.springframework.http.HttpHeaders
Expand All @@ -24,58 +23,10 @@ class BasicGraphQLClient(private val gitLabOnPremProperties: GitLabOnPremPropert
}
)

@PostConstruct
fun dummy() {
execute(
query = """
query AllGroupsQuery(${'$'}group: ID!, ${'$'}pageCount: Int!, ${'$'}cursor: String) {
group(fullPath: ${'$'}group) {
id
name
projects(first: ${'$'}pageCount, after: ${'$'}cursor, includeSubgroups: true) {
pageInfo {
hasNextPage
endCursor
}
nodes {
name
path
id
archived
visibility
topics
webUrl
description
lastActivityAt
languages {
name
share
}
repository {
diskPath
rootRef
}
group {
fullPath
}
}
}
}
}
""".trimIndent(),
variables = mapOf(
"pageCount" to 10,
"cursor" to null,
"group" to "cider"
)
)
}


fun execute(
fun query(
query: String,
variables: Map<String, Any?>
): Any? {
): Pair<Any?, Boolean> {
val pagingNeeded = variables.containsKey("pageCount")
var cursor: String? = null
val resultList = mutableListOf<Any?>()
Expand All @@ -100,13 +51,15 @@ class BasicGraphQLClient(private val gitLabOnPremProperties: GitLabOnPremPropert
.readTree(objectMapper.writeValueAsString(result.data))
.findValue("pageInfo")
hasNextPage = pageInfoNode.get("hasNextPage").booleanValue()
cursor = pageInfoNode.get("hasNextPage").textValue()
cursor = pageInfoNode.get("endCursor").textValue()
}

resultList.add(result.data)
} while (hasNextPage)

return if (pagingNeeded) resultList else resultList.get(0)
return if (pagingNeeded)
resultList to true
else resultList.get(0) to false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ open class BaseConnectorService {
)
}

fun logInfoMessages(code: String, arguments: Array<Any>, assignment: GitLabAssignment) {
val message = messageSource.getMessage(
code,
arguments,
Locale.ENGLISH,
)
fun logInfoMessages(
code: String? = null,
arguments: Array<Any>,
assignment: GitLabAssignment,
message: String = messageSource.getMessage(code!!, arguments, Locale.ENGLISH)
) {
loggingService.sendAdminLog(
AdminLog(
runId = assignment.runId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package net.leanix.vsm.gitlab.broker.connector.application

import io.github.oshai.kotlinlogging.KotlinLogging
import net.leanix.vsm.gitlab.broker.connector.adapter.feign.VsmClient
import net.leanix.vsm.gitlab.broker.connector.adapter.graphql.BasicGraphQLClient
import net.leanix.vsm.gitlab.broker.connector.domain.EventType
import net.leanix.vsm.gitlab.broker.connector.domain.GitLabAssignment
import net.leanix.vsm.gitlab.broker.logs.domain.LogStatus
import net.leanix.vsm.gitlab.broker.shared.Constants
import org.springframework.stereotype.Component
import java.util.UUID

data class WebSocketMessageData(
val type: String,
val successMessage: String,
val failureMessage: String,
val query: String,
val variables: Map<String, Any?>,
val statusLoggingEnabled: Boolean,
val adminLoggingEnabled: Boolean
)

@Component
class WebsocketMessageConsumerService(
private val basicGraphQLClient: BasicGraphQLClient,
private val vsmClient: VsmClient
) : BaseConnectorService() {

private val logger = KotlinLogging.logger {}

fun consume(
messageData: WebSocketMessageData,
assignment: GitLabAssignment
) {
logger.info { "Processing message of type ${messageData.type}" }
// improvement may want to add some validations here before executing query

logInfoStatusIfNeeded(messageData.statusLoggingEnabled, assignment)
val queryResult = executeQuery(messageData.query, messageData.variables)

queryResult
.getOrNull()
?.let {
if (it.second) {
it.first as List<Any>
} else {
listOf(it.first)
}
}
?.filterNotNull()
?.forEach {
vsmClient.saveServiceV2(
eventType = EventType.STATE.type,
dummyRequest = DummyRequest(
type = messageData.type,
orgName = assignment.connectorConfiguration.orgName,
runId = assignment.runId,
configurationId = assignment.configurationId,
data = it,
source = Constants.GITLAB_ENTERPRISE
)
).also {
logger.info { "response received: $it" }
}
}

performResultLogging(
queryResult,
messageData.statusLoggingEnabled,
messageData.adminLoggingEnabled,
messageData.successMessage,
messageData.failureMessage,
assignment
)
}

private fun logInfoStatusIfNeeded(statusLoggingEnabled: Boolean, assignment: GitLabAssignment) {
if (statusLoggingEnabled) {
logInfoStatus(
assignment = assignment,
status = LogStatus.IN_PROGRESS,
)
}
}

fun executeQuery(
query: String,
variables: Map<String, Any?>
) =
runCatching {
basicGraphQLClient.query(query, variables)
}

private fun performResultLogging(
result: Result<Any?>,
statusLoggingEnabled: Boolean,
adminLoggingEnabled: Boolean,
successMessage: String,
failureMessage: String,
assignment: GitLabAssignment
) {
if (result.isFailure) {
if (statusLoggingEnabled) {
logInfoStatus(
assignment = assignment,
status = LogStatus.FAILED,
message = failureMessage
)
}
} else {
if (adminLoggingEnabled) {
logInfoMessages(arguments = arrayOf(), assignment = assignment, message = successMessage)
}

if (statusLoggingEnabled) {
logInfoStatus(assignment = assignment, status = LogStatus.SUCCESSFUL)
}
}
}

// @PostConstruct
fun dummy(
gitLabAssignment: GitLabAssignment
) {
consume(
WebSocketMessageData(
type = "GET_ALL_REPOS",
statusLoggingEnabled = true,
adminLoggingEnabled = true,
variables = mapOf(
"pageCount" to 10,
"cursor" to null,
"group" to "cider"
),
failureMessage = "",
successMessage = "",
query = """
query AllGroupsQuery(${'$'}group: ID!, ${'$'}pageCount: Int!, ${'$'}cursor: String) {
group(fullPath: ${'$'}group) {
id
name
projects(first: ${'$'}pageCount, after: ${'$'}cursor, includeSubgroups: true) {
pageInfo {
hasNextPage
endCursor
}
nodes {
name
path
id
archived
visibility
topics
webUrl
description
lastActivityAt
languages {
name
share
}
repository {
diskPath
rootRef
}
group {
fullPath
}
}
}
}
}
""".trimIndent()
),
gitLabAssignment,
)
}
}

data class DummyRequest(
val type: String,
val orgName: String,
val runId: UUID,
val configurationId: UUID,
val data: Any,
val source: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package net.leanix.vsm.gitlab.broker.connector.runner
import io.github.oshai.kotlinlogging.KotlinLogging
import net.leanix.vsm.gitlab.broker.connector.application.AssignmentService
import net.leanix.vsm.gitlab.broker.connector.application.InitialStateService
import net.leanix.vsm.gitlab.broker.connector.application.WebsocketMessageConsumerService
import net.leanix.vsm.gitlab.broker.shared.cache.AssignmentsCache
import net.leanix.vsm.gitlab.broker.webhook.domain.WebhookService
import org.springframework.beans.factory.annotation.Value
Expand All @@ -16,6 +17,7 @@ class InitialStateRunner(
private val assignmentService: AssignmentService,
private val initialStateService: InitialStateService,
private val webhookService: WebhookService,
private val websocketMessageConsumerService: WebsocketMessageConsumerService
) : ApplicationRunner {

private val logger = KotlinLogging.logger {}
Expand All @@ -29,7 +31,10 @@ class InitialStateRunner(
private fun fetchAssignments() {
runCatching {
assignmentService.getAssignments()?.let {
initialStateService.initState(it)
// initialStateService.initState(it)
it.forEach {
websocketMessageConsumerService.dummy(it)
}
}
}.onSuccess {
logger.info { "Cached ${AssignmentsCache.getAll().size} assignments" }
Expand Down

0 comments on commit bb63217

Please sign in to comment.