Skip to content

openAi-api-sse #4693

Open
Open
@1006245347

Description

@1006245347

package com.hwj.ai.data.repository

import com.hwj.ai.global.LLM_API_KEY
import com.hwj.ai.global.baseHostUrl
import com.hwj.ai.global.printD
import com.hwj.ai.global.printE
import com.hwj.ai.global.urlChatCompletions
import com.hwj.ai.models.TextCompletionsParam
import com.hwj.ai.models.toJson
import io.ktor.client.HttpClient
import io.ktor.client.plugins.HttpRequestTimeoutException
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.bodyAsChannel
import io.ktor.client.utils.DEFAULT_HTTP_POOL_SIZE
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.append
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.pool.ByteArrayPool
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.withContext

/**

  • @author by jason,2025/1/18

  • des:大模型数据接口
    */
    class LLMRepository(
    private val client: HttpClient
    ) {
    fun textCompletionsWithStream(params: TextCompletionsParam): Flow =
    callbackFlow {
    withContext(Dispatchers.IO) {
    var response: HttpResponse? = null
    //https://github.com/ktorio/ktor-documentation/blob/3.1.0/codeSnippets/snippets/client-sse/src/main/kotlin/com.example/Application.kt
    //直接sse是get请求,这不对呀
    try {
    response = client.post(baseHostUrl + urlChatCompletions) {
    headers {
    append(HttpHeaders.ContentType, ContentType.Application.Json)
    append(HttpHeaders.Authorization, "Bearer $LLM_API_KEY")
    //加入stream输出,每个包以data:开头,不然字符全拼接乱的
    append(HttpHeaders.Accept, ContentType.Text.EventStream)
    }
    setBody(params.toJson())
    }
    } catch (e: HttpRequestTimeoutException) {
    printE("接口超时")
    }

             val buffer = ByteArray(DEFAULT_HTTP_POOL_SIZE)
    
    
    
              response?.bodyAsChannel()?.let { channel :ByteReadChannel->
    
                    try {
                        while (!channel.isClosedForRead) {
                            val event = channel.readUTF8Line()?.trim()
    

    // printD("event>$event")
    event?.takeIf {
    it.startsWith("data:")
    }?.let {
    //这个输出没有问题,而且是拆词数据
    val value = lookupDataFromResponseTurbo(it)
    // printD("value>$value")
    if (value.isNotEmpty()) {
    trySend(value)
    }
    }
    }
    } finally {
    ByteArrayPool.recycle(buffer)
    close()
    }
    }
    }

         close()
     }
    

    private fun lookupDataFromResponse(jsonString: String): String {
    val regex = """"text"\s*:\s*"([^"]+)"""".toRegex()
    val matchResult = regex.find(jsonString)

     if (matchResult != null && matchResult.groupValues.size > 1) {
         val extractedText = matchResult.groupValues[1]
         return extractedText
             .replace("\\n\\n", " ")
             .replace("\\n", " ")
     }
    
     return " "
    

    }

    private fun lookupDataFromResponseTurbo(jsonString: String): String {
    val regex = """"content"\s*:\s*"([^"]+)"""".toRegex()
    val matchResult = regex.find(jsonString)

     if (matchResult != null && matchResult.groupValues.size > 1) {
         val extractedText = matchResult.groupValues[1]
         return extractedText
             .replace("\\n\\n", " ")
             .replace("\\n", " ")
     }
    
     return " "
    

    }

}

//my code can work,but it will not be streaming . 数据接口响应一段时间,一次性把数据包都输出了,没有流式打印的效果

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions