diff --git a/app/music_new/music.py b/app/music_new/music.py new file mode 100644 index 0000000..f1bffc8 --- /dev/null +++ b/app/music_new/music.py @@ -0,0 +1,107 @@ +from abc import abstractmethod as abstract, ABC as ABSTRACT +from typing import Optional + +import aiohttp + + +class MusicPiece(ABSTRACT): + def __init__(self, name: str, artists: list[str]): + self.name = name or 'N/A' + self.artists = artists or ['Unknown'] + self.endtime_ms: Optional[int] = None + + @abstract + async def cover_url(self, sess: aiohttp.ClientSession) -> str: + pass + + @abstract + async def duration_ms(self, sess: aiohttp.ClientSession) -> int: + pass + + async def playable(self, sess: aiohttp.ClientSession) -> bool: + return (await self.media_url(sess)) is not None + + @abstract + async def media_url(self, sess: aiohttp.ClientSession) -> Optional[str]: + """Must be consistent with the self.playable: return None when self.playable is False""" + pass + + def __repr__(self): + return f'{self.__class__.__name__}({repr(self.name)}, {repr(self.artists)})' + + +class Platform(ABSTRACT): + """The abstract class for a music platform. + + Some platforms may have additional feature (e.g. radio of netease). + What we should do is to create another platform with different implementation. + + Do raise NotImplementedError and update __functionalities__ when some functionalities are not available. + """ + + __functionalities__ = [ + 'search_music', + 'play_by_keywords', + 'play_by_url', + 'play_by_id', + 'import_album_by_url', + 'import_playlist_by_url' + ] + + def __init__(self, name: str, *alias: str): + self.name = name + self.alias = [*alias] + + def names(self): + return {self.name, *self.alias} + + # ============================================= Music ============================================= + + @abstract + async def search_music(self, sess: aiohttp.ClientSession, keywords: str, limit: int) -> list[MusicPiece]: + """Search for at most `limit` musics, but do not get all information at once. + It can return an empty list.""" + pass + + @staticmethod + @abstract + def is_music_url(text: str): + pass + + @staticmethod + @abstract + def is_music_id(text: str): + """Make it return False if playing by id is not applicable.""" + pass + + async def play_by_keywords(self, sess: aiohttp.ClientSession, keywords: str, limit: int) -> Optional[MusicPiece]: + result = await self.search_music(sess, keywords, limit) + return result[0] if result else None + + @abstract + async def play_by_url(self, sess: aiohttp.ClientSession, url: str) -> Optional[MusicPiece]: + """A music can have multiple urls, which potentially come from phone app, PC client and webpage.""" + pass + + @abstract + async def play_by_id(self, sess: aiohttp.ClientSession, music_id: int) -> Optional[MusicPiece]: + pass + + # ============================================= Album ============================================= + + @abstract + async def import_album_by_url(self, sess: aiohttp.ClientSession, url: str, limit: int, offset: int = 0) \ + -> Optional[MusicPiece]: + """An album can have multiple urls, which potentially come from phone app, PC client and webpage.""" + pass + + # ============================================= Playlist ============================================= + + @abstract + async def import_playlist_by_url(self, sess: aiohttp.ClientSession, url: str, limit: int, offset: int = 0) \ + -> Optional[MusicPiece]: + """A playlist can have multiple urls, which potentially come from phone app, PC client and webpage.""" + pass + + def __repr__(self): + return f"{self.__class__.__name__}({repr(self.name)}, {', '.join(map(repr, self.alias))})" diff --git a/app/music_new/netease/netease_music.py b/app/music_new/netease/netease_music.py new file mode 100644 index 0000000..54776c3 --- /dev/null +++ b/app/music_new/netease/netease_music.py @@ -0,0 +1,173 @@ +import json +import math +import re +from dataclasses import dataclass +from enum import Enum +from time import time as now +from typing import Optional + +import aiohttp + +from app.music_new.music import Platform, MusicPiece + +MUSIC_ID_PATTERN = re.compile(r'[1-9]\d*') +MUSIC_URL_PATTERN = re.compile( + r'((https?)://)?music\.163\.com/' + r'(song/(?P[1-9]\d*))' + r'|(#/song\?(.*?&)*id=(?P[1-9]\d*))' +) + +NETEASE_MUSIC_DETAIL_URL = 'http://music.163.com/api/song/detail/' +NETEASE_MUSIC_MEDIA_URL = 'http://music.163.com/api/song/enhance/player/url/' +NETEASE_MUSIC_DEFAULT_BITRATE = 320_000 +NETEASE_MUSIC_SEARCH_URL = 'http://music.163.com/api/search/get/' + + +@dataclass +class BasicDetails: + song_id: int + name: str + artists: list[str] + duration_ms: int + cover_url: str + + +class SearchType(Enum): + MUSIC = 1 + ALBUM = 10 + SINGER = 100 + PLAYLIST = 1000 + USER = 1002 + MV = 1004 + LYRICS = 1006 + RADIO = 1009 + VIDEO = 1014 + + +async def search_music(sess: aiohttp.ClientSession, keywords: str, limit: int = 30, offset: int = 0) \ + -> list[BasicDetails]: + async with sess.get(NETEASE_MUSIC_SEARCH_URL, params={ + 's': keywords, + 'type': SearchType.MUSIC.value, + 'limit': limit, + 'offset': offset + }) as resp: + text = await resp.text() + data = json.loads(text) + status = data.get('code') + if status == 200: + return [ + BasicDetails( + song_id=song['id'], + name=song.get('name', 'N/A'), + artists=[artist.get('name', 'Unknown') for artist in song.get('artists', [])], + duration_ms=song.get('duration', 0), + cover_url=song.get("album", {}).get("picUrl", "") + '?param=130y130' + ) for song in data.get('result', {}).get('songs', []) if isinstance(song.get('id'), int) + ] + else: + raise Exception(f"could not search music with keywords '{keywords}', code is: {status}") + + +async def batch_fetch_basic_details(sess: aiohttp.ClientSession, *song_ids: int) -> dict[int, BasicDetails]: + """Incorrect id does not have a corresponding song entry in the returned value.""" + async with sess.get(NETEASE_MUSIC_DETAIL_URL, params={ + 'ids': f'[{",".join(map(str, song_ids))}]' + }) as resp: + text = await resp.text() + data = json.loads(text) + status = data.get('code') + if status == 200: + return { + song['id']: BasicDetails( + song_id=song['id'], + name=song.get('name', 'N/A'), + artists=[artist.get('name', 'Unknown') for artist in song.get('artists', [])], + duration_ms=song.get('duration', 0), + cover_url=song.get("album", {}).get("picUrl", "") + '?param=130y130' + ) for song in data.get('songs', []) if isinstance(song.get('id'), int) + } + else: + raise Exception(f"could not fetch music details with ids {song_ids}, code is: {status}") + + +async def batch_fetch_media_urls(sess: aiohttp.ClientSession, *song_ids: int, + bitrate=NETEASE_MUSIC_DEFAULT_BITRATE) -> dict[int, str]: + """Incorrect id does not have a corresponding song entry in the returned value.""" + async with sess.get(NETEASE_MUSIC_MEDIA_URL, params={ + 'br': bitrate, + 'ids': f'[{",".join(map(str, song_ids))}]' + }) as resp: + text = await resp.text() + data = json.loads(text) + status = data.get('code') + if status == 200: + return { + song['id']: song['url'] for song in data.get('data', []) + if isinstance(song.get('id'), int) and isinstance(song.get('url'), str) + } + else: + raise Exception(f'could not fetch media urls with ids {song_ids}, code is {status}.') + + +class NeteaseMusicPlatform(Platform): + def __init__(self): + super(NeteaseMusicPlatform, self).__init__('netease', 'netease-music', '网易', '网易云', '网易云音乐') + + async def search_music(self, sess: aiohttp.ClientSession, keywords: str, limit: int) -> list[MusicPiece]: + pass + + @staticmethod + def is_music_id(text: str): + return MUSIC_ID_PATTERN.fullmatch(text) is not None + + @staticmethod + def is_music_url(text: str): + return MUSIC_URL_PATTERN.fullmatch(text) is not None + + async def play_by_url(self, sess: aiohttp.ClientSession, url: str) -> Optional[MusicPiece]: + result = MUSIC_URL_PATTERN.fullmatch(url) + if result is None: + return None + result = result.groupdict() + result = result.get('id1') or result.get('id2') + if result is None: + return None + return await self.play_by_id(sess, int(result)) + + async def play_by_id(self, sess: aiohttp.ClientSession, music_id: int) -> Optional[MusicPiece]: + details = await batch_fetch_basic_details(sess, music_id) + detail = details.get(music_id) + return None if detail is None else NeteaseMusic(detail) + + async def import_playlist_by_url(self, sess: aiohttp.ClientSession, url: str, limit: int, offset: int = 0) \ + -> Optional[MusicPiece]: + pass + + async def import_album_by_url(self, sess: aiohttp.ClientSession, url: str, limit: int, offset: int = 0) \ + -> Optional[MusicPiece]: + pass + + +MEDIA_EXPIRATION_TIME_SEC = 5 * 60 + + +class NeteaseMusic(MusicPiece): + def __init__(self, details: BasicDetails): + super(NeteaseMusic, self).__init__(details.name, details.artists) + self.details = details + self.__media_expiration_sec = -math.inf + self.__media_url = None + + async def cover_url(self, sess: aiohttp.ClientSession) -> str: + return self.details.cover_url + + async def duration_ms(self, sess: aiohttp.ClientSession) -> int: + return self.details.duration_ms + + async def media_url(self, sess: aiohttp.ClientSession) -> Optional[str]: + if self.__media_expiration_sec < now(): + urls = await batch_fetch_media_urls(sess, self.details.song_id) + self.__media_url = urls.get(self.details.song_id) + self.__media_expiration_sec = now() + MEDIA_EXPIRATION_TIME_SEC + return self.__media_url diff --git a/app/test/test_netease_music.py b/app/test/test_netease_music.py new file mode 100644 index 0000000..449238b --- /dev/null +++ b/app/test/test_netease_music.py @@ -0,0 +1,86 @@ +import asyncio +from unittest import IsolatedAsyncioTestCase, TestCase + +from app.music_new.netease.netease_music import * + +SESSION_CLOSE_DELAY = 0 +SONG_ID_PLAYABLE = 36226134 +SONG_ID_NOT_PLAYABLE = 1866768982 +SONG_ID_INVALID = 0 + + +class TestNeteaseMusicAPI(IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.sess = aiohttp.ClientSession() + + async def asyncTearDown(self) -> None: + # shortly delay before closing for the best practice: + # https://docs.aiohttp.org/en/latest/client_advanced.html#graceful-shutdown + await asyncio.sleep(SESSION_CLOSE_DELAY) + await self.sess.close() + + async def test_search_music(self): + details = await search_music(self.sess, "Duvet") + self.assertNotEqual(details, []) + + async def test_batch_fetch_basic_details(self): + details = await batch_fetch_basic_details(self.sess, SONG_ID_PLAYABLE, SONG_ID_NOT_PLAYABLE, SONG_ID_INVALID) + self.assertIsNotNone(details.get(SONG_ID_PLAYABLE)) + self.assertIsNotNone(details.get(SONG_ID_NOT_PLAYABLE)) + self.assertIsNone(details.get(SONG_ID_INVALID)) + + async def test_batch_fetch_media_urls(self): + details = await batch_fetch_media_urls(self.sess, SONG_ID_PLAYABLE, SONG_ID_NOT_PLAYABLE, SONG_ID_INVALID) + self.assertIsNotNone(details.get(SONG_ID_PLAYABLE)) + self.assertIsNone(details.get(SONG_ID_NOT_PLAYABLE)) + self.assertIsNone(details.get(SONG_ID_INVALID)) + + +class TestNeteaseMusic(IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.sess = aiohttp.ClientSession() + + async def asyncTearDown(self) -> None: + await asyncio.sleep(SESSION_CLOSE_DELAY) + await self.sess.close() + + async def test_music_playable(self): + music = await NeteaseMusicPlatform().play_by_id(self.sess, SONG_ID_PLAYABLE) + self.assertEqual(music.name, + 'Liebesträume - 3 Notturnos für das Pianoforte S.541:O lieb, so lang du lieben kannst!') + self.assertEqual(music.artists, ['Leslie Howard']) + self.assertEqual(await music.duration_ms(self.sess), 253600) + await assert_is_image_url(self, await music.cover_url(self.sess)) + self.assertTrue(await music.playable(self.sess)) + await assert_is_music_url(self, await music.media_url(self.sess)) + + async def test_music_not_playable(self): + music = await NeteaseMusicPlatform().play_by_id(self.sess, SONG_ID_NOT_PLAYABLE) + self.assertEqual(music.name, 'BEACON') + self.assertEqual(music.artists, ['平沢進']) + self.assertEqual(await music.duration_ms(self.sess), 226146) + await assert_is_image_url(self, await music.cover_url(self.sess)) + self.assertFalse(await music.playable(self.sess)) + self.assertIsNone(await music.media_url(self.sess)) + + async def test_music_invalid(self): + music = await NeteaseMusicPlatform().play_by_id(self.sess, SONG_ID_INVALID) + self.assertIsNone(music) + + +async def assert_is_image_url(self: TestCase, url): + self.assertIsInstance(url, str) + async with aiohttp.ClientSession() as sess: + async with sess.get(url) as resp: + self.assertTrue(resp.status, 200) + self.assertTrue(resp.headers.get('Content-Type').startswith('image/')) + self.assertTrue(int(resp.headers.get('Content-Length')) > 0) + + +async def assert_is_music_url(self: TestCase, url): + self.assertIsInstance(url, str) + async with aiohttp.ClientSession() as sess: + async with sess.get(url) as resp: + self.assertTrue(resp.status, 200) + self.assertTrue(resp.headers.get('Content-Type').startswith('audio/')) + self.assertTrue(int(resp.headers.get('Content-Length')) > 0)