In [1]:
# Import relevant libraries
import vlc
from pathlib import Path, WindowsPath
import pandas as pd
import random
from collections import deque
from tinytag import TinyTag

### Test playing a video

In [2]:
# Define parent directory for shows
TV_PATH = Path("./data/TV Shows/")

In [3]:
# Get episode 1 of show
test_vid = next(next(TV_PATH.glob("*")).glob("*"))


StopIteration: 

In [None]:
# Test playing program with VLC
media = vlc.MediaPlayer(test_vid)
media.play()

0

In [None]:
# Stop when cell is run
media.stop()

### Generate Playlist File

In [None]:
def new_playlist_scheme(file_name : str):
    # Grab folder name for each video
    show_folders = [folder for folder in TV_PATH.iterdir()]

    # Create and populate dataframe
    show_folder_df = pd.DataFrame(columns=["show_path", "frequency"])
    show_folder_df["show_path"] = show_folders
    show_folder_df["frequency"] = show_folder_df["frequency"].fillna(1)

    # Make relative to TV_PATH and remove playlist row
    show_folder_df["show_path"] = show_folder_df["show_path"].apply(lambda x: x.relative_to(TV_PATH))
    show_folder_df = show_folder_df[show_folder_df["show_path"] != WindowsPath('.scheme')]

    # Create playlist file
    show_folder_df.to_csv(TV_PATH.as_posix() + "/.scheme/" + file_name + ".csv")

In [None]:
new_playlist_scheme("test")

### Test Reading Playlist File and Creating Random Subset

In [4]:
def load_playlist_scheme(file_name : str) -> pd.DataFrame:
    """
    Loads a playlist scheme from file. If none found, generates a new one
    """
    try:
        scheme = pd.read_csv(TV_PATH.as_posix() + "/.scheme/" + file_name + ".csv", index_col=0)
    except FileNotFoundError:
        scheme = new_playlist_scheme(file_name)

    return scheme

In [5]:
playlist_scheme = load_playlist_scheme("scheme")
playlist_scheme

Unnamed: 0,show_path,frequency
2,4543,0
3,Adventure Time (2010) Season 1-10 S01-S10 + Ex...,1
4,Hercules - The Legendary Journeys (1994)(tvdb-72),0
5,Star Wards - The Clone Wars (2008),0


In [6]:
# For details on sorting algorithm, see: https://nedbatchelder.com/blog/200712/human_sorting.html

import re

def tryint(string : str) -> str:
    """
    Return an int if possible, or `s` unchanged.
    """
    try:
        return int(string)
    except ValueError:
        return string

def alphanum_key(string : str) -> list:
    """
    Turn a string into a list of string and number chunks.

    >>> alphanum_key("z23a")
    ["z", 23, "a"]

    """
    string = string.as_posix()
    return [ tryint(c) for c in re.split('([0-9]+)', string) ]

def human_sort(path_list : list) -> None:
    """
    Sort a list in the way that humans expect.
    """
    path_list.sort(key=alphanum_key)

In [514]:
def get_first_episode(search_path : Path) -> Path:
    """
    Returns the first episode of a show. If show is arranged into seasons, searches recursively.
    :param search_path: Path to initiate search for episode
    """
    
    sorted_contents: list = [path for path in search_path.glob("*[!.txt]")]
    human_sort(sorted_contents)
    first_episode: Path = sorted_contents[0]

    # If the first value of the returned content is a directory, search in that directory
    if first_episode.is_dir():
        first_episode = get_first_episode(first_episode)

    return first_episode
        

In [515]:
def get_current_episode(show_path : Path) -> Path:
    """
    Uses the .eps file to search and return the first episode. If .eps file does not exist, find the first episode
    """
    eps_file_path : Path = show_path.joinpath(".eps.txt")

    # If .eps file does not exist, populate with first episode of show and return first episode
    if not eps_file_path.exists() or not eps_file_path.stat().st_size:
        first_episode : Path = get_first_episode(show_path)
        with open(eps_file_path, "w") as file:
            file.write(first_episode.as_posix())
        return first_episode

    # Otherwise, get current episode from .eps file
    current_episode: Path = show_path.joinpath(show_path.joinpath(".eps.txt").read_text())

    return current_episode

In [516]:
def find_next_path(search_item: Path, search_path: Path) -> Path:
    '''
    Searches for the next episode or season. If the final episode or season, returns the first one
    '''
    # Search for next episode of show in folder
    sorted_paths : list[Path] = [path for path in search_path.glob("*[!.txt]")]
    human_sort(sorted_paths)
    
    try:
        path_number : int = sorted_paths.index(search_item)
    except ValueError:
        return None
    
    # Grab the next episode if it is not the last episode
    try:
        next_path: Path = sorted_paths[path_number + 1]

    except IndexError:
        next_path : Path = sorted_paths[0]
    
    return next_path

In [517]:
def update_next_episode(show_path: Path) -> Path:
    '''
    Returns the current episode for the show. Additionally, updates the .eps file marker for the next episode. 
    If it has reached the final episode, resets to previous episode
    '''
    import os

    current_episode : Path = get_current_episode(show_path)
    next_episode : Path
    next_season : Path
    
    # If last episode and folder above is the TV_PATH, return to start of series
    next_episode = find_next_path(current_episode, current_episode.parent)
    
    # If no next episode was found, return the current episode
    if not next_episode:
        return current_episode
    
    write_string = next_episode.stem + next_episode.suffix
    
    # If folder above directory is not TV_PATH, we assume this directory structure is organized into series
    # Repeat search in next season
    if current_episode.parent.parent != TV_PATH:
        if next_episode == get_first_episode(current_episode.parent):
            next_season = find_next_path(current_episode.parent, show_path)

            # If no next seaon was found, return the current episode
            if not next_season:
                return current_episode
            
            # If we cannot find the next epsiode in the subsequent season folder, do not change .eps
            if os.listdir(next_season):
                next_episode = get_first_episode(next_season)

            else:
                return current_episode
        
        write_string = next_episode.parts[-2] + '/' + next_episode.parts[-1]
        
    # Overwrite .eps file with next episode
    with open(show_path.joinpath(".eps.txt"), "w") as file:
        file.write(write_string)

    return next_episode

### Add to, dequeue, and load previous playlist

In [518]:
def add_to_playlist(video_queue : deque, video : Path, duration : int) -> None:
    '''
    Manages to simultaneous updates of video_queues and playlist backlog file. Adds one video to the queue.
    :param video_queue: Current playlist queue 
    :param video: Video to add
    :param duration: Duration of video
    '''
    # Add to video queue
    video_queue.append(video)

    # Add to playlist backlog file
    with open(TV_PATH.joinpath(".playlist.csv"), "a") as file:
        file.write(str(len(video_queue)) + "," + video.as_posix() + "," + str(duration) + "\n")

In [519]:
def dequeue_playlist(video_queue : deque) -> Path:
    '''
    Manages simultaneous updates of video_queues and playlist backlog file. Removes one video from the queue.
    :param video_queue: Current playlist queue 
    '''
    # Remove from playlist text file, preserving all rows after
    try:
        with open(TV_PATH.joinpath(".playlist.csv"), "r") as file:
            data = file.readlines()
        with open(TV_PATH.joinpath(".playlist.csv"), "w") as file:
            file.write(",video,duration\n" + "".join(data[2:]))
    except (FileNotFoundError):
        print("Playlist does not exist")

    # Pop and return front of queue. If queue is empty, return None
    try:
        video = video_queue.popleft()
    except IndexError:
        video = None

    return video

In [520]:
def load_playlist() -> pd.DataFrame:
    """
    Load previous playlist from file and load into dataframe. This will load from TV_PATH/.playlist.csv.
    """ 
    # Try to load the playlist from file if it exists and is not empty. Otherwise, return the empty playlist and save a blank csv with headers
    try:
        playlist = pd.read_csv(TV_PATH.joinpath(".playlist.csv").as_posix(), index_col=0).reset_index(drop=True)
    except (pd.errors.EmptyDataError, FileNotFoundError):
        playlist = pd.DataFrame(columns=["video", "duration"])
        playlist.to_csv(TV_PATH.joinpath(".playlist.csv").as_posix())
        return playlist

    # Convert videos to path
    playlist["video"] = playlist["video"].apply(lambda x: WindowsPath(x))

    return playlist

In [521]:
def generate_playlist(scheme_name: str, max_length: int) -> deque:
    """
    Generates a playlist, adding  up to a maximum number of minutes
    """
    
    # Define variables
    queue_length_mins: int = 0
    video_queue: deque = deque()
    selected_show: WindowsPath
    next_episode: WindowsPath
    
    # If a playlist already exists from a previous session, load that from file. Otherwise, create a blank version
    playlist : pd.DataFrame = load_playlist()

    # Populate playlist object with videos and set starting queue time
    video_queue = deque(playlist["video"].to_list())
    queue_length_mins += playlist["duration"].sum()

    # Get requested playlist scheme 
    playlist_scheme = load_playlist_scheme(scheme_name)

    # Generate random list of videos
    while queue_length_mins < max_length:
        # Select show from list of shows and user generated frequencies
        selected_show = random.choices(playlist_scheme["show_path"].to_list(), weights=playlist_scheme["frequency"].to_list())

        # Use .eps text file to determine next episode to play
        show_path = TV_PATH.joinpath(selected_show[0])
        next_episode = update_next_episode(show_path)
        
        # Get duration of video and append to total duration
        try:
            tag = TinyTag.get(next_episode.as_posix())
            queue_length_mins += tag.duration

            # If data not available on video length, add 10 mins to ensure we do not infinite loop
            if not tag.duration:
                queue_length_mins += 10

        except FileNotFoundError:
            continue
        
        # Add path to video queue
        add_to_playlist(video_queue, video=next_episode, duration=tag.duration)

    return video_queue

In [549]:
# Set constraints
MAX_LENGTH_MINS: int = 200

# Gemerate playlist
queue = generate_playlist("scheme", MAX_LENGTH_MINS)
queue

deque([WindowsPath('data/TV Shows/Hercules - The Legendary Journeys (1994)(tvdb-72)/Hercules 2.mp4'),
       WindowsPath('data/TV Shows/4543/file_example_MP4_480_1_5MG.mp4'),
       WindowsPath('data/TV Shows/Hercules - The Legendary Journeys (1994)(tvdb-72)/Hercules 4.mp4'),
       WindowsPath('data/TV Shows/Adventure Time (2010) Season 1-10 S01-S10 + Extras/Season 1/Adventure Time Episode 2.mp4'),
       WindowsPath('data/TV Shows/Star Wards - The Clone Wars (2008)/Star wars 2.mp4'),
       WindowsPath('data/TV Shows/Adventure Time (2010) Season 1-10 S01-S10 + Extras/Season 2/Adventure time 2x1.mp4'),
       WindowsPath('data/TV Shows/Adventure Time (2010) Season 1-10 S01-S10 + Extras/Season 2/Adventure time 2x1.mp4')])

In [550]:
# Dequeue and find first episode
get_first_episode(dequeue_playlist(queue).parent)

WindowsPath('data/TV Shows/Hercules - The Legendary Journeys (1994)(tvdb-72)/Hercules 1.mp4')

### Putting it together