Skip to content

Commit

Permalink
Feat: bilibili live danmaku monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
remiliacn committed Oct 25, 2023
1 parent 05c3633 commit 235ee26
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 4 deletions.
101 changes: 97 additions & 4 deletions Services/live_notification.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import asyncio
import codecs
import dataclasses
import pickle
import sqlite3
import subprocess
import uuid
from json import dumps, loads
from os import getcwd
from time import time
from typing import Union, List
from time import time, time_ns
from typing import Union, List, Dict

from loguru import logger
from wordcloud import WordCloud

from Services.util.common_util import HttpxHelperClient, OptionalDict
from config import DANMAKU_PROCESS


@dataclasses.dataclass
class LivestreamDanmakuData:
danmaku_frequency_dict: Dict = None
danmaku_count: int = 0
qq_group_dumped: str = ''
gift_received_count: int = 0
like_received_count: int = 0
highest_rank: int = 999


class DynamicNotificationData:
Expand Down Expand Up @@ -51,6 +68,14 @@ def _init_database(self):
)
"""
)
self.live_database.execute(
"""
create table if not exists bilibili_danmaku_data (
uid text not null unique on conflict ignore,
data_dump text
)
"""
)
self.live_database.commit()

async def _get_one_notification_data_from_db(self, streamer_name: str):
Expand Down Expand Up @@ -151,6 +176,72 @@ async def convert_live_data_to_string(data: LiveNotificationData) -> str:

return response

def dump_live_data(self, data: str):
uid = str(uuid.uuid1())
self.live_database.execute(
"""
insert into bilibili_danmaku_data (uid, data_dump) values (?, ?)
""", (uid, data)
)
self.live_database.commit()

def get_dumped_live_data(self):
data = self.live_database.execute(
"""
select * from bilibili_danmaku_data
"""
).fetchall()

return self._analyze_dumped_live_data(data)

@staticmethod
def stringify_danmaku_data(data: LivestreamDanmakuData) -> str:
word_cloud = WordCloud(font_path=f'{getcwd()}/Services/util/SourceHanSansSC-Bold.otf',
background_color='#fff',
max_words=90,
width=1920,
height=1080).generate_from_frequencies(data.danmaku_frequency_dict)
path = f'{getcwd()}/data/pixivPic/{int(time_ns())}.png'
word_cloud.to_file(path)
return '直播已结束!撒花~✿✿ヽ(°▽°)ノ✿\n' \
f'一共收到啦{data.danmaku_count}枚弹幕\n' \
f'被点赞共{data.like_received_count}\n' \
f'收到礼物(包括SC){data.gift_received_count}\n' \
f'最高人气排名:{data.highest_rank}\n' \
f'[CQ:image,file=file:///{path}]'

def _delete_dumped_live_data(self, uid):
self.live_database.execute(
"""
delete from bilibili_danmaku_data where uid = ?
""", (uid,)
)
self.live_database.commit()

def _analyze_dumped_live_data(self, datas) -> List[LivestreamDanmakuData]:
unpickled_data_list = []
for data in datas:
uid = data[0]
dumped_data: str = data[1]
unpickled_data: LivestreamDanmakuData = pickle.loads(codecs.decode(dumped_data.encode(), "base64"))
unpickled_data_list.append(unpickled_data)

self._delete_dumped_live_data(uid)

return unpickled_data_list

def check_if_live_cached(self, room_id: str) -> bool:
user_needs_to_be_checked = self.live_database.execute(
"""
select * from live_notification_bilibili where uid = ?
""", (room_id,)
).fetchone()

is_live = user_needs_to_be_checked is not None and user_needs_to_be_checked[4]
logger.success(f'Live cache hit, result returned: {is_live}')

return is_live

async def check_if_live(self, room_id: str, streamer_name: str) -> LiveNotificationData:
logger.info(f'Checking live stat for {streamer_name}, room id: {room_id}')
url = self.bilibili_live_check_url + room_id
Expand All @@ -165,7 +256,7 @@ async def check_if_live(self, room_id: str, streamer_name: str) -> LiveNotificat
.map('live_status') \
.or_else(0)

if not is_live:
if is_live != 1:
logger.info(f'{streamer_name} is not live.')
return LiveNotificationData(
streamer_name,
Expand Down Expand Up @@ -210,6 +301,7 @@ async def check_live_bilibili(self) -> List[LiveNotificationData]:
is_enabled = live_data[1]
room_id = live_data[2]
last_record_state = live_data[4]
group_ids = live_data[5]

if not is_enabled:
logger.info(f'{streamer_name} is disable to send notification.')
Expand All @@ -220,8 +312,9 @@ async def check_live_bilibili(self) -> List[LiveNotificationData]:
if notify_data.is_live:
notify_data.set_live_change_status('开播啦!')
live_data_list.append(notify_data)
subprocess.Popen(f'{DANMAKU_PROCESS} {room_id} {group_ids}')
else:
notify_data = LiveNotificationData('糖糖', False)
notify_data = LiveNotificationData(streamer_name, False)
notify_data.set_live_change_status('下播啦!')
live_data_list.append(notify_data)

Expand Down
9 changes: 9 additions & 0 deletions awesome/plugins/vtuber_functions/vtuber_functions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from json import loads
from typing import List

import nonebot
from loguru import logger
Expand Down Expand Up @@ -108,6 +109,14 @@ async def do_bilibili_live_fetch():
group_id=int(group),
message=await live_notification.convert_live_data_to_string(data))

unpickled_danmaku_datas = live_notification.get_dumped_live_data()
for danmaku_data in unpickled_danmaku_datas:
notified_group: List[str] = loads(danmaku_data.qq_group_dumped)
for group in notified_group:
await bot.send_group_msg(
group_id=int(group),
message=live_notification.stringify_danmaku_data(danmaku_data))


async def do_dynamic_fetch():
logger.info('Automatically fetching bilibili dynamic info...')
Expand Down
160 changes: 160 additions & 0 deletions blive_danmaku_report_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
# !/usr/bin/env python3.10
import asyncio
import codecs
import http.cookies
import pickle
import re
import sys
from typing import Optional

import aiohttp
from loguru import logger

import blivedm
import blivedm.models.web as web_models
from Services.live_notification import LiveNotification, LivestreamDanmakuData
from Services.util.common_util import OptionalDict
from blivedm.clients import ws_base

live_notification = LiveNotification()


class MyDanmakuHandler(blivedm.BaseHandler):
def __init__(self):
self.danmaku_frequency_dict = {}
self.danmaku_count = 0
self.highest_rank = 99999
self.rank_area = ''
self.like_received_count = 0
self.gift_received_count = 0
self.room_id = ''
self.group_ids = ''

def set_room_id(self, parsed_in_room_id: str):
self.room_id = parsed_in_room_id

def set_group_ids(self, group_ids_dumped: str):
self.group_ids = group_ids_dumped

def add_danmaku_into_frequency_dict(self, msg):
self.danmaku_count += 1
msg = msg.replace('(', '').replace(')', '').replace('(', '').replace(')', '')
if not re.fullmatch(r'[\s+,,。、??!!]+', msg):
message_list = re.split(r'[\s+,,。、??!!]', msg)
else:
message_list = [msg]
for message in message_list:
message = message.strip()
if len(message) <= 5 and message:
logger.success(f'{message} is noted.')
if message not in self.danmaku_frequency_dict:
self.danmaku_frequency_dict[message] = 1
else:
self.danmaku_frequency_dict[message] += 1

_CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()

def _like_info_v3_callback(self, client: blivedm.BLiveClient, command: dict):
self.like_received_count += 1
logger.info(f'收到点赞, {client.room_id}, 点赞人:{OptionalDict(command).map("data").map("uname").or_else("?")}')

def _popularity_change(self, client: blivedm.BLiveClient, command: dict):
rank = OptionalDict(command).map("data").map("rank").or_else(999)
logger.info(f'人气榜变动,目前人气档位:{rank}')
if rank > 0:
self.highest_rank = min(self.highest_rank, rank)

_CMD_CALLBACK_DICT['LIKE_INFO_V3_CLICK'] = _like_info_v3_callback
_CMD_CALLBACK_DICT['POPULAR_RANK_CHANGED'] = _popularity_change

def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage):
if not live_notification.check_if_live_cached(self.room_id):
logger.success(f'Livestream is not going anymore for room id: {self.room_id}, dumping the data.')
pickled_data = codecs.encode(pickle.dumps(LivestreamDanmakuData(
danmaku_count=self.danmaku_count,
danmaku_frequency_dict=self.danmaku_frequency_dict,
qq_group_dumped=self.group_ids,
like_received_count=self.like_received_count,
gift_received_count=self.gift_received_count,
highest_rank=self.highest_rank
)), 'base64').decode()
live_notification.dump_live_data(pickled_data)
exit(1)

def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage):
self.gift_received_count += message.num
logger.info(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}'
f' ({message.coin_type}瓜子x{message.total_coin})')

def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
logger.info(f'Message received: {message.msg}, name: {message.uname}, receive_time: {message.timestamp}')
self.add_danmaku_into_frequency_dict(message.msg)

def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage):
self.gift_received_count += 1
logger.info(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}{message.message}')


# 直播间ID的取值看直播间URL
TEST_ROOM_IDS = []

# 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0
SESSDATA = ''

session: Optional[aiohttp.ClientSession] = None
handler = MyDanmakuHandler()


async def main():
global TEST_ROOM_IDS
argv = sys.argv
if not sys.argv or len(argv) != 3:
raise RuntimeError('No argv, should includes at least one room id.')
else:
room_id = argv[1]
group_ids = argv[2]
handler.set_room_id(room_id)
handler.set_group_ids(group_ids)
TEST_ROOM_IDS.append(room_id)
init_session()
try:
await run_listening()
finally:
await session.close()


def init_session():
cookies = http.cookies.SimpleCookie()
cookies['SESSDATA'] = SESSDATA
cookies['SESSDATA']['domain'] = 'bilibili.com'

global session
session = aiohttp.ClientSession()
session.cookie_jar.update_cookies(cookies)


async def run_listening():
"""
演示同时监听多个直播间
"""
clients = [blivedm.BLiveClient(int(single_room_id), session=session) for single_room_id in TEST_ROOM_IDS]
for client in clients:
client.set_handler(handler)
client.start()

try:
await asyncio.gather(*(
client.join() for client in clients
))
finally:
await asyncio.gather(*(
client.stop_and_close() for client in clients
))


if __name__ == '__main__':
try:
asyncio.run(main())
finally:
print(handler.danmaku_frequency_dict)
5 changes: 5 additions & 0 deletions sample_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from os import getcwd

# noinspection PyUnresolvedReferences
from nonebot.default_config import *

Expand Down Expand Up @@ -32,3 +34,6 @@
HOST = ''
PORT = 0
SUPER_USER = 0

DANMAKU_PROCESS = f'path_to_py310_python_executable' + \
f' {getcwd()}/blive_danmaku_report_generator.py'

0 comments on commit 235ee26

Please sign in to comment.