In [1]:
import os
from datetime import datetime, timedelta
from collections import defaultdict
from statistics import mean, median

In [2]:
# File-name prefix that identifies the raw trace logs to be merged.
log_file_prefix = "emqx"

def get_merge_log_file_name(prefix=None, directory='.'):
    """Derive the merged-log file name from the trace files in a directory.

    The first entry (in sorted order, so the result is deterministic) whose
    name starts with ``prefix`` and contains the marker ``'trace_'`` decides
    the result: ``'new_'`` plus everything after the first ``'trace_'``.

    Args:
        prefix: File-name prefix to look for. Defaults to the module-level
            ``log_file_prefix`` when omitted.
        directory: Directory to scan. Defaults to the current directory.

    Returns:
        The derived file name, or ``"merge_log.log"`` when no suitable file
        is found.
    """
    if prefix is None:
        prefix = log_file_prefix

    # os.listdir returns entries in arbitrary order; sort for a stable choice.
    for file_name in sorted(os.listdir(directory)):
        if not file_name.startswith(prefix):
            continue
        _, marker, suffix = file_name.partition('trace_')
        # A prefixed file without the marker (or with nothing after it) cannot
        # name the merged log; keep looking instead of raising IndexError.
        if marker and suffix:
            return 'new_' + suffix
    return "merge_log.log"

def read_and_sort_files(prefix=None, directory='.'):
    """Read every matching trace file and return all lines sorted by timestamp.

    The sort key is the first whitespace-separated token of each line, which
    is compared as a string.

    Args:
        prefix: Only regular files whose name starts with this prefix are
            read. Defaults to the module-level ``log_file_prefix``.
        directory: Directory to scan. Defaults to the current directory.

    Returns:
        A list of newline-terminated lines in ascending key order.
        Whitespace-only lines are dropped because they carry no timestamp.
    """
    if prefix is None:
        prefix = log_file_prefix

    all_lines = []

    # Sorted listing keeps the relative order of equal timestamps reproducible
    # (list.sort is stable).
    for file_name in sorted(os.listdir(directory)):
        path = os.path.join(directory, file_name)
        # Skip non-matching names and directories that happen to match.
        if not file_name.startswith(prefix) or not os.path.isfile(path):
            continue
        with open(path, 'r') as file:
            for line in file:
                if not line.strip():
                    # A blank line has no first token to sort by.
                    continue
                if not line.endswith('\n'):
                    # The last line of a file may lack a newline; add one so it
                    # is not glued to the following line once merged.
                    line += '\n'
                all_lines.append(line)

    # Sort by the timestamp token at the start of each line.
    all_lines.sort(key=lambda line: line.split(maxsplit=1)[0])

    return all_lines

def write_sorted_contents(sorted_lines, new_file_name):
    """Write the given lines to ``new_file_name`` and report the path on stdout.

    Args:
        sorted_lines: Iterable of strings, written back to back exactly as
            given (no separators are added).
        new_file_name: Path of the file to create or overwrite.
    """
    with open(new_file_name, mode='w') as out:
        for line in sorted_lines:
            out.write(line)

    print(f'所有内容已写入到新文件: {new_file_name}')

# Name of the merged log file; parse_log_file(merge_file_name) reads it later in
# this notebook, so the file must already exist when the merge below is disabled.
merge_file_name = get_merge_log_file_name()

# The merge step is currently disabled. Uncomment the two calls below to rebuild
# the merged file from the raw trace files.
# sorted_lines = read_and_sort_files()

# write_sorted_contents(sorted_lines, merge_file_name)

In [3]:
def convert_seconds_to_hms(seconds):
    """Format a duration given in seconds as an hours/minutes/seconds string.

    The input is rounded to the nearest whole second first. Leading units are
    omitted: hours appear only when positive, and minutes appear only when
    hours or minutes are positive.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted duration string.
    """
    whole = int(round(seconds))
    hours, remainder = divmod(whole, 3600)
    minutes, seconds = divmod(remainder, 60)

    if hours > 0:
        return f"{hours}小时 {minutes}分钟 {seconds}秒"
    if minutes > 0:
        return f"{minutes}分钟 {seconds}秒"
    return f"{seconds}秒"

