Skip to content

Audio packet transmission behavior changed after upgrading from websockets 13.0 to 15.0.1 #1679

@Pliosauroidea

Description

@Pliosauroidea

Description

Summary

After upgrading the websockets library from version 13.0 to 15.0.1, I'm experiencing different behavior when sending and receiving audio packets over WebSocket connections. The audio transmission that worked correctly in 13.0 now behaves unexpectedly in 15.0.1.

Image Image

Environment

websockets version (working): 13.0
websockets version (issue): 15.0.1
Python version: python3.12.12
Operating System: Linux/Ubuntu 22.04
Use case: Real-time audio streaming over WebSocket

Expected Behavior

In websockets 13.0, audio packets were transmitted and received reliably with consistent timing and no data loss.

Actual Behavior

After upgrading to websockets 15.0.1, the audio packet transmission exhibits different behavior:

  1. some packages are missed
  2. unexpected packages show up with random content

clue that might help:

  1. i'm sending those audio chunks quite fast
  2. and have no additional buffer on server side (i'm assuming there is an intern buffer)
  3. i'm using asyncio in other places on client side,can this cause what happened?
  4. each chunk is about 6k bytes,and i'm sending roughly 10 of them per second

code related:

client side

  # 启动音频节点
            self.audio_ws = await websockets.connect(
                f"ws://{audio_server['host']}:{audio_server['port']}",
                subprotocols=["binary"],
            )
            self.audio_input_manager = MCPAudioInputManager(
                AudioConfig(**config.input_audio_config), audio_ws=self.audio_ws
            )
            self.audio_output_manager = MCPAudioOutputManager(
                AudioConfig(**config.output_audio_config), audio_ws=self.audio_ws
            )
            await self.audio_output_manager.initialize()

            # 启动播放线程
            self.is_recording = True
            self.is_playing = True

            audio_player_thread = threading.Thread(target=self.sync_audio_player_thread)
            audio_player_thread.daemon = True
            audio_player_thread.start()

...

    async def _audio_player_thread(self):
        """音频播放线程"""
        while self.is_playing:
            try:
                # 从队列获取音频数据
                if self.is_analyzing_intent:  # 要先进行意图判定,才确定播放什么音频
                    await asyncio.sleep(0.1)
                    continue
                audio_data = self.audio_queue.get(timeout=0.05)
                if audio_data is not None:
                    # 第一次播放音频时回收计时
                    if self.is_timing:
                        latency = self.latency_timer.stop_timing()
                        self.is_timing = False

                        # 检查时延是否合理(大于0且小于10秒)
                        if 0 < latency < 10.0:
                            report = self.latency_timer.get_latency_report()
                            log_info(f"时延报告: {report}")
                        else:
                            log_warning(f"不合理的时延: {latency:.3f}s,已排除")

                    await self.audio_output_manager.write_audio(audio_data)
            except queue.Empty:
                # 队列为空时等待一小段时间
                await asyncio.sleep(0.1)
            except Exception as e:
                log_error(f"音频播放错误: {e}")
                await asyncio.sleep(0.1)

    def sync_audio_player_thread(self):
        """同步调用音频播放线程"""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._audio_player_thread())

...
(class OutputManager:)
 async def write_audio(self, audio_data: bytes):
        audio_data = self._amplify_audio(audio_data)
        resampled_audio_int16 = self._resample_audio(audio_data)
        try:

            header = self.session_id.to_bytes(4, byteorder="big")
            audio_message = header + resampled_audio_int16.tobytes()
            # audio_message = header  # TODO: 测试

            await self.audio_ws.send(audio_message)

            log_debug(
                f"发送音频数据, session_id: {self.session_id}, 数据长度: {len(resampled_audio_int16)}"
            )
            self.session_id += 1

        except Exception as e:
            log_error(f"RosPlayer: 发布音频数据时出错: {e}")
            import traceback

            traceback.print_exc()

server side

  async def serve_audio_ws(self):
        """
        发布音频websocket端口
        """
        audio_ws_host = self.config["audio_ws"]["host"]
        audio_ws_port = self.config["audio_ws"]["port"]

        rospy.loginfo(
            f"Starting Audio WebSocket server on ws://{audio_ws_host}:{audio_ws_port}"
        )

        await websockets.serve(
            self.handle_audio_messages,
            audio_ws_host,
            audio_ws_port,
            max_size=2**22,
            max_queue=500,
            write_limit=2**25,
        )
        await asyncio.Future()

    async def handle_audio_messages(self, websocket):
        """
        处理音频websocket连接
        """
        self.audio_ws = websocket
        rospy.loginfo(
            f"✓ Audio WebSocket connection established from {websocket.remote_address}"
        )

        try:
            while True:
                # 接收音频websocket数据
                async for data in websocket:
                    # data = await self.audio_ws.recv()
                    header, message = data[:4], data[4:]
                    header = int.from_bytes(header, byteorder="big")
                    print(f"header: {header}")
                # message = np.frombuffer(message, dtype=np.int16).tolist()
                # print(f"header: {header}, message len: {len(message)}")
                # if not self.audio_out:
                #     rospy.logwarn("Audio data topic not connected")
                #     continue
                # # 将字节数据转换为ROS消息
                # ros_msg = Int16MultiArray()
                # ros_msg.data = message

                # self.audio_out.publish(ros_msg)

        except Exception as e:
            rospy.logerr(f"处理音频数据时出错:{traceback.format_exc()}")
        finally:
            # 连接关闭时的处理
            self.audio_ws = None
            rospy.loginfo("✗ Audio WebSocket connection closed")

please guide me and confirm whether this is a bug,or anything i did was wrong

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions