In [2]:
import os
from pywhispercpp.model import Model
import sqlite3
from pathlib import Path
from yt_dlp import YoutubeDL

### config

In [2]:
db_path = 'data/podcast.db'

input_files_dir = Path('input_files/')
# print(input_files_dir.resolve())
# print(list(input_files_dir.glob('*.mp3')))
file = './s10e43_tiny_benchmark.mp3'

### database

##### init database

In [29]:
# create a folder data/ if doesn't exist yet
os.makedirs('data', exist_ok=True)

# create/connect to sqlite database
def get_connection():
    return sqlite3.connect(db_path)

def init_db():
    with get_connection() as db:
        cursor = db.cursor()

        # episodes table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY,
                title TEXT,
                date TEXT,
                url_path TEXT,
                description TEXT,
                season_number INTEGER,
                episode_number INTEGER,
                index_number INTEGER
            )
        """)

        # participants table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS participants (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        """)

        # transcript segments table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS transcription_segments (
                id INTEGER PRIMARY KEY,
                episode_id INTEGER REFERENCES episodes(id),
                start_time REAL,
                end_time REAL,
                text TEXT,
                participant_id INTEGER REFERENCES participants(id) -- speaker for diarisation
            )
        """)

        # --- junction table for a participant in an episode
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS episodes_participants (
                episode_id INTEGER REFERENCES episodes(id),
                participant_id INTEGER REFERENCES participants(id),
                role TEXT,
                PRIMARY KEY (episode_id, participant_id)
            )
        """)

        db.commit()

# init the db
init_db()

##### check the structure of the tables

In [30]:
def inspect_table_structure(table_name):
    """Inspect and print table structure"""
    with get_connection() as db:
        cursor = db.cursor()
        cursor.execute(f'PRAGMA table_info({table_name})')
        print(f'\n{table_name}')
        for col in cursor.fetchall():
            print(col)

# Usage
tables = ['episodes', 'participants', 'transcription_segments', 'episodes_participants']
for table in tables:
    inspect_table_structure(table)


episodes
(0, 'id', 'INTEGER', 0, None, 1)
(1, 'title', 'TEXT', 0, None, 0)
(2, 'date', 'TEXT', 0, None, 0)
(3, 'url_path', 'TEXT', 0, None, 0)
(4, 'description', 'TEXT', 0, None, 0)
(5, 'season_number', 'INTEGER', 0, None, 0)
(6, 'episode_number', 'INTEGER', 0, None, 0)
(7, 'index_number', 'INTEGER', 0, None, 0)

participants
(0, 'id', 'INTEGER', 0, None, 1)
(1, 'name', 'TEXT', 1, None, 0)

transcription_segments
(0, 'id', 'INTEGER', 0, None, 1)
(1, 'episode_id', 'INTEGER', 0, None, 0)
(2, 'start_time', 'REAL', 0, None, 0)
(3, 'end_time', 'REAL', 0, None, 0)
(4, 'text', 'TEXT', 0, None, 0)
(5, 'participant_id', 'INTEGER', 0, None, 0)

episodes_participants
(0, 'episode_id', 'INTEGER', 0, None, 1)
(1, 'participant_id', 'INTEGER', 0, None, 2)
(2, 'role', 'TEXT', 0, None, 0)


### model

In [5]:
model = Model(model='large-v3', models_dir='./whisper.cpp/models')

whisper_init_from_file_with_params_no_state: loading model from '/Users/quentin/dev/podcast_audio_extractor/whisper.cpp/models/ggml-large-v3.bin'
whisper_init_with_params_no_state: use gpu    = 1
whisper_init_with_params_no_state: flash attn = 0
whisper_init_with_params_no_state: gpu_device = 0
whisper_init_with_params_no_state: dtw        = 0
whisper_init_with_params_no_state: devices    = 3
whisper_init_with_params_no_state: backends   = 3
whisper_model_load: loading model
whisper_model_load: n_vocab       = 51866
whisper_model_load: n_audio_ctx   = 1500
whisper_model_load: n_audio_state = 1280
whisper_model_load: n_audio_head  = 20
whisper_model_load: n_audio_layer = 32
whisper_model_load: n_text_ctx    = 448
whisper_model_load: n_text_state  = 1280
whisper_model_load: n_text_head   = 20
whisper_model_load: n_text_layer  = 32
whisper_model_load: n_mels        = 128
whisper_model_load: ftype         = 1
whisper_model_load: qntvr         = 0
whisper_model_load: type          = 5 (larg

### code

##### gathering files

In [11]:
def fetch_episode_data(feed_url, items):
    ydl_config = {
        'extract_flat': False,
        'playlist_items': items,
        'quiet': True,
        'skip_download': True
    }
    with YoutubeDL(ydl_config) as ydl:
        info = ydl.extract_info(feed_url)
        episodes = info.get('entries', [info]) # make up a list of items from the fetched entries
        return info.get('entries')
        return [
            {
                "title": episode.get("title"),
                "description": episode.get("description"),
                "url": episode.get("webpage_url") or episode.get("original_url"),
                "upload_date": episode.get("upload_date"),
                "playlist_index": episode.get("playlist_index"),
                "season_number": episode.get("season_number"),
                "episode_number": episode.get("episode_number"),
            }
            for episode in episodes
        ]

# usage
fetch_episode_data('https://feeds.acast.com/public/shows/floodcast', "240")



[{'id': 'tag:soundcloud,2010:tracks/285183974',
  'title': 'S02E02 - Postiche de Fouffe',
  'timestamp': 1475106896,
  'direct': True,
  'formats': [{'format_id': 'mpeg',
    'url': 'https://stitcher2.acast.com/livestitches/6f02133700771e9698f070e0909016e3.mp3?ci=-vNnk5Kl9KDBbna5_BdvrwoxF9tpx3lRh-CYzL33gODV3gkd-ZpCZQ%3D%3D&pf=rss&sv=sphinx%401.258.0&uid=54a3fce4a605415f1bfd68bdb60be7b5&Expires=1758055111741&Key-Pair-Id=K38CTQXUSD0VVB&Signature=Ov7NsMjrMeoqIz1JB0xXTrDq3lPKmyDivwF4p1irQ8-n9o8vQBvfiyjI2GZoZTJuM4MMdjPEgJSYKu~m5KgElPszKJNc4lvUI9wF58fs~VaF77muqYAHhs~mm9xTp9l4~3OuK4WywSjf8QxmNThpnzuLcqyEdRhxNhszv66v9a5Ju15oCQIV8BRuw4i42ZFuS1W-EYMH-u3Hs8bizst40o~bgYUS8zKT0urUC560YQE5Pdo79nOvhS14pJf--Dk8KcJZoSuTcv9ipOH8FtlBmcIXlY5wysrsvQz6SPe55Q8Qb5ivIRXO~fEOvYHyQkCUHuclb~JB1pWgRBzio3wgxg__',
    'ext': 'mp3',
    'vcodec': 'none',
    'protocol': 'https',
    'audio_ext': 'mp3',
    'video_ext': 'none',
    'vbr': 0,
    'abr': None,
    'tbr': None,
    'acodec': 'mp3',
    'resolution': 'aud

##### transcription

for testing whispercpp parameters

In [15]:
def transcribe(file:str):
    transcription = model.transcribe(
        file, 
        language='fr',
        temperature=0.0,
        print_progress=True,
        extract_probability=False
    )
    print(transcription)
    return transcription

# help(Model.transcribe)
?Model.transcribe

[31mSignature:[39m
Model.transcribe(
    self,
    media: Union[str, numpy.ndarray],
    n_processors: int = [38;5;28;01mNone[39;00m,
    new_segment_callback: Callable[[pywhispercpp.model.Segment], NoneType] = [38;5;28;01mNone[39;00m,
    **params,
) -> List[pywhispercpp.model.Segment]
[31mDocstring:[39m
Transcribes the media provided as input and returns list of `Segment` objects.
Accepts a media_file path (audio/video) or a raw numpy array.

:param media: Media file path or a numpy array
:param n_processors: if not None, it will run the transcription on multiple processes
                     binding to whisper.cpp/whisper_full_parallel
                     > Split the input audio in chunks and process each chunk separately using whisper_full()
:param new_segment_callback: callback function that will be called when a new segment is generated
:param params: keyword arguments for different whisper.cpp parameters, see ::: constants.PARAMS_SCHEMA
:param extract_probability: If T

##### storing data into db

In [None]:
# print la transcription dans un fichier
def transcribe_into_file(file:str):

    # directory to store the transcriptions into 
    output_directory = 'transcriptions_output'
    os.makedirs(f'./{output_directory}', exist_ok=True)

    # name of the transcription output file
    # (using the name of the file given as params)
    base_name = os.path.splitext(file)[0]
    output_file_name = f"{base_name}.txt"

    # writing transcriptions in file
    with open(f'./{output_directory}/{output_file_name}','w',encoding='utf-8') as output_file:
        for segment in transcribe(file):
            output_file.write(f'{segment.text}')
            # print(segment)

In [None]:
def transcribe_into_db(input_files_dir):
    
    # for each files in the input audio files folder, it get the transcription, 
    # and then stores it in the db
    for audio_file in input_files_dir.glob('*.mp3'):

        print(f"file being processed : {audio_file.name}")
        
        # create episode and get its id
        episode_id = create_episode(audio_file)

        # store transcription segment of the episode into db
        with get_connection() as db:
            cursor = db.cursor()

            for transcription_segment in transcribe(audio_file.name):
                start = transcription_segment.t0
                end = transcription_segment.t1
                text = transcription_segment.text.strip()

                cursor.execute("""
                    INSERT INTO transcription_segments (episode_id, start_time, end_time, text)
                    VALUES (?,?,?,?)
                """, (episode_num, start, end, text))

            db.commit()

# insert episode into db
def create_episode(episode_num):

    description, url_path, title = fetch_episode_informations(episode_num)

    episode_season = 
    episode_number = 
    index_number = 

    with get_connection() as db:
        cursor = db.cursor()
        cursor.execute("""
            INSERT INTO episodes (title, url_path, description, )
            VALUES (?,?,?)
        """, (title, url_path, description))
        
        db.commit()

    return episode_id

# gather infos from API or RSS feed
def fetch_episode_informations(episode_num) -> tuple:
    return description, url_path, title

##### future functions 

In [None]:
def clean_output():
    print('format')

def vector_store():
    print('vector')

##### usage

In [None]:
transcribe_into_db(input_files_dir)

file being processed : s10e43_tiny_benchmark.mp3


Progress:   0%
Progress:  36%
Progress:  78%


[t0=0, t1=140, text=Message publicitaire., probability=nan, t0=140, t1=1022, text=Le médicament Daflon 500 mg, indiqué pour soulager les jambes lourdes et douloureuses dues à l'insuffisance veineuse, agit en renforçant le tonus veineux et en protégeant les vaisseaux sanguins., probability=nan, t0=1022, t1=1874, text=Son efficacité a été cliniquement démontrée dans le traitement des troubles de la circulation veineuse, jambes lourdes, douleurs, impatience, en complément des mesures hygiéno-diététiques., probability=nan, t0=1874, t1=2034, text=Rendez-vous sur Daflon.fr., probability=nan, t0=2034, t1=2624, text=Daflon 500 mg, composé de fractions flavonoïques purifiées micronisées et réservées à l'adulte, est disponible en pharmacie sans ordonnance., probability=nan, t0=2624, t1=3006, text=Tout médicament peut exposer à des risques. Parlez-en à votre pharmacien et lisez attentivement la notice., probability=nan, t0=3006, t1=3230, text=Si les symptômes persistent, consultez votre médecin.,

Progress: 100%


NameError: name 'episode_num' is not defined

##### unload model from vram

In [None]:
# unload model from vram (mac only)
del model

NameError: name 'model' is not defined

### embeddings