Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions src/mcp_server/core/live_streaming/live_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,18 @@ def get_play_urls(self, play_domain: str, bucket: str, stream_name: str) -> Dict
"message": "Playback URLs generated successfully"
}

async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any]:
async def query_live_traffic_stats(self, begin: str, end: str, include_raw_data: bool = False) -> Dict[str, Any]:
"""
Query live streaming traffic statistics

Args:
begin: Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)
end: End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)
include_raw_data: If True, includes raw JSON data for download (default: False)

Returns:
Dict containing traffic statistics
Dict containing traffic statistics with total traffic (bytes), average bandwidth (bps),
peak bandwidth (bps), and optionally raw data
"""
if not self.live_endpoint:
self.live_endpoint = "mls.cn-east-1.qiniumiku.com"
Expand Down Expand Up @@ -375,14 +377,99 @@ async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any]

if status == 200:
logger.info("Successfully queried live traffic stats")
return {
"status": "success",
"begin": begin,
"end": end,
"data": text,
"message": "Traffic statistics retrieved successfully",
"status_code": status
}

try:
# Parse JSON response
data = json.loads(text)

# Calculate total traffic and bandwidth metrics
total_traffic_bytes = 0
bandwidth_values = []
data_points = []

# Data format: [{"time":"2025-11-26T00:00:00+08:00","values":{"flow":0}}, ...]
for item in data:
if isinstance(item, dict) and "values" in item and "flow" in item["values"]:
flow_bytes = item["values"]["flow"]
total_traffic_bytes += flow_bytes

# Convert to bandwidth: flow is accumulated over 5 minutes (300 seconds)
# Bandwidth (bps) = bytes / 300 seconds * 8 bits/byte
bandwidth_bps = (flow_bytes / 300) * 8
bandwidth_values.append(bandwidth_bps)

# Store data point with timestamp
data_points.append({
"time": item.get("time", ""),
"traffic_bytes": flow_bytes,
"bandwidth_bps": bandwidth_bps
})

# Calculate average and peak bandwidth
avg_bandwidth_bps = sum(bandwidth_values) / len(bandwidth_values) if bandwidth_values else 0
peak_bandwidth_bps = max(bandwidth_values) if bandwidth_values else 0

# Convert to human-readable units
def format_bytes(bytes_val):
"""Convert bytes to human-readable format"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_val < 1024.0:
return f"{bytes_val:.2f} {unit}"
bytes_val /= 1024.0
return f"{bytes_val:.2f} PB"

def format_bandwidth(bps):
"""Convert bits per second to human-readable format"""
for unit in ['bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps']:
if bps < 1000.0:
return f"{bps:.2f} {unit}"
bps /= 1000.0
return f"{bps:.2f} Pbps"

result = {
"status": "success",
"begin": begin,
"end": end,
"summary": {
"total_traffic_bytes": total_traffic_bytes,
"total_traffic_formatted": format_bytes(total_traffic_bytes),
"data_points_count": len(data_points),
"average_bandwidth_bps": avg_bandwidth_bps,
"average_bandwidth_formatted": format_bandwidth(avg_bandwidth_bps),
"peak_bandwidth_bps": peak_bandwidth_bps,
"peak_bandwidth_formatted": format_bandwidth(peak_bandwidth_bps),
"granularity": "5 minutes"
},
"message": "Traffic statistics calculated successfully",
"status_code": status
}

# Include raw data only if requested
if include_raw_data:
result["raw_data"] = data
result["data_points"] = data_points

return result

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response: {e}")
return {
"status": "error",
"begin": begin,
"end": end,
"message": f"Failed to parse traffic stats response: {str(e)}",
"raw_response": text,
"status_code": status
}
except Exception as e:
logger.error(f"Error processing traffic stats: {e}")
return {
"status": "error",
"begin": begin,
"end": end,
"message": f"Error processing traffic stats: {str(e)}",
"status_code": status
}
else:
logger.error(f"Failed to query traffic stats, status: {status}, response: {text}")
return {
Expand Down
7 changes: 6 additions & 1 deletion src/mcp_server/core/live_streaming/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def get_play_urls(self, **kwargs) -> list[types.TextContent]:
@tools.tool_meta(
types.Tool(
name="live_streaming_query_live_traffic_stats",
description="Query live streaming traffic statistics for a time range. Returns bandwidth and traffic usage data.",
description="Query live streaming traffic statistics for a time range. Returns total traffic (bytes), average bandwidth (bps), peak bandwidth (bps), and optionally raw data for download.",
inputSchema={
"type": "object",
"properties": {
Expand All @@ -189,6 +189,11 @@ async def get_play_urls(self, **kwargs) -> list[types.TextContent]:
"type": "string",
"description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)",
},
"include_raw_data": {
"type": "boolean",
"description": "If true, includes raw JSON data and detailed data points for download. Default is false.",
"default": False,
},
},
"required": ["begin", "end"],
},
Expand Down
Loading