In [None]:
直接调用本地音频，调用火山平台的流式语音识别API.

In [2]:
#coding=utf-8

"""
requires Python 3.6 or later

pip install asyncio
pip install websockets
"""

import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websockets

# 火山平台API配置
appid = "4166554764"    # 项目的 appid
token = "ggmUTHHMXio-nJlKMkRvqEgkcWyfDK0K"    # 项目的 token
cluster = "volcengine_streaming_common"  # 请求的集群

# 获取当前脚本的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
audio_path = os.path.join(current_dir, "kangqiao.wav")
audio_format = "wav"

print(f"Audio file path: {audio_path}")
print(f"Audio file exists: {os.path.exists(audio_path)}")

PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001

PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8

# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message Type Specific Flags
NO_SEQUENCE = 0b0000  # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011

# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111

# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111


def generate_header(
    version=PROTOCOL_VERSION,
    message_type=CLIENT_FULL_REQUEST,
    message_type_specific_flags=NO_SEQUENCE,
    serial_method=JSON,
    compression_type=GZIP,
    reserved_data=0x00,
    extension_header=bytes()
):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    """
    header = bytearray()
    header_size = int(len(extension_header) / 4) + 1
    header.append((version << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    header.extend(extension_header)
    return header


def generate_full_default_header():
    return generate_header()


def generate_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST
    )


def generate_last_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST,
        message_type_specific_flags=NEG_SEQUENCE
    )

def parse_response(res):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    payload 类似与http 请求体
    """
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]
    result = {}
    payload_msg = None
    payload_size = 0
    if message_type == SERVER_FULL_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    if payload_msg is None:
        return result
    if message_compression == GZIP:
        payload_msg = gzip.decompress(payload_msg)
    if serialization_method == JSON:
        payload_msg = json.loads(str(payload_msg, "utf-8"))
    elif serialization_method != NO_SERIALIZATION:
        payload_msg = str(payload_msg, "utf-8")
    result['payload_msg'] = payload_msg
    result['payload_size'] = payload_size
    return result


def read_wav_info(data: bytes = None) -> (int, int, int, int, int):
    with BytesIO(data) as _f:
        wave_fp = wave.open(_f, 'rb')
        nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]
        wave_bytes = wave_fp.readframes(nframes)
    return nchannels, sampwidth, framerate, nframes, len(wave_bytes)

class AudioType(Enum):
    LOCAL = 1  # 使用本地音频文件

class AsrWsClient:
    def __init__(self, audio_path, cluster, **kwargs):
        print(f"Initializing AsrWsClient with audio_path: {audio_path}")
        self.audio_path = audio_path
        self.cluster = cluster
        self.success_code = 1000  # success code, default is 1000
        self.seg_duration = int(kwargs.get("seg_duration", 15000))
        self.nbest = int(kwargs.get("nbest", 1))
        self.appid = kwargs.get("appid", "")
        self.token = kwargs.get("token", "")
        self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")
        self.uid = kwargs.get("uid", "streaming_asr_demo")
        self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")
        self.show_language = kwargs.get("show_language", False)
        self.show_utterances = kwargs.get("show_utterances", False)
        self.result_type = kwargs.get("result_type", "full")
        self.format = kwargs.get("format", "wav")
        self.rate = kwargs.get("sample_rate", 16000)
        self.language = kwargs.get("language", "zh-CN")
        self.bits = kwargs.get("bits", 16)
        self.channel = kwargs.get("channel", 1)
        self.codec = kwargs.get("codec", "raw")
        self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)
        self.secret = kwargs.get("secret", "access_secret")
        self.auth_method = kwargs.get("auth_method", "token")
        self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))

    def construct_request(self, reqid):
        req = {
            'app': {
                'appid': self.appid,
                'cluster': self.cluster,
                'token': self.token,
            },
            'user': {
                'uid': self.uid
            },
            'request': {
                'reqid': reqid,
                'nbest': self.nbest,
                'workflow': self.workflow,
                'show_language': self.show_language,
                'show_utterances': self.show_utterances,
                'result_type': self.result_type,
                "sequence": 1
            },
            'audio': {
                'format': self.format,
                'rate': self.rate,
                'language': self.language,
                'bits': self.bits,
                'channel': self.channel,
                'codec': self.codec
            }
        }
        return req

    @staticmethod
    def slice_data(data: bytes, chunk_size: int) -> (list, bool):
        """
        slice data
        :param data: wav data
        :param chunk_size: the segment size in one request
        :return: segment data, last flag
        """
        data_len = len(data)
        offset = 0
        while offset + chunk_size < data_len:
            yield data[offset: offset + chunk_size], False
            offset += chunk_size
        else:
            yield data[offset: data_len], True

    def _real_processor(self, request_params: dict) -> dict:
        pass

    def token_auth(self):
        return {'Authorization': 'Bearer; {}'.format(self.token)}

    def signature_auth(self, data):
        header_dicts = {
            'Custom': 'auth_custom',
        }

        url_parse = urlparse(self.ws_url)
        input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)
        auth_headers = 'Custom'
        for header in auth_headers.split(','):
            input_str += '{}\n'.format(header_dicts[header])
        input_data = bytearray(input_str, 'utf-8')
        input_data += data
        mac = base64.urlsafe_b64encode(
            hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())
        header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(self.token,
                                                                                              str(mac, 'utf-8'), auth_headers)
        return header_dicts

    async def segment_data_processor(self, wav_data: bytes, segment_size: int):
        reqid = str(uuid.uuid4())
        # 构建 full client request，序列化压缩
        request_params = self.construct_request(reqid)
        payload_bytes = str.encode(json.dumps(request_params))
        payload_bytes = gzip.compress(payload_bytes)
        full_client_request = bytearray(generate_full_default_header())
        full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
        full_client_request.extend(payload_bytes)  # payload
        header = None
        if self.auth_method == "token":
            header = self.token_auth()
        elif self.auth_method == "signature":
            header = self.signature_auth(full_client_request)
        async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:
            # 发送 full client request
            await ws.send(full_client_request)
            res = await ws.recv()
            result = parse_response(res)
            if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                return result
            for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):
                # if no compression, comment this line
                payload_bytes = gzip.compress(chunk)
                audio_only_request = bytearray(generate_audio_default_header())
                if last:
                    audio_only_request = bytearray(generate_last_audio_default_header())
                audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
                audio_only_request.extend(payload_bytes)  # payload
                # 发送 audio-only client request
                await ws.send(audio_only_request)
                res = await ws.recv()
                result = parse_response(res)
                if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                    return result
        return result

    async def execute(self):
        print(f"Executing with audio_path: {self.audio_path}")
        with open(self.audio_path, mode="rb") as _f:
            data = _f.read()
        audio_data = bytes(data)
        if self.format == "mp3":
            segment_size = self.mp3_seg_size
            return await self.segment_data_processor(audio_data, segment_size)
        if self.format != "wav":
            raise Exception("format should in wav or mp3")
        nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(
            audio_data)
        size_per_sec = nchannels * sampwidth * framerate
        segment_size = int(size_per_sec * self.seg_duration / 1000)
        return await self.segment_data_processor(audio_data, segment_size)


def execute_one(audio_item, cluster, **kwargs):
    """

    :param audio_item: {"id": xxx, "path": "xxx"}
    :param cluster:集群名称
    :return:
    """
    assert 'id' in audio_item
    assert 'path' in audio_item
    audio_id = audio_item['id']
    audio_path = audio_item['path']
    audio_type = AudioType.LOCAL
    asr_http_client = AsrWsClient(
        audio_path=audio_path,  # 确保这里使用了正确的 audio_path
        cluster=cluster,
        audio_type=audio_type,
        **kwargs
    )
    result = asyncio.run(asr_http_client.execute())
    return {"id": audio_id, "path": audio_path, "result": result}

def test_one():
    result = execute_one(
        {
            'id': 1,
            'path': audio_path
        },
        cluster=cluster,
        appid=appid,
        token=token,
        format=audio_format,
    )
    print(result)


if __name__ == '__main__':
    test_one()




RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
为以上代码添加录音功能。

In [11]:
"""
requires Python 3.6 or later

pip install asyncio
pip install websockets
"""

import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websockets
from unihiker import Audio
import nest_asyncio
nest_asyncio.apply()

# 火山平台API配置
appid = "4166554764"    # 项目的 appid
token = "ggmUTHHMXio-nJlKMkRvqEgkcWyfDK0K"    # 项目的 token
cluster = "volcengine_streaming_common"  # 请求的集群

# 音频采集设置
RECORD_DURATION = 6  # 录音时长(秒)
TEMP_AUDIO_FILE = "temp_recording.wav"  # 临时音频文件名

PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001

PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8

# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message Type Specific Flags
NO_SEQUENCE = 0b0000  # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011

# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111

# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111


def generate_header(
    version=PROTOCOL_VERSION,
    message_type=CLIENT_FULL_REQUEST,
    message_type_specific_flags=NO_SEQUENCE,
    serial_method=JSON,
    compression_type=GZIP,
    reserved_data=0x00,
    extension_header=bytes()
):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    """
    header = bytearray()
    header_size = int(len(extension_header) / 4) + 1
    header.append((version << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    header.extend(extension_header)
    return header


def generate_full_default_header():
    return generate_header()


def generate_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST
    )


def generate_last_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST,
        message_type_specific_flags=NEG_SEQUENCE
    )

def parse_response(res):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    payload 类似与http 请求体
    """
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]
    result = {}
    payload_msg = None
    payload_size = 0
    if message_type == SERVER_FULL_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    if payload_msg is None:
        return result
    if message_compression == GZIP:
        payload_msg = gzip.decompress(payload_msg)
    if serialization_method == JSON:
        payload_msg = json.loads(str(payload_msg, "utf-8"))
    elif serialization_method != NO_SERIALIZATION:
        payload_msg = str(payload_msg, "utf-8")
    result['payload_msg'] = payload_msg
    result['payload_size'] = payload_size
    return result


def read_wav_info(data: bytes = None) -> (int, int, int, int, int):
    with BytesIO(data) as _f:
        wave_fp = wave.open(_f, 'rb')
        nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]
        wave_bytes = wave_fp.readframes(nframes)
    return nchannels, sampwidth, framerate, nframes, len(wave_bytes)

class AudioType(Enum):
    LOCAL = 1  # 使用本地音频文件

class AsrWsClient:
    def __init__(self, audio_path, cluster, **kwargs):
        print(f"Initializing AsrWsClient with audio_path: {audio_path}")
        self.audio_path = audio_path
        self.cluster = cluster
        self.success_code = 1000  # success code, default is 1000
        self.seg_duration = int(kwargs.get("seg_duration", 15000))
        self.nbest = int(kwargs.get("nbest", 1))
        self.appid = kwargs.get("appid", "")
        self.token = kwargs.get("token", "")
        self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")
        self.uid = kwargs.get("uid", "streaming_asr_demo")
        self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")
        self.show_language = kwargs.get("show_language", False)
        self.show_utterances = kwargs.get("show_utterances", False)
        self.result_type = kwargs.get("result_type", "full")
        self.format = kwargs.get("format", "wav")
        self.rate = kwargs.get("sample_rate", 16000)
        self.language = kwargs.get("language", "zh-CN")
        self.bits = kwargs.get("bits", 16)
        self.channel = kwargs.get("channel", 1)
        self.codec = kwargs.get("codec", "raw")
        self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)
        self.secret = kwargs.get("secret", "access_secret")
        self.auth_method = kwargs.get("auth_method", "token")
        self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))

    def construct_request(self, reqid):
        req = {
            'app': {
                'appid': self.appid,
                'cluster': self.cluster,
                'token': self.token,
            },
            'user': {
                'uid': self.uid
            },
            'request': {
                'reqid': reqid,
                'nbest': self.nbest,
                'workflow': self.workflow,
                'show_language': self.show_language,
                'show_utterances': self.show_utterances,
                'result_type': self.result_type,
                "sequence": 1
            },
            'audio': {
                'format': self.format,
                'rate': self.rate,
                'language': self.language,
                'bits': self.bits,
                'channel': self.channel,
                'codec': self.codec
            }
        }
        return req

    @staticmethod
    def slice_data(data: bytes, chunk_size: int) -> (list, bool):
        """
        slice data
        :param data: wav data
        :param chunk_size: the segment size in one request
        :return: segment data, last flag
        """
        data_len = len(data)
        offset = 0
        while offset + chunk_size < data_len:
            yield data[offset: offset + chunk_size], False
            offset += chunk_size
        else:
            yield data[offset: data_len], True

    def _real_processor(self, request_params: dict) -> dict:
        pass

    def token_auth(self):
        return {'Authorization': 'Bearer; {}'.format(self.token)}

    def signature_auth(self, data):
        header_dicts = {
            'Custom': 'auth_custom',
        }

        url_parse = urlparse(self.ws_url)
        input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)
        auth_headers = 'Custom'
        for header in auth_headers.split(','):
            input_str += '{}\n'.format(header_dicts[header])
        input_data = bytearray(input_str, 'utf-8')
        input_data += data
        mac = base64.urlsafe_b64encode(
            hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())
        header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(self.token,
                                                                                              str(mac, 'utf-8'), auth_headers)
        return header_dicts

    async def segment_data_processor(self, wav_data: bytes, segment_size: int):
        reqid = str(uuid.uuid4())
        # 构建 full client request，序列化压缩
        request_params = self.construct_request(reqid)
        payload_bytes = str.encode(json.dumps(request_params))
        payload_bytes = gzip.compress(payload_bytes)
        full_client_request = bytearray(generate_full_default_header())
        full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
        full_client_request.extend(payload_bytes)  # payload
        header = None
        if self.auth_method == "token":
            header = self.token_auth()
        elif self.auth_method == "signature":
            header = self.signature_auth(full_client_request)
        async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:
            # 发送 full client request
            await ws.send(full_client_request)
            res = await ws.recv()
            result = parse_response(res)
            if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                return result
            for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):
                # if no compression, comment this line
                payload_bytes = gzip.compress(chunk)
                audio_only_request = bytearray(generate_audio_default_header())
                if last:
                    audio_only_request = bytearray(generate_last_audio_default_header())
                audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
                audio_only_request.extend(payload_bytes)  # payload
                # 发送 audio-only client request
                await ws.send(audio_only_request)
                res = await ws.recv()
                result = parse_response(res)
                if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                    return result
        return result

    async def execute(self):
        print(f"Executing with audio_path: {self.audio_path}")
        with open(self.audio_path, mode="rb") as _f:
            data = _f.read()
        audio_data = bytes(data)
        if self.format == "mp3":
            segment_size = self.mp3_seg_size
            return await self.segment_data_processor(audio_data, segment_size)
        if self.format != "wav":
            raise Exception("format should in wav or mp3")
        nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(
            audio_data)
        size_per_sec = nchannels * sampwidth * framerate
        segment_size = int(size_per_sec * self.seg_duration / 1000)
        return await self.segment_data_processor(audio_data, segment_size)


def execute_one(audio_item, cluster, **kwargs):
    """

    :param audio_item: {"id": xxx, "path": "xxx"}
    :param cluster:集群名称
    :return:
    """
    assert 'id' in audio_item
    assert 'path' in audio_item
    audio_id = audio_item['id']
    audio_path = audio_item['path']
    audio_type = AudioType.LOCAL
    asr_http_client = AsrWsClient(
        audio_path=audio_path,  # 确保这里使用了正确的 audio_path
        cluster=cluster,
        audio_type=audio_type,
        **kwargs
    )
    result = asyncio.run(asr_http_client.execute())
    return {"id": audio_id, "path": audio_path, "result": result}

def test_one():
    result = execute_one(
        {
            'id': 1,
            'path': audio_path
        },
        cluster=cluster,
        appid=appid,
        token=token,
        format=audio_format,
    )
    print(result)

def record_audio():
    """使用麦克风录制音频"""
    print("开始录音...")
    audio = Audio()
    
    try:
        # 显示录音倒计时
        for i in range(3, 0, -1):
            print(f"录音将在 {i} 秒后开始...")
            time.sleep(1)
        
        # 开始录音
        print(f"正在录音，持续 {RECORD_DURATION} 秒...")
        audio.record(TEMP_AUDIO_FILE, RECORD_DURATION)
        time.sleep(RECORD_DURATION + 0.5)  # 等待录音完成，多等待0.5秒确保文件保存
        
        # 检查文件是否成功创建
        if os.path.exists(TEMP_AUDIO_FILE):
            print(f"录音完成，文件已保存为: {TEMP_AUDIO_FILE}")
            return TEMP_AUDIO_FILE
        else:
            print("录音文件创建失败")
            return None
            
    except Exception as e:
        print(f"录音过程中发生错误: {e}")
        return None

async def process_audio(audio_file):
    """处理录制的音频文件"""
    print(f"开始处理音频文件: {audio_file}")
    
    try:
        result = execute_one(
            {
                'id': 1,
                'path': audio_file
            },
            cluster=cluster,
            appid=appid,
            token=token,
            format="wav",
        )
        
        print("Raw result:", result)
        
        if isinstance(result, dict):
            if 'result' in result and isinstance(result['result'], dict):
                payload_msg = result['result'].get('payload_msg', {})
                if isinstance(payload_msg, dict):
                    if payload_msg.get('code') == 1000:
                        # 处理结果列表
                        results = payload_msg.get('result', [])
                        if results and isinstance(results, list):
                            # 合并所有识别结果
                            texts = [item.get('text', '') for item in results if isinstance(item, dict)]
                            full_text = ''.join(texts)
                            print("\n识别结果:")
                            print(full_text)
                            
                            # 如果需要，也可以显示每段的置信度
                            for i, item in enumerate(results, 1):
                                if isinstance(item, dict):
                                    confidence = item.get('confidence', 0)
                                    print(f"第{i}段置信度: {confidence}")
                        else:
                            print("未获取到识别文本")
                    else:
                        print(f"\n识别失败，错误码: {payload_msg.get('code')}")
                        print(f"错误信息: {payload_msg.get('message', '无错误信息')}")
                else:
                    print("无法解析 payload_msg")
            else:
                print("无法获取识别结果")
                print("完整返回数据:", result)
        else:
            print("返回数据格式错误")
            print("返回数据:", result)
            
    except Exception as e:
        print(f"处理音频时发生错误: {str(e)}")
        print("错误类型:", type(e))
        import traceback
        print("错误堆栈:", traceback.format_exc())
    finally:
        # 清理临时音频文件
        try:
            if os.path.exists(audio_file):
                os.remove(audio_file)
                print("临时音频文件已删除")
        except Exception as e:
            print(f"删除临时文件时发生错误: {e}")

async def single_recognition():
    """执行一次录音识别"""
    print("开始录音...")
    audio = Audio()
    audio_file = "temp_recording.wav"
    
    try:
        # 倒计时
        for i in range(3, 0, -1):
            print(f"录音将在 {i} 秒后开始...")
            await asyncio.sleep(1)
        
        # 开始录音
        print(f"正在录音，持续 6 秒...")
        audio.record(audio_file, 6)
        await asyncio.sleep(6.5)  # 等待录音完成
        
        if os.path.exists(audio_file):
            print(f"录音完成，文件已保存")
            # 处理音频
            await process_audio(audio_file)
        else:
            print("录音失败")
    except Exception as e:
        print(f"录音过程中发生错误: {e}")

# 在 Jupyter 中使用这个函数来执行一次识别
async def run_once():
    """执行一次语音识别"""
    await single_recognition()

# 在 Jupyter 中这样使用：
await run_once()


开始录音...
录音将在 3 秒后开始...
录音将在 2 秒后开始...
录音将在 1 秒后开始...
正在录音，持续 6 秒...
录音完成，文件已保存
开始处理音频文件: temp_recording.wav
Initializing AsrWsClient with audio_path: temp_recording.wav
Executing with audio_path: temp_recording.wav
Raw result: {'id': 1, 'path': 'temp_recording.wav', 'result': {'payload_msg': {'addition': {'duration': '5632', 'logid': '2024102510542047FFF69A9234E524AB9C', 'split_time': '[]'}, 'code': 1000, 'message': 'Success', 'reqid': '0a75e06a-cfe7-4b7b-90b5-f1d49e2af2e1', 'result': [{'confidence': 0, 'text': '我走了。 正如我轻轻的。'}], 'sequence': -2}, 'payload_size': 252}}

识别结果:
我走了。 正如我轻轻的。
第1段置信度: 0
临时音频文件已删除


In [None]:
上面的代码是定时录音的，为上面的代码点击屏幕录音，点击屏幕结束录音的功能。

In [15]:
"""
requires Python 3.6 or later

pip install asyncio
pip install websockets
"""

import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websockets
from unihiker import Audio, GUI
import nest_asyncio
nest_asyncio.apply()
import threading

# 火山平台API配置
appid = "4166554764"    # 项目的 appid
token = "ggmUTHHMXio-nJlKMkRvqEgkcWyfDK0K"    # 项目的 token
cluster = "volcengine_streaming_common"  # 请求的集群

# 音频采集设置
RECORD_DURATION = 6  # 录音时长(秒)
TEMP_AUDIO_FILE = "temp_recording.wav"  # 临时音频文件名

PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001

PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8

# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message Type Specific Flags
NO_SEQUENCE = 0b0000  # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011

# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111

# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111


def generate_header(
    version=PROTOCOL_VERSION,
    message_type=CLIENT_FULL_REQUEST,
    message_type_specific_flags=NO_SEQUENCE,
    serial_method=JSON,
    compression_type=GZIP,
    reserved_data=0x00,
    extension_header=bytes()
):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    """
    header = bytearray()
    header_size = int(len(extension_header) / 4) + 1
    header.append((version << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    header.extend(extension_header)
    return header


def generate_full_default_header():
    return generate_header()


def generate_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST
    )


def generate_last_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST,
        message_type_specific_flags=NEG_SEQUENCE
    )

def parse_response(res):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    payload 类似与http 请求体
    """
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]
    result = {}
    payload_msg = None
    payload_size = 0
    if message_type == SERVER_FULL_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    if payload_msg is None:
        return result
    if message_compression == GZIP:
        payload_msg = gzip.decompress(payload_msg)
    if serialization_method == JSON:
        payload_msg = json.loads(str(payload_msg, "utf-8"))
    elif serialization_method != NO_SERIALIZATION:
        payload_msg = str(payload_msg, "utf-8")
    result['payload_msg'] = payload_msg
    result['payload_size'] = payload_size
    return result


def read_wav_info(data: bytes = None) -> (int, int, int, int, int):
    with BytesIO(data) as _f:
        wave_fp = wave.open(_f, 'rb')
        nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]
        wave_bytes = wave_fp.readframes(nframes)
    return nchannels, sampwidth, framerate, nframes, len(wave_bytes)

class AudioType(Enum):
    LOCAL = 1  # 使用本地音频文件

class AsrWsClient:
    def __init__(self, audio_path, cluster, **kwargs):
        print(f"Initializing AsrWsClient with audio_path: {audio_path}")
        self.audio_path = audio_path
        self.cluster = cluster
        self.success_code = 1000  # success code, default is 1000
        self.seg_duration = int(kwargs.get("seg_duration", 15000))
        self.nbest = int(kwargs.get("nbest", 1))
        self.appid = kwargs.get("appid", "")
        self.token = kwargs.get("token", "")
        self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")
        self.uid = kwargs.get("uid", "streaming_asr_demo")
        self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")
        self.show_language = kwargs.get("show_language", False)
        self.show_utterances = kwargs.get("show_utterances", False)
        self.result_type = kwargs.get("result_type", "full")
        self.format = kwargs.get("format", "wav")
        self.rate = kwargs.get("sample_rate", 16000)
        self.language = kwargs.get("language", "zh-CN")
        self.bits = kwargs.get("bits", 16)
        self.channel = kwargs.get("channel", 1)
        self.codec = kwargs.get("codec", "raw")
        self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)
        self.secret = kwargs.get("secret", "access_secret")
        self.auth_method = kwargs.get("auth_method", "token")
        self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))

    def construct_request(self, reqid):
        req = {
            'app': {
                'appid': self.appid,
                'cluster': self.cluster,
                'token': self.token,
            },
            'user': {
                'uid': self.uid
            },
            'request': {
                'reqid': reqid,
                'nbest': self.nbest,
                'workflow': self.workflow,
                'show_language': self.show_language,
                'show_utterances': self.show_utterances,
                'result_type': self.result_type,
                "sequence": 1
            },
            'audio': {
                'format': self.format,
                'rate': self.rate,
                'language': self.language,
                'bits': self.bits,
                'channel': self.channel,
                'codec': self.codec
            }
        }
        return req

    @staticmethod
    def slice_data(data: bytes, chunk_size: int) -> (list, bool):
        """
        slice data
        :param data: wav data
        :param chunk_size: the segment size in one request
        :return: segment data, last flag
        """
        data_len = len(data)
        offset = 0
        while offset + chunk_size < data_len:
            yield data[offset: offset + chunk_size], False
            offset += chunk_size
        else:
            yield data[offset: data_len], True

    def _real_processor(self, request_params: dict) -> dict:
        pass

    def token_auth(self):
        return {'Authorization': 'Bearer; {}'.format(self.token)}

    def signature_auth(self, data):
        header_dicts = {
            'Custom': 'auth_custom',
        }

        url_parse = urlparse(self.ws_url)
        input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)
        auth_headers = 'Custom'
        for header in auth_headers.split(','):
            input_str += '{}\n'.format(header_dicts[header])
        input_data = bytearray(input_str, 'utf-8')
        input_data += data
        mac = base64.urlsafe_b64encode(
            hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())
        header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(self.token,
                                                                                              str(mac, 'utf-8'), auth_headers)
        return header_dicts

    async def segment_data_processor(self, wav_data: bytes, segment_size: int):
        reqid = str(uuid.uuid4())
        # 构建 full client request，序列化压缩
        request_params = self.construct_request(reqid)
        payload_bytes = str.encode(json.dumps(request_params))
        payload_bytes = gzip.compress(payload_bytes)
        full_client_request = bytearray(generate_full_default_header())
        full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
        full_client_request.extend(payload_bytes)  # payload
        header = None
        if self.auth_method == "token":
            header = self.token_auth()
        elif self.auth_method == "signature":
            header = self.signature_auth(full_client_request)
        async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:
            # 发送 full client request
            await ws.send(full_client_request)
            res = await ws.recv()
            result = parse_response(res)
            if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                return result
            for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):
                # if no compression, comment this line
                payload_bytes = gzip.compress(chunk)
                audio_only_request = bytearray(generate_audio_default_header())
                if last:
                    audio_only_request = bytearray(generate_last_audio_default_header())
                audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)
                audio_only_request.extend(payload_bytes)  # payload
                # 发送 audio-only client request
                await ws.send(audio_only_request)
                res = await ws.recv()
                result = parse_response(res)
                if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                    return result
        return result

    async def execute(self):
        print(f"Executing with audio_path: {self.audio_path}")
        with open(self.audio_path, mode="rb") as _f:
            data = _f.read()
        audio_data = bytes(data)
        if self.format == "mp3":
            segment_size = self.mp3_seg_size
            return await self.segment_data_processor(audio_data, segment_size)
        if self.format != "wav":
            raise Exception("format should in wav or mp3")
        nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(
            audio_data)
        size_per_sec = nchannels * sampwidth * framerate
        segment_size = int(size_per_sec * self.seg_duration / 1000)
        return await self.segment_data_processor(audio_data, segment_size)


