### Requirements

Uncomment this line to install requirements if necessary:

In [None]:
#%pip install -U python-dotenv matplotlib scipy numpy

### Import and load env

In [None]:
from datetime import datetime, timezone, timedelta
from dotenv import load_dotenv
from pprint import pprint
from scipy import stats
import matplotlib.pyplot as plt
import numpy as np
import os
import sqlite3
import sys

# Let python-dotenv populate the process environment before the settings are read.
load_dotenv()

# Each setting is kept exactly as read: a string, or None when the variable is unset.
VERBOSE = os.getenv("VERBOSE")
MSG_DB_PATH = os.getenv("MSG_DB_PATH")
FULL_DB_PATH = os.getenv("FULL_DB_PATH")
START_TIMESTAMP = os.getenv("START_TIMESTAMP")
END_TIMESTAMP = os.getenv("END_TIMESTAMP")
TARGET_ID = os.getenv("TARGET_ID")

# Echo the loaded settings only when VERBOSE is exactly the string "1".
if VERBOSE == "1":
    print("load_dotenv():")
    print("\n".join(
        f"  {name}={value!r}"
        for name, value in (
            ("VERBOSE", VERBOSE),
            ("MSG_DB_PATH", MSG_DB_PATH),
            ("FULL_DB_PATH", FULL_DB_PATH),
            ("START_TIMESTAMP", START_TIMESTAMP),
            ("END_TIMESTAMP", END_TIMESTAMP),
            ("TARGET_ID", TARGET_ID),
        )
    ))

### Define helper methods

In [None]:
def connect_db(path):
    """Open and return a SQLite connection for the database file at `path`.

    If sqlite3 reports an error while connecting, the error is printed and the
    process is terminated through ``sys.exit(1)``.
    """
    try:
        return sqlite3.connect(path)
    except sqlite3.Error as err:
        print(f"Error connecting db: {err}")
        sys.exit(1)

def execute_query(conn, query, params=(), no_commit=False):
    """Run one SQL statement on `conn` and return its rows when it is a SELECT.

    conn:      an open sqlite3 connection.
    query:     SQL text; `params` are bound to its placeholders.
    no_commit: when true, skip the commit that otherwise follows the statement
               (used to batch many inserts into one transaction).

    Returns the list from ``fetchall()`` if the statement text starts with
    "select" (ignoring surrounding whitespace and case), otherwise None.
    On any sqlite3 error the message is printed and ``sys.exit(1)`` is called.
    """
    try:
        cur = conn.cursor()
        cur.execute(query, params)
        if not no_commit:
            conn.commit()
        is_select = query.strip().lower().startswith("select")
        return cur.fetchall() if is_select else None
    except sqlite3.Error as err:
        print(f"Error executing SQL query: {err}")
        sys.exit(1)

def msg_sequence_to_ctime(sequence, tz=None):
    """Format a MSG `Sequence` value as a ``%c`` date/time string.

    sequence: integer (or digit string) whose last three digits are dropped;
              the remaining leading part is used as a Unix timestamp in seconds.
    tz:       optional ``datetime.tzinfo``. With the default None the result is
              in the machine's local time zone (the original behaviour). Pass a
              fixed zone, e.g. ``timezone(timedelta(hours=8))``, to make the
              string independent of where the notebook runs.

    Returns the ``strftime("%c")`` text, e.g. "Tue Nov 14 22:13:20 2023".
    """
    # Integer division drops the three trailing digits without string slicing,
    # so a value shorter than four digits no longer fails on int("").
    seconds = int(sequence) // 1000
    return datetime.fromtimestamp(seconds, tz=tz).strftime("%c")

### Create or load MSG db

In [None]:
if not os.path.exists(MSG_DB_PATH):
    # First run: copy the target conversation out of the full database into a
    # smaller database at MSG_DB_PATH, which later runs load directly.
    full_conn = connect_db(FULL_DB_PATH)

    # The bounds and the talker id are bound as parameters instead of being
    # formatted into the SQL text, so a quote or other special character in
    # TARGET_ID cannot break or alter the statement.
    query = """
    SELECT *
    FROM MSG
    WHERE Sequence >= ?
      AND Sequence <  ?
      AND StrTalker = ?;
    """
    # Sequence carries three extra trailing digits compared to the second-based
    # START/END timestamps, hence the factor 1000.
    params = (int(START_TIMESTAMP) * 1000, int(END_TIMESTAMP) * 1000, TARGET_ID)
    msg = execute_query(full_conn, query, params)
    # The rows are already in memory; release the source database before any exit.
    full_conn.close()
    if not msg:
        print("error: no result from FULL_DB_PATH")
        sys.exit(1)
    print(f"Found {len(msg)} results.")

    msg_conn = connect_db(MSG_DB_PATH)

    query = """
    CREATE TABLE MSG(
      localId INT,
      TalkerId INT,
      MsgSvrID INT,
      Type INT,
      SubType INT,
      IsSender INT,
      CreateTime INT,
      Sequence INT,
      StatusEx INT,
      FlagEx INT,
      Status INT,
      MsgServerSeq INT,
      MsgSequence INT,
      StrTalker TEXT,
      StrContent TEXT,
      DisplayContent TEXT,
      Reserved0 INT,
      Reserved1 INT,
      Reserved2 INT,
      Reserved3 INT,
      Reserved4 TEXT,
      Reserved5 TEXT,
      Reserved6 TEXT,
      CompressContent,
      BytesExtra,
      BytesTrans
    )
    """
    execute_query(msg_conn, query)
    query = "INSERT INTO MSG VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    for i, row in enumerate(msg):
        r = list(row)
        # Index 7 is Sequence; index -4 is Reserved6, reused to store the
        # human-readable date/time that the report cells match with LIKE.
        r[-4] = msg_sequence_to_ctime(r[7]) # use Reserved6 to store ctime
        execute_query(msg_conn, query, r, no_commit=True)
        # Commit in batches rather than once per inserted row.
        if i % 5000 == 0:
            msg_conn.commit()
    msg_conn.commit()
    print(f"{MSG_DB_PATH} is created.")

else:
    # The extracted database already exists: reuse it as is.
    msg_conn = connect_db(MSG_DB_PATH)
    print(f"{MSG_DB_PATH} is found and loaded.")

---

### Report

In [None]:
# Display names and colours for the two participants. Index 0 is paired with
# the IsSender = 1 count and index 1 with the IsSender = 0 count below.
senders = ["Purple", "Blue"]
senders_cn = ["紫先生", "蓝小姐"]
colors = ["#e9d8ff", "#b3e0fc"] # purple, blue

# Total number of messages per side, in the same order as `senders`.
data = [
    execute_query(msg_conn, count_sql)[0][0]
    for count_sql in (
        "SELECT COUNT(*) FROM MSG WHERE IsSender = 1",
        "SELECT COUNT(*) FROM MSG WHERE IsSender = 0",
    )
]

print(f"今年我们一共发送了 {data[0] + data[1]} 条消息，")
print(f"其中，{senders_cn[0]}发送了 {data[0]} 条，{senders_cn[1]}发送了 {data[1]} 条：")

# Donut chart of each side's share; every wedge is labelled with name and count.
fig, ax = plt.subplots(figsize=(4, 4))
wedge_labels = [f"{name}\n{count}" for name, count in zip(senders, data)]
ax.pie(
    data,
    labels=wedge_labels,
    autopct="%.2f%%",
    colors=colors,
    wedgeprops={"width": 0.6, "edgecolor": "w", "linewidth": 4},
)
plt.show()

在这么多条消息里...

In [None]:
# Message category -> (Type, SubType, StrContent LIKE pattern) used to filter MSG.
# NOTE(review): LIKE '%' does not match NULL, so rows whose StrContent is NULL
# are not counted in any category — confirm that this is intended.
types = {
    "Text":    (1, 0, "%"),             # text
    "Pic":     (3, 0, "%"),             # picture
    "Video":   (43, 0, "%"),            # video
    "Voice":   (34, 0, "%"),            # voice message
    "Emoji_1": (47, 0, "%"),            # store sticker
    "Emoji_2": (49, 8, "%"),            # custom sticker
    "Call":    (50, 0, "%"),            # call
    "Trans_1": (49, 2000, "%"),         # money transfer
    "Trans_2": (10000, 0, "发出红包%"), # red packet
    "Trans_3": (11000, 0, "%"),         # red packet
    "Tap":     (10000, 4, "%"),         # tap ("pat")
}

