// Description
package com.hwj.ai.data.repository
import com.hwj.ai.global.LLM_API_KEY
import com.hwj.ai.global.baseHostUrl
import com.hwj.ai.global.printD
import com.hwj.ai.global.printE
import com.hwj.ai.global.urlChatCompletions
import com.hwj.ai.models.TextCompletionsParam
import com.hwj.ai.models.toJson
import io.ktor.client.HttpClient
import io.ktor.client.plugins.HttpRequestTimeoutException
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.preparePost
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.bodyAsChannel
import io.ktor.client.utils.DEFAULT_HTTP_POOL_SIZE
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.append
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.pool.ByteArrayPool
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withContext
/**
 * Data-access layer for the large-language-model (LLM) HTTP API.
 *
 * @author jason, 2025/1/18
 */
class LLMRepository(
    private val client: HttpClient
) {
    /**
     * Posts [params] to the chat-completions endpoint and emits the text fragments
     * as they arrive on the wire.
     *
     * The request is issued with `preparePost(...).execute { }` rather than `post(...)`:
     * `post` hands back the response only after the whole body has been received, which
     * makes every fragment show up at once. `execute` runs its block while the connection
     * is still open, so each `data:` line can be forwarded immediately.
     *
     * A request timeout is logged and the flow then completes normally without an error.
     *
     * @param params request payload; it is serialised with `toJson()` and sent as the body.
     * @return a flow of text fragments extracted from the `data:` lines of the response.
     */
    fun textCompletionsWithStream(params: TextCompletionsParam): Flow<String> =
        callbackFlow {
            withContext(Dispatchers.IO) {
                // The Ktor SSE plugin sample issues a GET request, which does not fit this
                // POST endpoint, so the event stream is read by hand. See:
                // https://github.com/ktorio/ktor-documentation/blob/3.1.0/codeSnippets/snippets/client-sse/src/main/kotlin/com.example/Application.kt
                try {
                    client.preparePost(baseHostUrl + urlChatCompletions) {
                        headers {
                            append(HttpHeaders.ContentType, ContentType.Application.Json)
                            append(HttpHeaders.Authorization, "Bearer $LLM_API_KEY")
                            // Ask for an event stream so that every packet starts with
                            // "data:"; otherwise the characters arrive concatenated.
                            append(HttpHeaders.Accept, ContentType.Text.EventStream)
                        }
                        setBody(params.toJson())
                    }.execute { response ->
                        val channel: ByteReadChannel = response.bodyAsChannel()
                        while (!channel.isClosedForRead) {
                            // A null line means the stream has ended.
                            val event = channel.readUTF8Line()?.trim() ?: break
                            if (!event.startsWith("data:")) continue
                            val value = lookupDataFromResponseTurbo(event)
                            if (value.isNotEmpty()) {
                                // Suspending send: a slow collector applies back-pressure
                                // instead of having fragments dropped.
                                send(value)
                            }
                        }
                    }
                } catch (e: HttpRequestTimeoutException) {
                    printE("接口超时")
                }
            }
            // callbackFlow requires the channel to be closed before its block returns.
            close()
        }

    /**
     * Extracts the first `"text"` string field from [jsonString].
     *
     * @return the field value with escaped newlines replaced by spaces, or a single
     * space when no non-empty `"text"` field is found.
     */
    private fun lookupDataFromResponse(jsonString: String): String =
        extractField(TEXT_FIELD_REGEX, jsonString)

    /**
     * Extracts the first `"content"` string field from [jsonString].
     *
     * @return the field value with escaped newlines replaced by spaces, or a single
     * space when no non-empty `"content"` field is found.
     */
    private fun lookupDataFromResponseTurbo(jsonString: String): String =
        extractField(CONTENT_FIELD_REGEX, jsonString)

    /**
     * Returns capture group 1 of the first [regex] match in [jsonString], with the
     * escaped sequences `\n\n` and `\n` each replaced by one space.
     * Falls back to a single space (not an empty string) when nothing matches.
     */
    private fun extractField(regex: Regex, jsonString: String): String {
        val captured = regex.find(jsonString)?.groupValues?.getOrNull(1) ?: return " "
        return captured
            .replace("\\n\\n", " ")
            .replace("\\n", " ")
    }

    private companion object {
        // Compiled once instead of on every received line.
        val TEXT_FIELD_REGEX = """"text"\s*:\s*"([^"]+)"""".toRegex()
        val CONTENT_FIELD_REGEX = """"content"\s*:\s*"([^"]+)"""".toRegex()
    }
}
// My code works, but it does not stream: the endpoint responds after a while and then outputs all data packets at once, so there is no incremental (streaming) print effect.