def execute_one(audio_item, cluster, **kwargs):
    """

    :param audio_item: {"id": xxx, "path": "xxx"}
    :param cluster:集群名称
    :return:
    """
    assert 'id' in audio_item
    assert 'path' in audio_item
    audio_id = audio_item['id']
    audio_path = audio_item['path']
    audio_type = AudioType.LOCAL
    asr_http_client = AsrWsClient(
        audio_path=audio_path,  # 确保这里使用了正确的 audio_path
        cluster=cluster,
        audio_type=audio_type,
        **kwargs
    )
    result = asyncio.run(asr_http_client.execute())
    return {"id": audio_id, "path": audio_path, "result": result}

def test_one():
    result = execute_one(
        {
            'id': 1,
            'path': audio_path
        },
        cluster=cluster,
        appid=appid,
        token=token,
        format=audio_format,
    )
    print(result)

def record_audio():
    """使用麦克风录制音频"""
    print("开始录音...")
    audio = Audio()
    
    try:
        # 显示录音倒计时
        for i in range(3, 0, -1):
            print(f"录音将在 {i} 秒后开始...")
            time.sleep(1)
        
        # 开始录音
        print(f"正在录音，持续 {RECORD_DURATION} 秒...")
        audio.record(TEMP_AUDIO_FILE, RECORD_DURATION)
        time.sleep(RECORD_DURATION + 0.5)  # 等待录音完成，多等待0.5秒确保文件保存
        
        # 检查文件是否成功创建
        if os.path.exists(TEMP_AUDIO_FILE):
            print(f"录音完成，文件已保存为: {TEMP_AUDIO_FILE}")
            return TEMP_AUDIO_FILE
        else:
            print("录音文件创建失败")
            return None
            
    except Exception as e:
        print(f"录音过程中发生错误: {e}")
        return None

async def process_audio(audio_file):
    """处理录制的音频文件"""
    print(f"开始处理音频文件: {audio_file}")
    
    try:
        result = execute_one(
            {
                'id': 1,
                'path': audio_file
            },
            cluster=cluster,
            appid=appid,
            token=token,
            format="wav",
        )
        
        print("Raw result:", result)
        
        if isinstance(result, dict):
            if 'result' in result and isinstance(result['result'], dict):
                payload_msg = result['result'].get('payload_msg', {})
                if isinstance(payload_msg, dict):
                    if payload_msg.get('code') == 1000:
                        # 处理结果列表
                        results = payload_msg.get('result', [])
                        if results and isinstance(results, list):
                            # 合并所有识别结果
                            texts = [item.get('text', '') for item in results if isinstance(item, dict)]
                            full_text = ''.join(texts)
                            print("\n识别结果:")
                            print(full_text)
                            
                            # 如果需要，也可以显示每段的置信度
                            for i, item in enumerate(results, 1):
                                if isinstance(item, dict):
                                    confidence = item.get('confidence', 0)
                                    print(f"第{i}段置信度: {confidence}")
                        else:
                            print("未获取到识别文本")
                    else:
                        print(f"\n识别失败，错误码: {payload_msg.get('code')}")
                        print(f"错误信息: {payload_msg.get('message', '无错误信息')}")
                else:
                    print("无法解析 payload_msg")
            else:
                print("无法获取别结果")
                print("完整返回数据:", result)
        else:
            print("返回数据格式错误")
            print("返回数据:", result)
            
    except Exception as e:
        print(f"处理音频时发生错误: {str(e)}")
        print("错误类型:", type(e))
        import traceback
        print("错误堆栈:", traceback.format_exc())
    finally:
        # 清理临时音频文件
        try:
            if os.path.exists(audio_file):
                os.remove(audio_file)
                print("临时音频文件已删除")
        except Exception as e:
            print(f"删除时文件时发生错误: {e}")

# 初始化 Audio 和 GUI
audio = Audio()
gui = GUI()

# 全局变量
is_recording = False
audio_file = "/tmp/recording.wav"
recording_start_time = 0
elapsed_time = 0
time_text = None
result_text = None
running = True

def print_status(message):
    print(f"[状态] {message}")

def update_gui():
    global gui, time_text, result_text
    gui.clear()
    
    # 添加标题
    gui.draw_text(x=120, y=20, text="语音识别系统", origin='center')
    
    # 添加时间显示
    time_text = gui.draw_text(x=120, y=60, text="等待录音...", origin='center')
    
    # 根据录音状态显示不同的按钮
    if is_recording:
        gui.add_button(x=120, y=120, w=160, h=60, text="正在录音", origin='center', 
                      onclick=None, name="start_button", state="disabled")
        gui.add_button(x=120, y=200, w=160, h=60, text="停止录音", origin='center', 
                      onclick=stop_recording, name="stop_button")
    else:
        gui.add_button(x=120, y=120, w=160, h=60, text="开始录音", origin='center', 
                      onclick=start_recording, name="start_button")
        gui.add_button(x=120, y=200, w=160, h=60, text="停止录音", origin='center', 
                      onclick=None, name="stop_button", state="disabled")
    
    # 添加退出按钮
    gui.add_button(x=120, y=280, w=160, h=60, text="退出程序", origin='center', 
                  onclick=exit_program, name="exit_button")
    
    # 添加结果显示区域
    result_text = gui.draw_text(x=120, y=360, text="", origin='center')

def update_time_text():
    global time_text
    if time_text and is_recording:
        time_text.text = f"录音时长: {elapsed_time} 秒"
    elif time_text:
        time_text.text = "等待录音..."

def update_result_text(text):
    global result_text
    if result_text:
        result_text.text = text

def start_recording():
    global is_recording, recording_start_time, elapsed_time
    if not is_recording:
        print_status("开始录音")
        try:
            audio.start_record(audio_file)
            is_recording = True
            recording_start_time = time.time()
            elapsed_time = 0
            update_gui()
            update_result_text("录音中...")
        except Exception as e:
            print_status(f"开始录音时发生错误: {e}")
            update_result_text(f"错误: {str(e)}")

def stop_recording():
    global is_recording, elapsed_time
    if is_recording:
        print_status("停止录音")
        try:
            audio.stop_record()
            is_recording = False
            update_gui()
            update_result_text("正在识别...")
            threading.Thread(target=process_audio, daemon=True).start()
        except Exception as e:
            print_status(f"停止录音时发生错误: {e}")
            update_result_text(f"错误: {str(e)}")

def process_audio():
    print_status(f"开始处理音频文件: {audio_file}")
    
    try:
        result = execute_one(
            {
                'id': 1,
                'path': audio_file
            },
            cluster=cluster,
            appid=appid,
            token=token,
            format="wav",
        )
        
        if isinstance(result, dict):
            if 'result' in result and isinstance(result['result'], dict):
                payload_msg = result['result'].get('payload_msg', {})
                if isinstance(payload_msg, dict):
                    if payload_msg.get('code') == 1000:
                        results = payload_msg.get('result', [])
                        if results and isinstance(results, list):
                            texts = [item.get('text', '') for item in results if isinstance(item, dict)]
                            full_text = ''.join(texts)
                            print_status(f"识别结果: {full_text}")
                            update_result_text(f"识别结果: {full_text}")
                    else:
                        error_msg = f"识别失败 (错误码: {payload_msg.get('code')})"
                        print_status(error_msg)
                        update_result_text(error_msg)
                else:
                    update_result_text("无法解析识别结果")
            else:
                update_result_text("无法获取识别结果")
        else:
            update_result_text("返回数据格式错误")
            
    except Exception as e:
        print_status(f"处理音频时发生错误: {e}")
        update_result_text(f"错误: {str(e)}")
    finally:
        try:
            if os.path.exists(audio_file):
                os.remove(audio_file)
                print_status("临时音频文件已删除")
        except Exception as e:
            print_status(f"删除临时文件时发生错误: {e}")

def exit_program():
    global running
    running = False
    print_status("程序正在退出...")
    gui.clear()
    exit()

def main():
    global elapsed_time, running
    
    update_gui()
    print_status("程序已启动，等待操作...")

    # 主循环
    while running:
        try:
            if is_recording:
                current_time = time.time()
                new_elapsed_time = int(current_time - recording_start_time)
                if new_elapsed_time != elapsed_time:
                    elapsed_time = new_elapsed_time
                    update_time_text()
            time.sleep(0.1)
        except Exception as e:
            print_status(f"主循环发生错误: {e}")
            time.sleep(1)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print_status("程序被用户中断")
    except Exception as e:
        print_status(f"程序发生错误: {e}")
    finally:
        print_status("程序结束")



[状态] 程序已启动，等待操作...
[状态] 开始录音
[状态] 停止录音
[状态] 开始处理音频文件: /tmp/recording.wav
Initializing AsrWsClient with audio_path: /tmp/recording.wav
Executing with audio_path: /tmp/recording.wav
[状态] 识别结果: 蝶康桥。 徐志摩。 轻轻的。 我走了。 正如我轻轻的来。 我轻轻的招手。 作别西天的云彩。 那河畔。 的，金柳岸的。 是夕阳中的。
[状态] 临时音频文件已删除
[状态] 程序被用户中断
[状态] 程序结束


In [None]:
使用的是流式录音API，接下来实现流式实时显示。

In [None]:
"""
requires Python 3.6 or later

pip install asyncio
pip install websockets
"""

import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websockets
from unihiker import Audio, GUI
import nest_asyncio
nest_asyncio.apply()
import threading
import queue
import io
import tempfile

# 火山平台API配置
appid = "4166554764"    # 项目的 appid
token = "ggmUTHHMXio-nJlKMkRvqEgkcWyfDK0K"    # 项目的 token
cluster = "volcengine_streaming_common"  # 请求的集群

# 音频采集设置
RECORD_DURATION = 6  # 录音时长(秒)
TEMP_AUDIO_FILE = "temp_recording.wav"  # 临时音频文件名

PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001

PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8

# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message Type Specific Flags
NO_SEQUENCE = 0b0000  # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011

# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111

# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111


def generate_header(
    version=PROTOCOL_VERSION,
    message_type=CLIENT_FULL_REQUEST,
    message_type_specific_flags=NO_SEQUENCE,
    serial_method=JSON,
    compression_type=GZIP,
    reserved_data=0x00,
    extension_header=bytes()
):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    """
    header = bytearray()
    header_size = int(len(extension_header) / 4) + 1
    header.append((version << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    header.extend(extension_header)
    return header


def generate_full_default_header():
    return generate_header()


def generate_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST
    )


def generate_last_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST,
        message_type_specific_flags=NEG_SEQUENCE
    )

def parse_response(res):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits)
    serialization_method(4 bits) message_compression(4 bits)
    reserved （8bits) 保留字段
    header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )
    payload 类似与http 请求体
    """
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]
    result = {}
    payload_msg = None
    payload_size = 0
    if message_type == SERVER_FULL_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    if payload_msg is None:
        return result
    if message_compression == GZIP:
        payload_msg = gzip.decompress(payload_msg)
    if serialization_method == JSON:
        payload_msg = json.loads(str(payload_msg, "utf-8"))
    elif serialization_method != NO_SERIALIZATION:
        payload_msg = str(payload_msg, "utf-8")
    result['payload_msg'] = payload_msg
    result['payload_size'] = payload_size
    return result


def read_wav_info(data: bytes = None) -> (int, int, int, int, int):
    with BytesIO(data) as _f:
        wave_fp = wave.open(_f, 'rb')
        nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]
        wave_bytes = wave_fp.readframes(nframes)
    return nchannels, sampwidth, framerate, nframes, len(wave_bytes)

class AudioType(Enum):
    LOCAL = 1  # 使用本地音频文件

class AsrWsClient:
    def __init__(self, audio_path, cluster, **kwargs):
        print(f"Initializing AsrWsClient with audio_path: {audio_path}")
        self.audio_path = audio_path
        self.cluster = cluster
        self.success_code = 1000  # success code, default is 1000
        self.seg_duration = int(kwargs.get("seg_duration", 15000))
        self.nbest = int(kwargs.get("nbest", 1))
        self.appid = kwargs.get("appid", "")
        self.token = kwargs.get("token", "")
        self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")
        self.uid = kwargs.get("uid", "streaming_asr_demo")
        self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")
        self.show_language = kwargs.get("show_language", False)
        self.show_utterances = kwargs.get("show_utterances", False)
        self.result_type = kwargs.get("result_type", "full")
        self.format = kwargs.get("format", "wav")
        self.rate = kwargs.get("sample_rate", 16000)
        self.language = kwargs.get("language", "zh-CN")
        self.bits = kwargs.get("bits", 16)
        self.channel = kwargs.get("channel", 1)
        self.codec = kwargs.get("codec", "raw")
        self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)
        self.secret = kwargs.get("secret", "access_secret")
        self.auth_method = kwargs.get("auth_method", "token")
        self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))
        self.on_message = None  # 添加回调函数属性

    def construct_request(self, reqid):
        req = {
            'app': {
                'appid': self.appid,
                'cluster': self.cluster,
                'token': self.token,
            },
            'user': {
                'uid': self.uid
            },
            'request': {
                'reqid': reqid,
                'nbest': self.nbest,
                'workflow': self.workflow,
                'show_language': self.show_language,
                'show_utterances': self.show_utterances,
                'result_type': self.result_type,
                "sequence": 1
            },
            'audio': {
                'format': self.format,
                'rate': self.rate,
                'language': self.language,
                'bits': self.bits,
                'channel': self.channel,
                'codec': self.codec
            }
        }
        return req

    @staticmethod
    def slice_data(data: bytes, chunk_size: int) -> (list, bool):
        """
        slice data
        :param data: wav data
        :param chunk_size: the segment size in one request
        :return: segment data, last flag
        """
        data_len = len(data)
        offset = 0
        while offset + chunk_size < data_len:
            yield data[offset: offset + chunk_size], False
            offset += chunk_size
        else:
            yield data[offset: data_len], True

    def _real_processor(self, request_params: dict) -> dict:
        pass

    def token_auth(self):
        return {'Authorization': 'Bearer; {}'.format(self.token)}

    def signature_auth(self, data):
        header_dicts = {
            'Custom': 'auth_custom',
        }

        url_parse = urlparse(self.ws_url)
        input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)
        auth_headers = 'Custom'
        for header in auth_headers.split(','):
            input_str += '{}\n'.format(header_dicts[header])
        input_data = bytearray(input_str, 'utf-8')
        input_data += data
        mac = base64.urlsafe_b64encode(
            hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())
        header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(self.token,
                                                                                              str(mac, 'utf-8'), auth_headers)
        return header_dicts

    async def segment_data_processor(self, wav_data: bytes, segment_size: int):
        reqid = str(uuid.uuid4())
        request_params = self.construct_request(reqid)
        payload_bytes = str.encode(json.dumps(request_params))
        payload_bytes = gzip.compress(payload_bytes)
        full_client_request = bytearray(generate_full_default_header())
        full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
        full_client_request.extend(payload_bytes)
        
        header = None
        if self.auth_method == "token":
            header = self.token_auth()
        elif self.auth_method == "signature":
            header = self.signature_auth(full_client_request)
            
        async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:
            await ws.send(full_client_request)
            res = await ws.recv()
            result = parse_response(res)
            
            # 处理第一个响应
            if self.on_message:
                self.on_message(result)
                
            if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                return result
                
            for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):
                payload_bytes = gzip.compress(chunk)
                audio_only_request = bytearray(generate_audio_default_header())
                if last:
                    audio_only_request = bytearray(generate_last_audio_default_header())
                audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
                audio_only_request.extend(payload_bytes)
                
                await ws.send(audio_only_request)
                res = await ws.recv()
                result = parse_response(res)
                
                # 处理每个分片的响应
                if self.on_message:
                    self.on_message(result)
                    
                if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                    return result
                    
        return result

    async def execute(self):
        print(f"Executing with audio_path: {self.audio_path}")
        with open(self.audio_path, mode="rb") as _f:
            data = _f.read()
        audio_data = bytes(data)
        if self.format == "mp3":
            segment_size = self.mp3_seg_size
            return await self.segment_data_processor(audio_data, segment_size)
        if self.format != "wav":
            raise Exception("format should in wav or mp3")
        nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(
            audio_data)
        size_per_sec = nchannels * sampwidth * framerate
        segment_size = int(size_per_sec * self.seg_duration / 1000)
        return await self.segment_data_processor(audio_data, segment_size)


