In [None]:
import os

# Directory that holds the decrypted WeChat SQLite files.
# Later cells build file names by plain string concatenation
# (BASE_PATH + "decrypt_....db"), so the value must end with a path separator.
# The location can be overridden with the WECHAT_DB_DIR environment variable;
# when it is unset or empty, the original machine-specific default is used.
BASE_PATH = (
    os.environ.get("WECHAT_DB_DIR")
    or "C:/Users/Mark/repos/WeChatUserDB/DECRYPT_WIN_WECHAT_DB/"
)
if not BASE_PATH.endswith(("/", "\\")):
    # Keep the concatenation in later cells valid for overrides given without
    # a trailing separator.
    BASE_PATH += "/"

In [None]:
import sqlite3
import numpy as np
import glob

In [None]:
# get msgs
# Load (StrTalker, IsSender, Type, StrContent) for every message: first from
# the ChatCRMsg table of decrypt_ChatMsg.db, then from the MSG table of every
# file matching decrypt_MSG*. Each connection is closed as soon as its rows
# have been fetched, so no SQLite handles stay open on the database files.
con_msg = sqlite3.connect(BASE_PATH + "decrypt_ChatMsg.db")
try:
    cur_msg = con_msg.cursor()
    msgs = cur_msg.execute(
        "SELECT StrTalker, IsSender, Type, StrContent FROM ChatCRMsg"
    ).fetchall()
    # Column metadata of ChatCRMsg (one row per column).
    columns_msg = cur_msg.execute("PRAGMA table_info(ChatCRMsg)").fetchall()
finally:
    con_msg.close()

# glob returns matches in arbitrary order; sorting makes the load order (and
# therefore the row order of msgs) reproducible between runs.
for f in sorted(glob.glob(BASE_PATH + "decrypt_MSG*")):
    con_msg = sqlite3.connect(f)
    try:
        cur_msg = con_msg.cursor()
        msgs += cur_msg.execute(
            "SELECT StrTalker, IsSender, Type, StrContent FROM MSG"
        ).fetchall()
    finally:
        con_msg.close()


In [None]:
# get contacts
# Read every row of the Contact table together with the table's column
# metadata, then close the connection so the database file is released.
con_mm = sqlite3.connect(BASE_PATH + "decrypt_MicroMsg.db")
try:
    cur_mm = con_mm.cursor()
    contacts = cur_mm.execute("SELECT * FROM Contact").fetchall()
    # One row per Contact column; used later to map column names to positions.
    columns_mm = cur_mm.execute("PRAGMA table_info(Contact)").fetchall()
finally:
    con_mm.close()

In [None]:
# Column name -> column position for the Contact table. Every PRAGMA
# table_info row starts with (cid, name, ...), so element 1 is the column name
# and element 0 its position inside a `SELECT *` row.
cids = dict((col[1], col[0]) for col in columns_mm)

# UserName -> display name: the Remark when it is truthy, otherwise the
# NickName.
names = dict(
    (
        contact[cids["UserName"]],
        contact[cids["Remark"]] or contact[cids["NickName"]],
    )
    for contact in contacts
)


def get_name(s):
    """Return the display name recorded in ``names`` for ``s``.

    Falls back to returning ``s`` unchanged when ``names`` has no entry for it.
    """
    return names.get(s, s)

In [None]:
def get_wd_cnt(type, l):
    """Return ``l`` when ``type`` equals 1, otherwise 0.

    Used to count characters only for messages whose Type is 1; every other
    message type contributes zero.
    """
    return l if type == 1 else 0

In [None]:
import pandas as pd

# One row per message, with per-message counters that later cells sum up per
# conversation.
df = pd.DataFrame(msgs, columns=["strTalker", "IsSender", "Type", "StrContent"])

# Only messages with Type == 1 contribute to the character counts
# (presumably Type 1 marks plain-text messages -- TODO confirm).
counted = df["Type"] == 1

# Element-wise lookups instead of row-wise DataFrame.apply: same values,
# without building a Series object for every row.
df["name"] = df["strTalker"].map(get_name)
df["msg_cnt"] = 1
# Total characters in the message body; 0 for messages that are not counted.
df["wd_cnt"] = df["StrContent"].str.len().where(counted, 0)
# Characters in the range U+4E00-U+9FFF; 0 for messages that are not counted.
df["cn_wd_cnt"] = df["StrContent"].str.count(pat="[\u4e00-\u9fff]").where(counted, 0)

In [None]:
# I am sender
# Per-conversation totals over the messages I sent (IsSender == 1).
# Rows are selected with a boolean mask rather than DataFrame.where, which
# would keep every row, blank the non-matching ones to NaN and cast the integer
# columns to float. sum() is asked for numeric columns explicitly: its first
# positional parameter is `numeric_only`, not a list of columns.
snd = (
    df[df["IsSender"] == 1]
    .groupby(["strTalker", "name"])
    .sum(numeric_only=True)
    .sort_values("msg_cnt", ascending=False)
)
# Average characters per sent message (all characters / U+4E00-U+9FFF only).
snd["wd_pmsg"] = snd["wd_cnt"] / snd["msg_cnt"]
snd["cn_wd_pmsg"] = snd["cn_wd_cnt"] / snd["msg_cnt"]

# Top 20 conversations by number of sent messages; the summed IsSender and
# Type columns carry no extra information here and are hidden.
snd.head(20).style.hide(["IsSender", "Type"], axis=1).format(
    {
        "msg_cnt": "{:.0f}",
        "wd_cnt": "{:.0f}",
        "cn_wd_cnt": "{:.0f}",
        "wd_pmsg": "{:.2f}",
        "cn_wd_pmsg": "{:.2f}",
    },
)


In [None]:
# I am receiver
# Per-contact totals over the messages I received (IsSender == 0), leaving out
# every conversation whose id contains "chatroom".
# Rows are selected with a boolean mask rather than chained DataFrame.where
# calls, which would keep every row, blank the non-matching ones to NaN and
# cast the integer columns to float. sum() is asked for numeric columns
# explicitly: its first positional parameter is `numeric_only`, not a list of
# columns.
rcv = (
    df[(~df["strTalker"].str.contains("chatroom")) & (df["IsSender"] == 0)]
    .groupby(["strTalker", "name"])
    .sum(numeric_only=True)
    .sort_values("msg_cnt", ascending=False)
)
# Average characters per received message (all characters / U+4E00-U+9FFF only).
rcv["wd_pmsg"] = rcv["wd_cnt"] / rcv["msg_cnt"]
rcv["cn_wd_pmsg"] = rcv["cn_wd_cnt"] / rcv["msg_cnt"]

# Top 20 contacts by number of received messages; the summed IsSender and Type
# columns carry no extra information here and are hidden.
rcv.head(20).style.hide(["IsSender", "Type"], axis=1).format(
    {
        "msg_cnt": "{:.0f}",
        "wd_cnt": "{:.0f}",
        "cn_wd_cnt": "{:.0f}",
        "wd_pmsg": "{:.2f}",
        "cn_wd_pmsg": "{:.2f}",
    },
)


In [None]:
# both
# Per-contact totals over all messages, sent and received, leaving out every
# conversation whose id contains "chatroom".
# Rows are selected with a boolean mask rather than DataFrame.where, which
# would keep every row, blank the non-matching ones to NaN and cast the integer
# columns to float. sum() is asked for numeric columns explicitly: its first
# positional parameter is `numeric_only`, not a list of columns.
bth = (
    df[~df["strTalker"].str.contains("chatroom")]
    .groupby(["strTalker", "name"])
    .sum(numeric_only=True)
    .sort_values("msg_cnt", ascending=False)
)
# Average characters per message (all characters / U+4E00-U+9FFF only).
bth["wd_pmsg"] = bth["wd_cnt"] / bth["msg_cnt"]
bth["cn_wd_pmsg"] = bth["cn_wd_cnt"] / bth["msg_cnt"]
# Received-to-sent message ratio. The summed IsSender column is the number of
# sent messages (assuming IsSender only takes the values 0 and 1), so
# msg_cnt - IsSender is the number received. Contacts with no sent messages
# end up with an infinite ratio.
bth["msg_ratio"] = (bth["msg_cnt"] - bth["IsSender"]) / bth["IsSender"]


def get_cn_wd_ratio(id):
    """Return received / sent ``cn_wd_cnt`` for the index key ``id``.

    The numerator is read from ``rcv`` and the denominator from ``snd``.
    Returns ``None`` when ``id`` is missing from either table's index.
    """
    if id not in rcv.index or id not in snd.index:
        return None
    received = rcv.loc[id, "cn_wd_cnt"]
    sent = snd.loc[id, "cn_wd_cnt"]
    return received / sent


# Received/sent ratio of cn_wd_cnt for every row of bth, looked up by the
# row's index key; missing when the key is absent from rcv or snd.
bth["cn_wd_ratio"] = bth.apply(lambda r: get_cn_wd_ratio(r.name), axis=1)

# First 20 rows of bth (it is ordered by msg_cnt, largest first) with the
# summed IsSender and Type columns hidden.
(
    bth.head(20)
    .style.hide(["IsSender", "Type"], axis=1)
    .format(
        {
            **dict.fromkeys(["msg_cnt", "wd_cnt", "cn_wd_cnt"], "{:.0f}"),
            **dict.fromkeys(
                ["wd_pmsg", "cn_wd_pmsg", "msg_ratio", "cn_wd_ratio"], "{:.2f}"
            ),
        }
    )
)

In [None]:
# high r/s ratio
# Contacts with more than 215 messages and a received/sent message ratio below
# infinity (which also drops NaN), showing the 20 largest ratios.
(
    bth.loc[lambda t: (t["msg_ratio"] < np.inf) & (t["msg_cnt"] > 215)]
    .sort_values("msg_ratio", ascending=False)
    .head(20)
    .style.hide(["IsSender", "Type"], axis=1)
    .format(
        {
            **dict.fromkeys(["msg_cnt", "wd_cnt", "cn_wd_cnt"], "{:.0f}"),
            **dict.fromkeys(
                ["wd_pmsg", "cn_wd_pmsg", "msg_ratio", "cn_wd_ratio"], "{:.2f}"
            ),
        }
    )
)

In [None]:
# low r/s ratio
# Contacts with more than 215 messages and a non-missing received/sent message
# ratio, showing the 20 smallest ratios (listed from larger to smaller).
(
    bth.loc[lambda t: t["msg_ratio"].notna() & (t["msg_cnt"] > 215)]
    .sort_values("msg_ratio", ascending=False)
    .tail(20)
    .style.hide(["IsSender", "Type"], axis=1)
    .format(
        {
            **dict.fromkeys(["msg_cnt", "wd_cnt", "cn_wd_cnt"], "{:.0f}"),
            **dict.fromkeys(
                ["wd_pmsg", "cn_wd_pmsg", "msg_ratio", "cn_wd_ratio"], "{:.2f}"
            ),
        }
    )
)

In [None]:
# high r/s cn_wd_ratio
# Contacts with more than 215 messages and a received/sent cn_wd_cnt ratio
# below infinity (which also drops NaN), showing the 20 largest ratios.
(
    bth.loc[lambda t: (t["cn_wd_ratio"] < np.inf) & (t["msg_cnt"] > 215)]
    .sort_values("cn_wd_ratio", ascending=False)
    .head(20)
    .style.hide(["IsSender", "Type"], axis=1)
    .format(
        {
            **dict.fromkeys(["msg_cnt", "wd_cnt", "cn_wd_cnt"], "{:.0f}"),
            **dict.fromkeys(
                ["wd_pmsg", "cn_wd_pmsg", "msg_ratio", "cn_wd_ratio"], "{:.2f}"
            ),
        }
    )
)

In [None]:
# low r/s cn_wd_ratio
# Contacts with more than 215 messages and a non-missing received/sent
# cn_wd_cnt ratio, showing the 20 smallest ratios (listed from larger to
# smaller).
(
    bth.loc[lambda t: t["cn_wd_ratio"].notna() & (t["msg_cnt"] > 215)]
    .sort_values("cn_wd_ratio", ascending=False)
    .tail(20)
    .style.hide(["IsSender", "Type"], axis=1)
    .format(
        {
            **dict.fromkeys(["msg_cnt", "wd_cnt", "cn_wd_cnt"], "{:.0f}"),
            **dict.fromkeys(
                ["wd_pmsg", "cn_wd_pmsg", "msg_ratio", "cn_wd_ratio"], "{:.2f}"
            ),
        }
    )
)