# One count per side for every category: element 0 is IsSender = 1,
# element 1 is IsSender = 0.
per_sender_sql = (
    "SELECT COUNT(*) FROM MSG WHERE IsSender = 1 AND Type = ? AND SubType = ? AND StrContent LIKE ?",
    "SELECT COUNT(*) FROM MSG WHERE IsSender = 0 AND Type = ? AND SubType = ? AND StrContent LIKE ?",
)
data = {
    category: np.array([
        execute_query(msg_conn, sql, type_filter)[0][0] for sql in per_sender_sql
    ])
    for category, type_filter in types.items()
}

# Merge the sub-categories into the combined categories that are reported.
data["Emoji"] = data["Emoji_1"] + data["Emoji_2"]
data["Trans"] = data["Trans_1"] + data["Trans_2"] + data["Trans_3"]
if VERBOSE == "1":
    pprint(data)

# Categories to chart, in display order; data_show has one row per category.
show = ["Text", "Pic", "Video", "Voice", "Emoji", "Call", "Tap", "Trans"]
data_show = np.array([data[category] for category in show])
if VERBOSE == "1":
    pprint(data_show)

def print_type_compare(key, desc):
    """Print which participant sent more messages of one category.

    key:  key into the notebook-level `data` dict; the entry holds two counts,
          indexed like `senders_cn`.
    desc: description of the category, inserted into the printed sentence.

    A tie, and the case where one side sent none at all, get their own
    sentences: the "x% more" figure is meaningless for a tie and would divide
    by zero (printing "inf%" or "nan%" with a numpy RuntimeWarning) when the
    smaller count is 0.
    """
    d = data[key]
    if d[0] == d[1]:
        print(f"两个人发出的{desc}一样多，都是 {d[0]} 条！")
        return
    morer = 0 if d[0] > d[1] else 1
    lesser = 1 - morer
    if d[lesser] == 0:
        print(f"只有{senders_cn[morer]}发出了{desc}，一共 {d[morer]} 条！")
        return
    # Relative surplus of the larger count over the smaller one, in percent.
    pct = d[morer] * 100.0 / d[lesser] - 100
    print(f"{senders_cn[morer]}发出了更多的{desc}，达到了 {d[morer]} 条，比{senders_cn[lesser]}多出了 {pct:.2f}% ！")

# Narrate every charted category, in the same order as the bars below.
for category, description in (
    ("Text", "文字消息"),
    ("Pic", "图片"),
    ("Video", "视频"),
    ("Voice", "语音"),
    ("Emoji", "表情包"),
    ("Call", "通话"),
    ("Tap", "拍一拍"),
    ("Trans", "红包和转账"),
):
    print_type_compare(category, description)

# Grouped bar chart: category number `group` gets one bar per side, drawn at
# positions group and group + width; the tick label sits between the two bars.
x = np.arange(2)
width = 0.4
fig, ax = plt.subplots(figsize=(8, 4))
for group, counts in enumerate(data_show):
    bars = ax.bar(x * width + group, counts, width, color=colors)
    ax.bar_label(bars)
ax.set_xticks(np.arange(len(show)) + width/2, show)
ax.set_ylabel("Messages")
plt.show()

In [None]:
# Month abbreviations as they appear in the date/time text stored in Reserved6.
show = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
data = []
for m in show:
    # Per-side message count for every row whose stored date text contains the month name.
    data.append([
        execute_query(msg_conn, "SELECT COUNT(*) FROM MSG WHERE IsSender = 1 AND Reserved6 LIKE ?", [f"%{m}%"])[0][0],
        execute_query(msg_conn, "SELECT COUNT(*) FROM MSG WHERE IsSender = 0 AND Reserved6 LIKE ?", [f"%{m}%"])[0][0],
    ])
# Transpose to shape (2, 12): one row per side, one column per month.
data = np.array(data).T
data_sum = np.sum(data, axis=0)
if VERBOSE == "1":
    pprint(data)
    pprint(data_sum)

max_i = np.argmax(data_sum)
min_i = np.argmin(data_sum)
month_cn = ["一月", "二月", "三月", "四月", "五月", "六月", "七月", "八月", "九月", "十月", "十一月", "十二月"]
print(f"聊天最少的月份是{month_cn[min_i]}，只有 {data_sum[min_i]} 条消息；")
if data_sum[min_i] > 0:
    pct = data_sum[max_i] * 100.0 / data_sum[min_i] - 100
    print(f"聊天最多的月份是{month_cn[max_i]}，竟然有 {data_sum[max_i]} 条消息，比{month_cn[min_i]}多出了 {pct:.2f}% !")
else:
    # The quietest month has no messages (e.g. the range covers less than a
    # full year): a percentage over zero is undefined, so leave it out rather
    # than print "inf%".
    print(f"聊天最多的月份是{month_cn[max_i]}，竟然有 {data_sum[max_i]} 条消息！")

# Stacked bars: one segment per side, stacked on top of each other per month.
x = np.arange(len(show))
width = 0.65
bottom = np.zeros(len(show))
fig, ax = plt.subplots(figsize=(8, 3))
for i, d in enumerate(data):
    rects = ax.bar(x, d, width, bottom=bottom, color=colors[i])
    ax.bar_label(rects, label_type="center")
    bottom += d
ax.set_ylabel("Messages")
ax.set_xticks(x, show)
plt.show()

In [None]:
# Weekday abbreviations as they appear in the date/time text stored in Reserved6.
show = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
data = []
for m in show:
    # Per-side message count for every row whose stored date text contains the weekday name.
    data.append([
        execute_query(msg_conn, "SELECT COUNT(*) FROM MSG WHERE IsSender = 1 AND Reserved6 LIKE ?", [f"%{m}%"])[0][0],
        execute_query(msg_conn, "SELECT COUNT(*) FROM MSG WHERE IsSender = 0 AND Reserved6 LIKE ?", [f"%{m}%"])[0][0],
    ])
# Transpose to shape (2, 7): one row per side, one column per weekday.
data = np.array(data).T
data_sum = np.sum(data, axis=0)
if VERBOSE == "1":
    pprint(data)
    pprint(data_sum)

max_i = np.argmax(data_sum)
min_i = np.argmin(data_sum)
day_cn = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期天"]
print(f"{day_cn[min_i]}的聊天最少，只有 {data_sum[min_i]} 条消息；")
if data_sum[min_i] > 0:
    pct = data_sum[max_i] * 100.0 / data_sum[min_i] - 100
    print(f"{day_cn[max_i]}的聊天最多，竟然有 {data_sum[max_i]} 条消息，比{day_cn[min_i]}多出了 {pct:.2f}% !")
else:
    # The quietest weekday has no messages: a percentage over zero is
    # undefined, so leave it out rather than print "inf%".
    print(f"{day_cn[max_i]}的聊天最多，竟然有 {data_sum[max_i]} 条消息！")

# Stacked bars: one segment per side, stacked on top of each other per weekday.
x = np.arange(len(show))
width = 0.6
bottom = np.zeros(len(show))
fig, ax = plt.subplots(figsize=(6, 3))
for i, d in enumerate(data):
    rects = ax.bar(x, d, width, bottom=bottom, color=colors[i])
    ax.bar_label(rects, label_type="center")
    bottom += d
ax.set_ylabel("Messages")
ax.set_xticks(x, show)
plt.show()

In [None]:
# Walk from START_TIMESTAMP up to (not including) END_TIMESTAMP in one-day steps.
# NOTE(review): the days are enumerated in a fixed UTC+8 zone, while
# msg_sequence_to_ctime() writes Reserved6 in the machine's local zone — the two
# only line up when the notebook runs in UTC+8; confirm.
dt = datetime.fromtimestamp(int(START_TIMESTAMP), tz=timezone(timedelta(hours=8)))
end = datetime.fromtimestamp(int(END_TIMESTAMP), tz=timezone(timedelta(hours=8)))
dates_raw = []
while dt < end:
    dates_raw.append(dt)
    dt += timedelta(days=1)

# [month-and-day text, year text] per day. A leading zero of the day number is
# replaced by a space (e.g. "Jan 01" -> "Jan  1") before it is used for matching.
dates = [
    [day.strftime("%b %d").replace(" 0", "  "), day.strftime("%Y")]
    for day in dates_raw
]
if VERBOSE == "1":
    pprint(dates[:10])
    pprint(dates[-10:])

# Number of messages per day: rows whose stored date text contains the
# month-and-day text and ends with the year.
data = [
    execute_query(
        msg_conn,
        "SELECT COUNT(*) FROM MSG WHERE Reserved6 LIKE ?",
        [f"%{month_day}%{year}"]
    )[0][0]
    for month_day, year in dates
]
if VERBOSE == "1":
    pprint(data[:10])
    pprint(data[-10:])

In [None]:
# Positions of the busiest and the quietest day in the per-day counts
# (np.argmax / np.argmin return the first occurrence when values tie).
# The former `pct` computation was removed: its result was never printed or used.
max_i = np.argmax(data)
min_i = np.argmin(data)
print(f"{dates_raw[max_i].month} 月 {dates_raw[max_i].day} 日是聊天最多的一天，足足发送了 {data[max_i]} 条消息，这天有什么记忆深刻的事吗？")
print(f"{dates_raw[min_i].month} 月 {dates_raw[min_i].day} 日只发送了 {data[min_i]} 条消息，果然还是面对面的聊天更快！")

# List every day on which no message at all was exchanged.
res = np.where(np.array(data) == 0)[0]
if len(res) != 0:
    print(f"没发消息的日子有：")
    for i in res:
        print(f"  - {dates_raw[i].month} 月 {dates_raw[i].day} 日")

# Line chart of the number of messages per day over the whole range.
fig, ax = plt.subplots(figsize=(10, 3))
ax.plot(dates_raw, data)
ax.set_ylabel("Messages")
ax.yaxis.grid(True)
fig.autofmt_xdate()
plt.show()

In [None]:
# Mean and most frequent value of the per-day message counts.
daily_mean = np.mean(data)
daily_mode = stats.mode(data)[0]
print(f"平均每天会产生 {daily_mean:.2f} 条消息记录，最常出现的消息条数是 {daily_mode}。")

# Horizontal violin plot of the per-day counts, with the mean marked.
fig, ax = plt.subplots(figsize=(10, 2))
ax.violinplot(data, orientation='horizontal', showmeans=True)
ax.xaxis.grid(True)
ax.set_xlabel("Num of messages")
ax.set_yticklabels([])
plt.show()

In [None]:
# Content of every plain-text message (Type = 1, SubType = 0) sent by one side.
query = """
SELECT StrContent
FROM MSG
WHERE Type = 1
  AND SubType = 0
  AND IsSender = ?
"""
res1 = execute_query(msg_conn, query, [1])
res2 = execute_query(msg_conn, query, [0])

# Message length in characters, one entry per message.
data1 = [len(row[0]) for row in res1]
data2 = [len(row[0]) for row in res2]
if VERBOSE == "1":
    pprint(res1[:10])
    pprint(data1[:10])

# drop the longest 1%
data1_clean = sorted(data1)[:int(len(data1)*0.99)]
data2_clean = sorted(data2)[:int(len(data2)*0.99)]

# One summary sentence per side; the first ends with "；" and the second with "。".
for who, lengths, closing in (
    (senders_cn[0], data1_clean, "；"),
    (senders_cn[1], data2_clean, "。"),
):
    print(f"{who}平均每条消息会发 {np.mean(lengths):.2f} 个字，最常发送的字数是 {stats.mode(lengths)[0]}，字数中位数是 {int(np.median(lengths))}{closing}")

# Two stacked horizontal violins on a shared x axis, one per side, each filled
# with that side's colour.
fig, axs = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(10, 4))
for panel, lengths, fill in (
    (axs[0], data1_clean, colors[0]),
    (axs[1], data2_clean, colors[1]),
):
    parts = panel.violinplot(lengths, orientation='horizontal', showmeans=True)
    for pc in parts["bodies"]:
        pc.set_facecolor(fill)
        pc.set_alpha(1)

# Both lists are sorted, so their last element is the longest kept message;
# ticks run from 1 in steps of 3 up to past that length.
tick_positions = np.arange(1, max(data1_clean[-1], data2_clean[-1]) + 3, 3)
for ax in axs:
    ax.set_yticklabels([])
    ax.set_xlabel("Characters")
    ax.xaxis.grid(True)
    ax.xaxis.set_ticks(tick_positions)
plt.show()

### Credit

[purple4pur/wx-chat-visualize](https://github.com/purple4pur/wx-chat-visualize)