def execute_one(audio_item, cluster, **kwargs):
    """

    :param audio_item: {"id": xxx, "path": "xxx"}
    :param cluster:集群名称
    :return:
    """
    assert 'id' in audio_item
    assert 'path' in audio_item
    audio_id = audio_item['id']
    audio_path = audio_item['path']
    audio_type = AudioType.LOCAL
    asr_http_client = AsrWsClient(
        audio_path=audio_path,  # 确保这里使用了正确的 audio_path
        cluster=cluster,
        audio_type=audio_type,
        **kwargs
    )
    result = asyncio.run(asr_http_client.execute())
    return {"id": audio_id, "path": audio_path, "result": result}

def test_one():
    result = execute_one(
        {
            'id': 1,
            'path': audio_path
        },
        cluster=cluster,
        appid=appid,
        token=token,
        format=audio_format,
    )
    print(result)

def record_audio():
    """使用麦克风录制音频"""
    print("开始录音...")
    audio = Audio()
    
    try:
        # 显示录音倒计时
        for i in range(3, 0, -1):
            print(f"录音将在 {i} 秒后开始...")
            time.sleep(1)
        
        # 开始录音
        print(f"正在录音，持续 {RECORD_DURATION} 秒...")
        audio.record(TEMP_AUDIO_FILE, RECORD_DURATION)
        time.sleep(RECORD_DURATION + 0.5)  # 等待录音完成，等待0.5秒确保文件保存
        
        # 检查文件是否成功创建
        if os.path.exists(TEMP_AUDIO_FILE):
            print(f"录音完成，文件已保存为: {TEMP_AUDIO_FILE}")
            return TEMP_AUDIO_FILE
        else:
            print("录音文件创建失败")
            return None
            
    except Exception as e:
        print(f"录音过程中发生错误: {e}")
        return None

async def process_audio(audio_file):
    """处理录制的音频文件"""
    print(f"开始处理音频文件: {audio_file}")
    
    try:
        result = execute_one(
            {
                'id': 1,
                'path': audio_file
            },
            cluster=cluster,
            appid=appid,
            token=token,
            format="wav",
        )
        
        print("Raw result:", result)
        
        if isinstance(result, dict):
            if 'result' in result and isinstance(result['result'], dict):
                payload_msg = result['result'].get('payload_msg', {})
                if isinstance(payload_msg, dict):
                    if payload_msg.get('code') == 1000:
                        # 处理结果列表
                        results = payload_msg.get('result', [])
                        if results and isinstance(results, list):
                            # 合并所有识别结果
                            texts = [item.get('text', '') for item in results if isinstance(item, dict)]
                            full_text = ''.join(texts)
                            print("\n识别结果:")
                            print(full_text)
                            
                            # 如果需要，也可以显示每段的置信度
                            for i, item in enumerate(results, 1):
                                if isinstance(item, dict):
                                    confidence = item.get('confidence', 0)
                                    print(f"第{i}段置信度: {confidence}")
                        else:
                            print("未获取到识别文本")
                    else:
                        print(f"\n识别失败，错误码: {payload_msg.get('code')}")
                        print(f"错误信息: {payload_msg.get('message', '无错误信息')}")
                else:
                    print("无法解析 payload_msg")
            else:
                print("无获取别结果")
                print("完整返回数据:", result)
        else:
            print("返回数据格式错误")
            print("返回数据:", result)
            
    except Exception as e:
        print(f"处理音频时发生错误: {str(e)}")
        print("错误类型:", type(e))
        import traceback
        print("错误堆栈:", traceback.format_exc())
    finally:
        # 清理临时音频文
        try:
            if os.path.exists(audio_file):
                os.remove(audio_file)
                print("临时音频文件已删除")
        except Exception as e:
            print(f"删除时文件时发生错误: {e}")

# 全局变量
gui = GUI()
audio = Audio()
is_recording = False
recording_start_time = 0
elapsed_time = 0
time_text = None
result_text = None

# 临时文件相关
TEMP_DIR = tempfile.gettempdir()
current_audio_file = None
is_processing = False
recognition_task = None

# 在全局作用域定义 run_recognition 函数
def run_recognition():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(process_audio_stream())
    loop.close()

def start_recording():
    global is_recording, recording_start_time, elapsed_time, is_processing
    global current_audio_file, recognition_task
    
    if not is_recording:
        print_status("开始录音")
        try:
            # 创建临时文件
            current_audio_file = os.path.join(TEMP_DIR, f"recording_{int(time.time())}.wav")
            
            # 启动录音
            audio.start_record(current_audio_file)
            is_recording = True
            is_processing = True
            recording_start_time = time.time()
            elapsed_time = 0
            update_gui()
            
            # 启动语音识别线程
            recognition_task = threading.Thread(target=run_recognition)
            recognition_task.daemon = True
            recognition_task.start()
            
        except Exception as e:
            print_status(f"开始录音时发生错误: {e}")
            is_recording = False
            is_processing = False

def stop_recording():
    global is_recording, is_processing, current_audio_file
    if is_recording:
        print_status("停止录音")
        try:
            is_recording = False
            is_processing = False
            audio.stop_record()
            
            # 等待识别任务结束
            if recognition_task and recognition_task.is_alive():
                recognition_task.join(timeout=2)
            
            # 清理临时文件
            if current_audio_file and os.path.exists(current_audio_file):
                os.remove(current_audio_file)
                current_audio_file = None
            
            update_gui()
        except Exception as e:
            print_status(f"停止录音时发生错误: {e}")

def update_result_text(text):
    global result_text
    if result_text:
        try:
            # 将长文本分成多行显示
            max_chars_per_line = 20  # 每行最大字符数
            display_lines = []
            
            # 分行处理文本
            current_line = ""
            for char in text:
                current_line += char
                if len(current_line) >= max_chars_per_line:
                    display_lines.append(current_line)
                    current_line = ""
            if current_line:
                display_lines.append(current_line)
            
            # 更新显示
            result_text.text = "\n".join(display_lines)
            # 强制更新 GUI
            gui.update()
        except Exception as e:
            print_status(f"更新文本显示时发生错误: {e}")

def update_gui():
    global gui, time_text, result_text
    try:
        gui.clear()
        
        # 添加标题
        gui.draw_text(x=120, y=20, text="语音识别系统", origin='center')
        
        # 添加时间显示
        time_text = gui.draw_text(x=120, y=50, text="等待录音...", origin='center')
        
        # 按钮布局
        if is_recording:
            gui.add_button(x=120, y=100, w=160, h=50, text="正在录音", origin='center', 
                          onclick=None, name="start_button", state="disabled")
            gui.add_button(x=120, y=160, w=160, h=50, text="停止录音", origin='center', 
                          onclick=stop_recording, name="stop_button")
        else:
            gui.add_button(x=120, y=100, w=160, h=50, text="开始录音", origin='center', 
                          onclick=start_recording, name="start_button")
            gui.add_button(x=120, y=160, w=160, h=50, text="停止录音", origin='center', 
                          onclick=None, name="stop_button", state="disabled")
        
        # 退出按钮
        gui.add_button(x=120, y=220, w=160, h=50, text="退出程序", origin='center', 
                      onclick=exit_program, name="exit_button")
        
        # 结果显示区域
        result_text = gui.draw_text(x=120, y=300, text="", origin='center')
    except Exception as e:
        print_status(f"更新GUI时发生错误: {e}")

def print_status(msg):
    print(f"[状态] {msg}")

def exit_program():
    global is_recording, is_processing
    is_recording = False
    is_processing = False
    if recognition_task and recognition_task.is_alive():
        recognition_task.join(timeout=2)
    gui.clear()
    exit()

# 初始化 GUI
update_gui()
print_status("程序已启动，等待操作...")

# 启动主循环
while True:
    if is_recording:
        elapsed_time = int(time.time() - recording_start_time)
        minutes = elapsed_time // 60
        seconds = elapsed_time % 60
        if time_text:
            time_text.text = f"录音时间: {minutes:02d}:{seconds:02d}"
    time.sleep(0.1)

async def process_audio_stream():
    global current_audio_file, is_processing
    print_status("开始语音识别流程")
    
    try:
        # 构建请求参数
        reqid = str(uuid.uuid4())
        request_params = {
            "app": {
                "appid": appid,
                "token": token,
                "cluster": cluster
            },
            "user": {
                "uid": "streaming_asr_demo"
            },
            "audio": {
                "format": "wav",
                "channel": 1,
                "rate": 16000,
                "bits": 16
            },
            "request": {
                "reqid": reqid,
                "sequence": 1,
                "workflow": "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate",
                "show_utterances": True,
                "result_type": "single"
            }
        }

        print_status(f"连接WebSocket服务器...")
        async with websockets.connect(
            'wss://openspeech.bytedance.com/api/v2/asr',
            extra_headers={'Authorization': f'Bearer {token}'}
        ) as ws:
            print_status("WebSocket连接成功")
            
            # 读取并发送音频数据
            chunk_size = 3200  # 每次处理100ms的音频数据
            last_size = 0
            
            while is_processing and current_audio_file:
                try:
                    # 检查文件大小是否有变化
                    if os.path.exists(current_audio_file):
                        current_size = os.path.getsize(current_audio_file)
                        if current_size > last_size:
                            print_status(f"读取新音频数据: {current_size - last_size} 字节")
                            with open(current_audio_file, 'rb') as f:
                                f.seek(last_size)
                                audio_data = f.read(chunk_size)
                                
                                if audio_data:
                                    print_status(f"发送音频数据: {len(audio_data)} 字节")
                                    # 压缩音频数据
                                    compressed_chunk = gzip.compress(audio_data)
                                    
                                    # 构建音频请求
                                    audio_request = bytearray([
                                        0x11,  # version 1, header size 1
                                        0x20,  # audio only request, no flags
                                        0x01,  # no serialization, Gzip compression
                                        0x00   # reserved
                                    ])
                                    audio_request.extend((len(compressed_chunk)).to_bytes(4, 'big'))
                                    audio_request.extend(compressed_chunk)
                                    
                                    # 发送音频数据
                                    await ws.send(audio_request)
                                    print_status("等待识别结果...")
                                    
                                    # 接收识别结果
                                    response = await ws.recv()
                                    result = parse_response(response)
                                    print_status(f"收到响应: {result}")
                                    
                                    # 处理识别结果
                                    if result and 'payload_msg' in result:
                                        payload_msg = result['payload_msg']
                                        print_status(f"响应内容: {payload_msg}")
                                        if payload_msg.get('code') == 1000:
                                            results = payload_msg.get('result', [])
                                            if results and isinstance(results, list):
                                                for res in results:
                                                    text = res.get('text', '')
                                                    if text:
                                                        print_status(f"实时识别: {text}")
                                                        update_result_text(f"识别结果:\n{text}")
                                    
                                    last_size = current_size
                    
                    await asyncio.sleep(0.1)
                    
                except Exception as e:
                    print_status(f"处理音频流时发生错误: {e}")
                    break
            
            print_status("发送结束标记")
            # 发送结束标记
            last_request = bytearray([
                0x11,  # version 1, header size 1
                0x22,  # audio only request, end flag
                0x01,  # no serialization, Gzip compression
                0x00   # reserved
            ])
            last_request.extend((0).to_bytes(4, 'big'))
            await ws.send(last_request)
            
    except Exception as e:
        print_status(f"语音识别过程发生错误: {e}")
    finally:
        is_processing = False
        print_status("语音识别流程结束")





GUI is cleared because of reinit
[状态] 程序已启动，等待操作...
[状态] 开始录音


Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_969/2229986343.py", line 486, in run_recognition
    loop.run_until_complete(process_audio_stream())
NameError: name 'process_audio_stream' is not defined

