-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
openAi-api-sse #4693
Comments
package com.hwj.ai.data.repository
import com.hwj.ai.global.LLM_API_KEY
import com.hwj.ai.global.baseHostUrl
import com.hwj.ai.global.printD
import com.hwj.ai.global.printE
import com.hwj.ai.global.urlChatCompletions
import com.hwj.ai.models.TextCompletionsParam
import com.hwj.ai.models.toJson
import io.ktor.client.HttpClient
import io.ktor.client.plugins.HttpRequestTimeoutException
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.bodyAsChannel
import io.ktor.client.utils.DEFAULT_HTTP_POOL_SIZE
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.append
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.pool.ByteArrayPool
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withContext
/**
* @author by jason,2025/1/18
* des:大模型数据接口
*/
class LLMRepository(
private val client: HttpClient
) {
fun textCompletionsWithStream(params: TextCompletionsParam): Flow<String> =
callbackFlow {
withContext(Dispatchers.IO) {
var response: HttpResponse? = null
//https://github.com/ktorio/ktor-documentation/blob/3.1.0/codeSnippets/snippets/client-sse/src/main/kotlin/com.example/Application.kt
//直接sse是get请求,这不对呀
try {
response = client.post(baseHostUrl + urlChatCompletions) {
headers {
append(HttpHeaders.ContentType, ContentType.Application.Json)
append(HttpHeaders.Authorization, "Bearer $LLM_API_KEY")
//加入stream输出,每个包以data:开头,不然字符全拼接乱的
append(HttpHeaders.Accept, ContentType.Text.EventStream)
}
setBody(params.toJson())
}
} catch (e: HttpRequestTimeoutException) {
printE("接口超时")
}
val buffer = ByteArray(DEFAULT_HTTP_POOL_SIZE)
response?.bodyAsChannel()?.let { channel :ByteReadChannel->
try {
while (!channel.isClosedForRead) {
val event = channel.readUTF8Line()?.trim()
// printD("event>$event")
event?.takeIf {
it.startsWith("data:")
}?.let {
//这个输出没有问题,而且是拆词数据
val value = lookupDataFromResponseTurbo(it)
// printD("value>$value")
if (value.isNotEmpty()) {
trySend(value)
}
}
}
} finally {
ByteArrayPool.recycle(buffer)
close()
}
}
}
close()
}
private fun lookupDataFromResponse(jsonString: String): String {
val regex = """"text"\s*:\s*"([^"]+)"""".toRegex()
val matchResult = regex.find(jsonString)
if (matchResult != null && matchResult.groupValues.size > 1) {
val extractedText = matchResult.groupValues[1]
return extractedText
.replace("\\n\\n", " ")
.replace("\\n", " ")
}
return " "
}
private fun lookupDataFromResponseTurbo(jsonString: String): String {
val regex = """"content"\s*:\s*"([^"]+)"""".toRegex()
val matchResult = regex.find(jsonString)
if (matchResult != null && matchResult.groupValues.size > 1) {
val extractedText = matchResult.groupValues[1]
return extractedText
.replace("\\n\\n", " ")
.replace("\\n", " ")
}
return " "
}
}
|
Can you please share a self-contained code snippet to reproduce the problem? |
you can try this ,by the wayyou should use inline fun <reified T> HttpStatement.asFlow(format: StringFormat=ktjson):Flow<T> = flow{
val content:ByteReadChannel=this@asFlow.body()
while (!content.isClosedForRead) {
val line = content.readUTF8Line()
try {
if (!line.isNullOrEmpty()) {
val obj=if (line.startsWith("data:")){
//for SSE text/event-stream
//Log.e("SSE", line)
format.decodeFromString<T>(line.substringAfter("data:"))
}else{
//for NdJson application/x-ndjson
format.decodeFromString<T>(line)
}
emit(obj)
}
}catch (e:Exception){
//Log.e("SSE", e.stackTraceToString())
}
}
}
|
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
package com.hwj.ai.data.repository
import com.hwj.ai.global.LLM_API_KEY
import com.hwj.ai.global.baseHostUrl
import com.hwj.ai.global.printD
import com.hwj.ai.global.printE
import com.hwj.ai.global.urlChatCompletions
import com.hwj.ai.models.TextCompletionsParam
import com.hwj.ai.models.toJson
import io.ktor.client.HttpClient
import io.ktor.client.plugins.HttpRequestTimeoutException
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.bodyAsChannel
import io.ktor.client.utils.DEFAULT_HTTP_POOL_SIZE
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.append
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.pool.ByteArrayPool
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withContext
/**
@author by jason,2025/1/18
des:大模型数据接口
*/
class LLMRepository(
private val client: HttpClient
) {
fun textCompletionsWithStream(params: TextCompletionsParam): Flow =
callbackFlow {
withContext(Dispatchers.IO) {
var response: HttpResponse? = null
//https://github.com/ktorio/ktor-documentation/blob/3.1.0/codeSnippets/snippets/client-sse/src/main/kotlin/com.example/Application.kt
//直接sse是get请求,这不对呀
try {
response = client.post(baseHostUrl + urlChatCompletions) {
headers {
append(HttpHeaders.ContentType, ContentType.Application.Json)
append(HttpHeaders.Authorization, "Bearer $LLM_API_KEY")
//加入stream输出,每个包以data:开头,不然字符全拼接乱的
append(HttpHeaders.Accept, ContentType.Text.EventStream)
}
setBody(params.toJson())
}
} catch (e: HttpRequestTimeoutException) {
printE("接口超时")
}
// printD("event>$event")
event?.takeIf {
it.startsWith("data:")
}?.let {
//这个输出没有问题,而且是拆词数据
val value = lookupDataFromResponseTurbo(it)
// printD("value>$value")
if (value.isNotEmpty()) {
trySend(value)
}
}
}
} finally {
ByteArrayPool.recycle(buffer)
close()
}
}
}
private fun lookupDataFromResponse(jsonString: String): String {
val regex = """"text"\s*:\s*"([^"]+)"""".toRegex()
val matchResult = regex.find(jsonString)
}
private fun lookupDataFromResponseTurbo(jsonString: String): String {
val regex = """"content"\s*:\s*"([^"]+)"""".toRegex()
val matchResult = regex.find(jsonString)
}
}
//my code can work,but it will not be streaming . 数据接口响应一段时间,一次性把数据包都输出了,没有流式打印的效果
The text was updated successfully, but these errors were encountered: