Skip to content

Commit

Permalink
优化缓存文件保留规则, 仅当有匹配的缓存元数据时保留文件:
Browse files Browse the repository at this point in the history
- 启动 app 时自动删除在线播放缓存, close #150
- 自动删除旧版缓存
- 修复删除缓存有时候会卡很久的问题, close #231
  • Loading branch information
Him188 committed Apr 24, 2024
1 parent f6e12e3 commit 74bad5b
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 114 deletions.
134 changes: 97 additions & 37 deletions app/shared/data/common/data/media/TorrentMediaCacheEngine.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package me.him188.ani.app.data.media

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withContext
import me.him188.ani.app.data.media.resolver.TorrentVideoSourceResolver
import me.him188.ani.app.torrent.FilePriority
import me.him188.ani.app.torrent.TorrentDownloadSession
import me.him188.ani.app.torrent.TorrentDownloader
import me.him188.ani.app.torrent.TorrentFileEntry
import me.him188.ani.app.torrent.TorrentFileHandle
import me.him188.ani.app.torrent.model.EncodedTorrentData
import me.him188.ani.datasources.api.CachedMedia
Expand All @@ -28,11 +34,16 @@ import me.him188.ani.datasources.core.cache.MediaStats
import me.him188.ani.utils.coroutines.SuspendLazy
import me.him188.ani.utils.logging.info
import me.him188.ani.utils.logging.logger
import me.him188.ani.utils.logging.warn
import java.nio.file.Paths
import kotlin.coroutines.CoroutineContext
import kotlin.io.path.deleteIfExists
import kotlin.io.path.exists

private const val EXTRA_TORRENT_DATA = "torrentData"
private const val EXTRA_TORRENT_CACHE_DIR = "torrentCacheDir" // 种子的缓存目录, 注意, 一个 MediaCache 可能只对应该种子资源的其中一个文件
//private const val EXTRA_TORRENT_CACHE_FILE =
// "torrentCacheFile" // MediaCache 所对应的视频文件. 该文件一定是 [EXTRA_TORRENT_CACHE_DIR] 目录中的文件 (的其中一个)

class TorrentMediaCacheEngine(
private val mediaSourceId: String,
Expand All @@ -46,15 +57,34 @@ class TorrentMediaCacheEngine(
getTorrentDownloader()
}

class LazyFileHandle(
val scope: CoroutineScope,
val state: SharedFlow<State?>,
) {
val handle = state.map { it?.handle }
val entry = state.map { it?.entry }

class State(
val session: TorrentDownloadSession,
val entry: TorrentFileEntry?,
val handle: TorrentFileHandle?,
)
}

private inner class TorrentMediaCache(
override val origin: Media,
override val metadata: MediaCacheMetadata,
private val file: Flow<TorrentFileHandle>,
/**
* Required:
* @see EXTRA_TORRENT_CACHE_DIR
* @see EXTRA_TORRENT_DATA
*/
override val metadata: MediaCacheMetadata, // 注意, 我们不能写 check 检查这些属性, 因为可能会有旧版本的数据
val lazyFileHandle: LazyFileHandle
) : MediaCache {
override suspend fun getCachedMedia(): CachedMedia {
val file = file.first()
val finished = file.entry.stats.isFinished.first()
if (finished) {
val file = lazyFileHandle.handle.first()
val finished = file?.entry?.stats?.isFinished?.first()
if (finished == true) {
val filePath = file.entry.resolveFile()
if (!filePath.exists()) {
error("TorrentFileHandle has finished but file does not exist: $filePath")
Expand All @@ -76,46 +106,52 @@ class TorrentMediaCacheEngine(
}
}

private val entry get() = file.map { it.entry }
override fun isValid(): Boolean {
return metadata.extra[EXTRA_TORRENT_CACHE_DIR]?.let {
Paths.get(it).exists()
} ?: false
}

private val entry get() = lazyFileHandle.entry

override val downloadSpeed: Flow<FileSize>
get() = entry.flatMapLatest { session ->
session.stats.downloadRate.map {
session?.stats?.downloadRate?.map {
it?.bytes ?: FileSize.Unspecified
}
} ?: flowOf(FileSize.Unspecified)
}

override val uploadSpeed: Flow<FileSize>
get() = entry.flatMapLatest { session ->
session.stats.uploadRate.map {
session?.stats?.uploadRate?.map {
it?.bytes ?: FileSize.Unspecified
}
} ?: flowOf(FileSize.Unspecified)
}

override val progress: Flow<Float>
get() = entry.flatMapLatest { it.stats.progress }
get() = entry.filterNotNull().flatMapLatest { it.stats.progress }

override val finished: Flow<Boolean>
get() = entry.flatMapLatest { it.stats.isFinished }
get() = entry.filterNotNull().flatMapLatest { it.stats.isFinished }

override val totalSize: Flow<FileSize>
get() = entry.flatMapLatest { session ->
session.stats.totalBytes.map { it.bytes }
get() = entry.flatMapLatest { entry ->
entry?.stats?.totalBytes?.map { it.bytes } ?: flowOf(0.bytes)
}

@Volatile
private var deleted = false

override suspend fun pause() {
if (deleted) return
file.first().pause()
lazyFileHandle.handle.first()?.pause()
}

override suspend fun resume() {
if (deleted) return
val file = file.first()
val file = lazyFileHandle.handle.first()
logger.info { "Resuming file: $file" }
file.resume(FilePriority.NORMAL)
file?.resume(FilePriority.NORMAL)
}

override suspend fun delete() {
Expand All @@ -124,7 +160,7 @@ class TorrentMediaCacheEngine(
if (deleted) return
deleted = true
}
val handle = file.first()
val handle = lazyFileHandle.handle.first() ?: return // did not even selected a file
val file = handle.entry.resolveFile()
handle.close()
if (file.exists()) {
Expand All @@ -133,6 +169,7 @@ class TorrentMediaCacheEngine(
} else {
logger.info { "Torrent cache does not exist, ignoring: $file" }
}
lazyFileHandle.scope.cancel()
}
}

Expand Down Expand Up @@ -166,32 +203,34 @@ class TorrentMediaCacheEngine(
return TorrentMediaCache(
origin = origin,
metadata = metadata,
file = getFileEntryFlow(EncodedTorrentData(data), metadata, parentContext),
lazyFileHandle = getLazyFileHandle(EncodedTorrentData(data), metadata, parentContext)
)
}

private fun getFileEntryFlow(
private fun getLazyFileHandle(
encoded: EncodedTorrentData,
metadata: MediaCacheMetadata,
parentContext: CoroutineContext
): SharedFlow<TorrentFileHandle> {
val sessionFlow = flow {
emit(downloader.get().startDownload(encoded, parentContext))
}.mapNotNull { session ->
TorrentVideoSourceResolver.selectVideoFileEntry(
): LazyFileHandle {
val scope = CoroutineScope(parentContext + Job(parentContext[Job]))

// lazy
val state = flow {
val session = downloader.get().startDownload(encoded, parentContext)

val selectedFile = TorrentVideoSourceResolver.selectVideoFileEntry(
session.getFiles(),
listOf(metadata.episodeName),
metadata.episodeSort,
)?.createHandle() ?: kotlin.run {
)

val handle = selectedFile?.createHandle()
if (handle == null) {
session.closeIfNotInUse()
null
}
}.shareIn(
CoroutineScope(parentContext + Job(parentContext[Job])),
started = SharingStarted.Lazily,
replay = 1,
)
return sessionFlow
emit(LazyFileHandle.State(session, selectedFile, handle))
}
return LazyFileHandle(scope, state.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1))
}

@OptIn(ExperimentalStdlibApi::class)
Expand All @@ -200,14 +239,35 @@ class TorrentMediaCacheEngine(
request: MediaCacheMetadata,
parentContext: CoroutineContext
): MediaCache {
val data = downloader.get().fetchTorrent(origin.download.uri)
val downloader = downloader.get()
val data = downloader.fetchTorrent(origin.download.uri)
val metadata = request.withExtra(
mapOf(EXTRA_TORRENT_DATA to data.data.toHexString())
mapOf(
EXTRA_TORRENT_DATA to data.data.toHexString(),
EXTRA_TORRENT_CACHE_DIR to downloader.getSaveDir(data).absolutePath,
)
)

return TorrentMediaCache(
origin = origin,
metadata = metadata,
file = getFileEntryFlow(data, metadata, parentContext),
lazyFileHandle = getLazyFileHandle(data, metadata, parentContext),
)
}

override suspend fun deleteUnusedCaches(all: List<MediaCache>) {
val allowed = all.mapNotNull { it.metadata.extra[EXTRA_TORRENT_CACHE_DIR] }

val downloader = downloader.get()
withContext(Dispatchers.IO) {
val saves = downloader.listSaves()
for (save in saves) {
if (save.absolutePath !in allowed) {
val totalLength = save.walk().sumOf { it.length() }
logger.warn { "本地种子缓存文件未找到匹配的 MediaCache, 已释放 ${totalLength.bytes}: ${save.absolutePath}" }
save.deleteRecursively()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ open class TestMediaCache(
) : MediaCache {
override val origin: Media get() = media.origin
override suspend fun getCachedMedia(): CachedMedia = media
override fun isValid(): Boolean = true

override val downloadSpeed: Flow<FileSize> = MutableStateFlow(1.bytes)
override val uploadSpeed: Flow<FileSize> = MutableStateFlow(1.bytes)
override val finished: Flow<Boolean> by lazy { progress.map { it == 1f } }
Expand Down
1 change: 1 addition & 0 deletions app/shared/pages/cache-manage/common/CacheItem.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fun CacheItemView(
AlertDialog(
onDismissRequest = { showDeleteDialog = false },
text = { Text("确认删除缓存吗? 该操作不可撤销") },
// for Delete, see me.him188.ani.app.pages.cache.manage.MediaCacheStorageState.delete
confirmButton = { Button({ onDelete(item) }) { Text("删除") } },
dismissButton = { TextButton({ showDeleteDialog = false }) { Text("取消") } }
)
Expand Down
18 changes: 11 additions & 7 deletions data-sources/core/src/cache/DirectoryMediaCacheStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private const val METADATA_FILE_EXTENSION = "metadata"
*/
class DirectoryMediaCacheStorage(
override val mediaSourceId: String,
private val dir: Path,
private val metadataDir: Path,
private val engine: MediaCacheEngine,
parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
) : MediaCacheStorage {
Expand All @@ -75,12 +75,13 @@ class DirectoryMediaCacheStorage(
)

init {
if (!dir.exists()) {
dir.createDirectories()
if (!metadataDir.exists()) {
metadataDir.createDirectories()
}

scope.launch {
dir.useDirectoryEntries { files ->
metadataDir.useDirectoryEntries { files ->
val allRecovered = mutableListOf<MediaCache>()
files.forEach { file ->
if (file.extension != METADATA_FILE_EXTENSION) return@forEach

Expand All @@ -97,6 +98,7 @@ class DirectoryMediaCacheStorage(
lock.withLock {
listFlow.value += it
}
allRecovered.add(it)
}
logger.info { "Cache restored: ${save.origin.mediaId}, result=${cache}" }
if (cache != null) {
Expand All @@ -113,7 +115,7 @@ class DirectoryMediaCacheStorage(
"Metadata file name mismatch, renaming: " +
"${file.name} -> $newSaveName"
}
file.moveTo(dir.resolve(newSaveName))
file.moveTo(metadataDir.resolve(newSaveName))
}
}

Expand All @@ -122,6 +124,8 @@ class DirectoryMediaCacheStorage(
logger.error(e) { "Failed to restore cache for ${save.origin.mediaId}" }
}
}

engine.deleteUnusedCaches(allRecovered)
}
}
}
Expand Down Expand Up @@ -166,7 +170,7 @@ class DirectoryMediaCacheStorage(
scope.coroutineContext
)
withContext(Dispatchers.IO) {
dir.resolve(getSaveFilename(cache)).writeText(
metadataDir.resolve(getSaveFilename(cache)).writeText(
json.encodeToString(
MediaCacheSave.serializer(),
MediaCacheSave(media, cache.metadata)
Expand All @@ -192,7 +196,7 @@ class DirectoryMediaCacheStorage(
lock.withLock {
cache.delete()
withContext(Dispatchers.IO) {
if (!dir.resolve(getSaveFilename(cache)).deleteIfExists()) {
if (!metadataDir.resolve(getSaveFilename(cache)).deleteIfExists()) {
logger.error { "Attempting to delete media cache '${cache.cacheId}' but its corresponding metadata file does not exist" }
}
}
Expand Down
5 changes: 5 additions & 0 deletions data-sources/core/src/cache/MediaCacheEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ interface MediaCacheEngine {
request: MediaCacheMetadata,
parentContext: CoroutineContext
): MediaCache

/**
* 在本地缓存中删除所有未在 [all] 中找到对应 [MediaCache] 的文件.
*/
suspend fun deleteUnusedCaches(all: List<MediaCache>)
}
21 changes: 18 additions & 3 deletions data-sources/core/src/cache/MediaCacheStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,23 @@ interface MediaCacheStorage : AutoCloseable {
* A media cached in the storage.
*/
interface MediaCache {
/**
* 唯一缓存 id
*/
val cacheId: String
get() = (origin.mediaId.hashCode() * 31
+ metadata.subjectId.hashCode() * 31
+ metadata.episodeId.hashCode()).absoluteValue.toString()
get() {
val hash = (origin.mediaId.hashCode() * 31
+ metadata.subjectId.hashCode() * 31
+ metadata.episodeId.hashCode()).absoluteValue.toString()
val subjectName = metadata.subjectNames.firstOrNull() ?: metadata.subjectId
if (subjectName != null) {
fun removeSpecials(value: String): String {
return value.replace(Regex("""[-\\|/.,;'\[\]{}()=_ ~!@#$%^&*]"""), "")
}
return "${removeSpecials(subjectName).take(8)}-$hash"
}
return hash
}

/**
* Original media that is being cached.
Expand All @@ -95,6 +108,8 @@ interface MediaCache {
*/
suspend fun getCachedMedia(): CachedMedia

fun isValid(): Boolean

val metadata: MediaCacheMetadata

/**
Expand Down
Loading

0 comments on commit 74bad5b

Please sign in to comment.