def max_messages_in_15s(logs, window_seconds=15):
    """Find the peak number of sent / received MQTT packets in a sliding window.

    A two-pointer sliding window is moved over ``logs``. For each entry taken
    as the window's right edge, entries more than ``window_seconds`` older
    than it are dropped from the left, so an entry exactly ``window_seconds``
    old still counts.

    Args:
        logs: Sequence of dicts with a ``'timestamp'`` (datetime) and a
            ``'message_type'`` key, in ascending timestamp order.
        window_seconds: Window length in seconds. Defaults to 15, the value
            the function name refers to.

    Returns:
        A tuple ``(max_sent, max_received)`` with the largest counts of
        ``'mqtt_packet_sent'`` and ``'mqtt_packet_received'`` entries seen in
        any window. ``(0, 0)`` for empty input.
    """
    # Build the window length once instead of on every loop iteration.
    window = timedelta(seconds=window_seconds)

    max_sent = 0
    max_received = 0

    sent_count = 0
    received_count = 0
    start = 0

    for entry in logs:
        end_time = entry['timestamp']

        if entry['message_type'] == 'mqtt_packet_sent':
            sent_count += 1
        elif entry['message_type'] == 'mqtt_packet_received':
            received_count += 1

        # Shrink from the left until the window spans at most `window`.
        while end_time - logs[start]['timestamp'] > window:
            if logs[start]['message_type'] == 'mqtt_packet_sent':
                sent_count -= 1
            elif logs[start]['message_type'] == 'mqtt_packet_received':
                received_count -= 1
            start += 1

        max_sent = max(max_sent, sent_count)
        max_received = max(max_received, received_count)

    return max_sent, max_received

def parse_timestamp(timestamp_str):
    """Parse a timestamp string into a timezone-aware ``datetime``.

    The expected layout is ``YYYY-MM-DDTHH:MM:SS.ffffff`` followed by a UTC
    offset. Strings that do not match make ``datetime.strptime`` raise
    ``ValueError``.
    """
    return datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f%z")

def _text_after(line, marker):
    """Return the text following the first ``marker`` in ``line``, right-stripped.

    Returns ``None`` when ``marker`` does not occur in ``line``.
    """
    _, found, tail = line.partition(marker)
    return tail.rstrip() if found else None

def parse_log_file(filename):
    """Parse the '[MQTT]' / '[SOCKET]' lines of a log file into dict records.

    Only lines containing ``'[MQTT]'`` or ``'[SOCKET]'`` are considered. For
    each of them the first whitespace-separated token is parsed as the
    timestamp and the token following ``'msg:'`` (trailing comma removed) is
    taken as the message type. Matching lines without a ``'msg:'`` value are
    skipped.

    Args:
        filename: Path of the log file to read.

    Returns:
        A list of dicts with the keys ``'timestamp'`` (datetime),
        ``'message_type'`` (str), ``'packet_info'`` (text after
        ``'packet: '`` or ``None``) and ``'disconn_reason'`` (text after
        ``'reason: '`` or ``None``), in file order.

    Raises:
        ValueError: If the first token of a considered line is not a valid
            timestamp (raised by ``parse_timestamp``).
    """
    logs = []
    with open(filename, 'r') as file:
        # Iterate lazily; the whole file does not need to be held in memory.
        for line in file:
            if '[MQTT]' not in line and '[SOCKET]' not in line:
                continue

            parts = line.split()
            try:
                message_type = parts[parts.index('msg:') + 1].rstrip(',')
            except (ValueError, IndexError):
                # No 'msg:' token, or nothing after it: there is no message
                # type to record for this line.
                continue

            timestamp = parse_timestamp(parts[0])
            # Take everything after the FIRST marker so that a payload which
            # itself contains the marker text is not truncated.
            packet_info = _text_after(line, "packet: ") if 'packet:' in parts else None
            disconn_reason = _text_after(line, "reason: ") if 'reason:' in parts else None

            logs.append({'timestamp': timestamp, 'message_type': message_type, 'packet_info': packet_info, 'disconn_reason': disconn_reason})
    return logs

logs = parse_log_file(merge_file_name)

# Online-duration statistics: a session starts at the first MQTT packet seen
# while no session is open and ends at the next 'emqx_connection_terminated'.
online_periods = []
start_time = None
end_time = None
for log in logs:
    if log['message_type'] in ['mqtt_packet_received', 'mqtt_packet_sent'] and not start_time:
        start_time = log['timestamp']
    elif log['message_type'] == 'emqx_connection_terminated':
        if start_time:
            end_time = log['timestamp']
            online_periods.append((start_time, end_time))
            start_time = None
if start_time:
    # A session still open at the end of the log is closed at the last entry.
    end_time = logs[-1]['timestamp']
    online_periods.append((start_time, end_time))

total_online_time = sum((end - start).total_seconds() for start, end in online_periods)

# An empty log has no first/last entry; treat its duration as zero.
logs_time = (logs[-1]['timestamp'] - logs[0]['timestamp']).total_seconds() if logs else 0.0

# Share of the log duration spent online; 0 when the log spans no time at all
# (avoids ZeroDivisionError).
online_ratio = total_online_time * 100 / logs_time if logs_time else 0.0

# Sent / received message counts, bucketed by whole second.
message_counts_per_second = defaultdict(lambda: {'sent': 0, 'received': 0})
for log in logs:
    second = log['timestamp'].replace(microsecond=0)
    if log['message_type'] == 'mqtt_packet_sent':
        message_counts_per_second[second]['sent'] += 1
    elif log['message_type'] == 'mqtt_packet_received':
        message_counts_per_second[second]['received'] += 1

sum_sent_msg = sum(count['sent'] for count in message_counts_per_second.values())
sum_received_msg = sum(count['received'] for count in message_counts_per_second.values())
# default=0 keeps max() from raising ValueError when there are no buckets.
max_sent_per_second = max((count['sent'] for count in message_counts_per_second.values()), default=0)
max_received_per_second = max((count['received'] for count in message_counts_per_second.values()), default=0)
# With no online time there is nothing to average over; report 0 instead of
# dividing by zero.
average_sent_per_second = sum_sent_msg / total_online_time if total_online_time else 0.0
average_received_per_second = sum_received_msg / total_online_time if total_online_time else 0.0

# Peak number of sent / received messages within any 15-second window.
max_sent_in_15s, max_received_in_15s = max_messages_in_15s(logs)


print("-" * 20 + " 时长统计 " + "-" * 20)
print(f"日志时长: {convert_seconds_to_hms(logs_time)}")
print(f"设备总在线时间: {convert_seconds_to_hms(total_online_time)}, 日志时长内在线占比 {online_ratio:.2f} %")
print()
print("-" * 20 + " 消息数统计 " + "-" * 20)
print(f"【发送】MQTT消息总数: {sum_sent_msg}  【接收】MQTT消息总数: {sum_received_msg}")
print(f"最大每秒【发送】MQTT消息数: {max_sent_per_second}")
print(f"最大每秒【接收】MQTT消息数: {max_received_per_second}")
print(f"最大15s【发送】消息数: {max_sent_in_15s}")
print(f"最大15s【接收】消息数: {max_received_in_15s}")
print(f"在线期间平均每秒【发送】MQTT消息数: {average_sent_per_second:.2f}")
print(f"在线期间平均每秒【接收】MQTT消息数: {average_received_per_second:.2f}")

-------------------- 时长统计 --------------------
日志时长: 57分钟 3秒
设备总在线时间: 56分钟 14秒, 日志时长内在线占比 98.56 %

-------------------- 消息数统计 --------------------
【发送】MQTT消息总数: 102  【接收】MQTT消息总数: 312
最大每秒【发送】MQTT消息数: 7
最大每秒【接收】MQTT消息数: 7
最大15s【发送】消息数: 9
最大15s【接收】消息数: 27
在线期间平均每秒【发送】MQTT消息数: 0.03
在线期间平均每秒【接收】MQTT消息数: 0.09


In [4]:
def count_emqx_disconnections(logs):
    """Count 'emqx_connection_terminated' entries, in total and per reason.

    Args:
        logs: Iterable of dicts with ``'message_type'`` and
            ``'disconn_reason'`` keys.

    Returns:
        A tuple ``(total, per_reason)`` where ``per_reason`` is a
        ``defaultdict(int)`` mapping each ``'disconn_reason'`` value to the
        number of terminations that carried it.
    """
    reason_tally = defaultdict(int)

    for entry in logs:
        if entry['message_type'] != 'emqx_connection_terminated':
            continue
        reason_tally[entry['disconn_reason']] += 1

    # Every termination increments exactly one reason, so the grand total is
    # the sum of the per-reason counts.
    return sum(reason_tally.values()), reason_tally

def calculate_intervals(logs, start_type, end_type, start_identifier, end_identifier, key_function):
    """Measure the delay, in milliseconds, between request and response packets.

    A "start" entry has message type ``start_type`` and a packet text that
    begins with ``start_identifier``. An "end" entry has message type
    ``end_type`` and a packet text that begins with ``end_identifier``. Each
    end entry is paired with the earliest still-pending start whose key
    contains ``start_identifier``. End entries with no pending start are
    ignored.

    Args:
        logs: Iterable of dicts with ``'message_type'``, ``'packet_info'`` and
            ``'timestamp'`` keys.
        start_type: Message type of the starting packet.
        end_type: Message type of the finishing packet.
        start_identifier: Required prefix of the starting packet text.
        end_identifier: Required prefix of the finishing packet text.
        key_function: Called as ``key_function(packet_info, n)`` with ``n``
            being the number of start entries seen so far. It returns the key
            under which the start time is stored; a repeated key overwrites
            the stored time.

    Returns:
        A list of intervals in milliseconds, in the order the end entries
        were encountered.
    """
    def _is_packet(entry, wanted_type, prefix):
        # Only look at the packet text once the message type matches.
        packet = entry['packet_info'] if entry['message_type'] == wanted_type else None
        return bool(packet) and packet.startswith(prefix)

    pending = {}
    durations_ms = []
    starts_seen = 0

    for entry in logs:
        if _is_packet(entry, start_type, start_identifier):
            pending_key = key_function(entry['packet_info'], starts_seen)
            pending[pending_key] = entry['timestamp']
            starts_seen += 1
        elif _is_packet(entry, end_type, end_identifier):
            # dicts keep insertion order, so the first match is the oldest one.
            oldest = next((k for k in pending if start_identifier in k), None)
            if oldest is not None:
                elapsed = (entry['timestamp'] - pending[oldest]).total_seconds() * 1000
                durations_ms.append(elapsed)
                del pending[oldest]

    return durations_ms

def get_connect_key(packet_info, session_id):
    """Build the lookup key for a CONNECT packet.

    The client id is the text between ``'ClientId='`` and the next comma.
    Raises ``IndexError`` when ``packet_info`` contains no ``'ClientId='``.
    """
    after_marker = packet_info.split('ClientId=')[1]
    client_id, *_ = after_marker.split(',')
    return f"CONNECT_{client_id}_{session_id}"

def get_subscribe_key(packet_info, _):
    """Build the lookup key for a SUBSCRIBE packet from its packet id.

    The packet id is the text after ``'PacketId='`` up to the first space or
    comma. The second argument is ignored. Raises ``IndexError`` when
    ``packet_info`` contains no ``'PacketId='``.
    """
    packet_id = packet_info.split('PacketId=')[1]
    # Cut at the first space, then at the first comma of what remains.
    for delimiter in (' ', ','):
        packet_id = packet_id.split(delimiter)[0]
    return f"SUBSCRIBE_{packet_id}"

def print_statistics(name, intervals):
    """Print max, min, mean and median of ``intervals`` under a ``name`` heading.

    When ``intervals`` is empty, a single "no data" line is printed instead.

    Args:
        name: Label placed in front of the heading.
        intervals: Sequence of numbers (milliseconds, per the heading text).
    """
    if not intervals:
        print(f"{name}时间间隔（毫秒）: 无数据")
        return

    print(f"{name}时间间隔（毫秒）:")
    print(f"  最大值: {max(intervals):.2f}")
    print(f"  最小值: {min(intervals):.2f}")
    print(f"  均值: {mean(intervals):.2f}")
    print(f"  中位数: {median(intervals):.2f}")

# MQTT connection and subscription statistics.
# Disconnections are counted from 'emqx_connection_terminated' entries; the
# intervals pair a received CONNECT / SUBSCRIBE packet with the sent
# CONNACK / SUBACK packet and are expressed in milliseconds.
emqx_disconn_count, disconn_reasons = count_emqx_disconnections(logs)
mqtt_connect_intervals = calculate_intervals(logs, 'mqtt_packet_received', 'mqtt_packet_sent', 'CONNECT', 'CONNACK', get_connect_key)
mqtt_subscribe_intervals = calculate_intervals(logs, 'mqtt_packet_received', 'mqtt_packet_sent', 'SUBSCRIBE', 'SUBACK', get_subscribe_key)

print()
print("-" * 20 + " MQTT连接统计 " + "-" * 20)
# The connection count is the number of completed CONNECT -> CONNACK pairs.
print(f"MQTT新建连接总次数: {len(mqtt_connect_intervals)}")
print_statistics("CONNECT到CONNACK", mqtt_connect_intervals)
print()
print(f"MQTT断连总次数: {emqx_disconn_count}")
print("断连原因及次数:")
for reason, count in disconn_reasons.items():
    print(f"{reason}: {count}")
print()
print("-" * 20 + " MQTT订阅统计 " + "-" * 20)
# The subscription count is the number of completed SUBSCRIBE -> SUBACK pairs.
print(f"MQTT订阅主题总次数: {len(mqtt_subscribe_intervals)}")
print_statistics("SUBSCRIBE到SUBACK", mqtt_subscribe_intervals)


-------------------- MQTT连接统计 --------------------
MQTT新建连接总次数: 7
CONNECT到CONNACK时间间隔（毫秒）:
  最大值: 2.87
  最小值: 0.34
  均值: 1.27
  中位数: 1.10

MQTT断连总次数: 6
断连原因及次数:
{shutdown,normal}: 5
{shutdown,tcp_closed}: 1

-------------------- MQTT订阅统计 --------------------
MQTT订阅主题总次数: 44
SUBSCRIBE到SUBACK时间间隔（毫秒）:
  最大值: 1.79
  最小值: 0.31
  均值: 0.75
  中位数: 0.75


In [5]:
def parse_publish_log(logs):
    """Aggregate PUBLISH packet statistics for each direction.

    An entry is considered when its ``'packet_info'`` is present, not ``None``
    and contains the text ``'PUBLISH'``. It is then attributed to the received
    or sent side according to its ``'message_type'``. Entries with any other
    message type are ignored.

    Args:
        logs: Iterable of dicts with ``'timestamp'``, ``'message_type'`` and
            (optionally) ``'packet_info'`` keys.

    Returns:
        A dict with the keys ``'total_received'``, ``'total_sent'``,
        ``'max_per_second_received'``, ``'max_per_second_sent'``,
        ``'topic_counts_received'`` and ``'topic_counts_sent'``. The two
        ``topic_counts_*`` values are ``defaultdict(int)`` mappings from
        topic to count.
    """
    def _new_tally():
        return {'total': 0, 'per_second': defaultdict(int), 'per_topic': defaultdict(int)}

    received = _new_tally()
    sent = _new_tally()
    tally_by_type = {'mqtt_packet_received': received, 'mqtt_packet_sent': sent}

    for entry in logs:
        packet = entry.get('packet_info')
        if packet is None or 'PUBLISH' not in packet:
            continue

        # Bucket by whole second.
        bucket = entry['timestamp'].replace(microsecond=0)
        topic_name = extract_topic(packet)

        tally = tally_by_type.get(entry['message_type'])
        if tally is None:
            continue
        tally['total'] += 1
        tally['per_second'][bucket] += 1
        tally['per_topic'][topic_name] += 1

    return {
        "total_received": received['total'],
        "total_sent": sent['total'],
        "max_per_second_received": max(received['per_second'].values(), default=0),
        "max_per_second_sent": max(sent['per_second'].values(), default=0),
        "topic_counts_received": received['per_topic'],
        "topic_counts_sent": sent['per_topic']
    }

def extract_topic(packet_info):
    """Extract the topic name from a packet description.

    The topic is the text between the first ``'Topic='`` and the next comma.

    Args:
        packet_info: Packet description text.

    Returns:
        The topic string. When no comma follows the topic, the rest of the
        text is returned, minus one closing ``')'`` if it ends with one. An
        empty string is returned when ``packet_info`` has no ``'Topic='``
        field.
    """
    marker = 'Topic='
    marker_pos = packet_info.find(marker)
    if marker_pos == -1:
        # Without this guard, find() returning -1 produced a slice starting at
        # an unrelated position and returned garbage.
        return ''

    topic_start = marker_pos + len(marker)
    topic_end = packet_info.find(',', topic_start)
    if topic_end == -1:
        # The topic is the last field. Slicing up to -1 would silently drop
        # the last character, so take the remainder instead and remove only a
        # closing parenthesis of the packet description.
        remainder = packet_info[topic_start:]
        return remainder[:-1] if remainder.endswith(')') else remainder
    return packet_info[topic_start:topic_end]

# Aggregate PUBLISH packet statistics from the parsed log entries.
stats = parse_publish_log(logs)

print("-" * 20 + " MQTT PUBLISH 消息统计 " + "-" * 20)
print(f"总接收消息数: {stats['total_received']}")
print(f"总发送消息数: {stats['total_sent']}")
print(f"每秒最大发送消息数: {stats['max_per_second_sent']}")
print(f"每秒最大接收消息数: {stats['max_per_second_received']}")

# Per-topic breakdown of received PUBLISH packets.
print("\n每个Topic的接收消息数:")
for topic, count in stats['topic_counts_received'].items():
    print(f"{topic}: {count}")

# Per-topic breakdown of sent PUBLISH packets.
print("\n每个Topic的发送消息数:")
for topic, count in stats['topic_counts_sent'].items():
    print(f"{topic}: {count}")

-------------------- MQTT PUBLISH 消息统计 --------------------
总接收消息数: 206
总发送消息数: 0
每秒最大发送消息数: 0
每秒最大接收消息数: 6

每个Topic的接收消息数:
shellies/shelly1pm-485519CBA83A/relay/0/command: 11
/sys/device/87/test1234/post: 195

每个Topic的发送消息数:


In [6]:
from pyecharts.charts import Line, Bar
from pyecharts import options as opts

In [None]:
# Count the sent and received messages for every whole second.
message_counts_per_second = defaultdict(lambda: {'sent': 0, 'received': 0})

# Gaps between consecutive received packets. They are computed from timestamps
# truncated to whole seconds, so every interval is a whole number of seconds.
previous_received_time = None
received_intervals = []

for log in logs:
    timestamp = log['timestamp'].replace(microsecond=0)
    message_type = log['message_type']
    
    if message_type == 'mqtt_packet_received':
        message_counts_per_second[timestamp]['received'] += 1
        
        if previous_received_time:
            interval = (timestamp - previous_received_time).total_seconds()
            received_intervals.append(interval)
        previous_received_time = timestamp
    
    elif message_type == 'mqtt_packet_sent':
        message_counts_per_second[timestamp]['sent'] += 1

# Extract the data series for plotting; only seconds with at least one
# sent/received packet have an entry, so the x axis has no points for idle seconds.
x_time_data = sorted(message_counts_per_second.keys())
y_sent_data = [message_counts_per_second[time]['sent'] for time in x_time_data]
y_received_data = [message_counts_per_second[time]['received'] for time in x_time_data]

# Build the line chart: one series per direction, each with its maximum marked.
line = (
    Line(init_opts=opts.InitOpts(width='1000px', height='600px'))
    .add_xaxis(x_time_data)
    .add_yaxis('每秒发送的消息数', y_sent_data, markpoint_opts=opts.MarkPointOpts(
        data=[opts.MarkPointItem(type_="max", name="最大值")]
    ))
    .add_yaxis('每秒接收的消息数', y_received_data, markpoint_opts=opts.MarkPointOpts(
        data=[opts.MarkPointItem(type_="max", name="最大值")]
    ))
    .set_global_opts(
        title_opts=opts.TitleOpts(title="每秒消息发送与接收统计"),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
        yaxis_opts=opts.AxisOpts(name='消息数')
    )
)
# Render the chart to an HTML file (inline notebook rendering is disabled).
# line.render_notebook()
line.render("message_distribution_line.html")

In [None]:
# Tally how often each gap between consecutive received packets occurs.
# `received_intervals` is built by the per-second counting cell of this
# notebook, so that cell has to run first.
interval_distribution = defaultdict(int)
for interval in received_intervals:
    interval_distribution[interval] += 1
# Build the bar chart of the received-message interval distribution:
# x = interval length shown as an integer string, y = number of occurrences.
bar = (
    Bar(init_opts=opts.InitOpts(width='1000px', height='600px'))
    .add_xaxis([str(int(interval)) for interval in sorted(interval_distribution.keys())])
    .add_yaxis('消息数量', [interval_distribution[interval] for interval in sorted(interval_distribution.keys())])
    .set_global_opts(
        title_opts=opts.TitleOpts(title="接收消息间隔分布"),
        xaxis_opts=opts.AxisOpts(name='时间间隔 (秒)'),
        yaxis_opts=opts.AxisOpts(name='消息数量')
    )
)
# Render the chart to an HTML file (inline notebook rendering is disabled).
# bar.render_notebook()
bar.render("message_interval_distribution.html")