In [13]:
from together import Together
import json
import pandas as pd
import numpy as np
import re
import os
import json
import requests
import torch
import time
import tqdm
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import cv2
import matplotlib.pyplot as plt
from IPython.display import display
from yandex_cloud_ml_sdk import YCloudML
from huggingface_hub import hf_hub_download
from doclayout_yolo import YOLOv10
import easyocr
import base64
from typing import List, Dict, Tuple, Union
from pathlib import Path
import torchvision.ops
from multiprocessing import cpu_count
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import io

In [2]:
def merge_word_into_layout(word_bboxes: np.ndarray, yolo_seg_result: List[Dict]) -> List[Dict]:
    """
    Объединяет слова с результатами сегментации макета YOLO.
    
    Args:
        word_bboxes (np.ndarray): массив формы (N, 4) с [x1, y1, x2, y2] для слов
        yolo_seg_result (List[Dict]): список словарей с результатами сегментации YOLO
        
    Returns:
        List[Dict]: обновленный список результатов сегментации с добавленными текстовыми блоками
    """
    if not isinstance(word_bboxes, np.ndarray):
        raise TypeError(f"word_bboxes должен быть numpy array, получен {type(word_bboxes)}")
    
    if not isinstance(yolo_seg_result, list):
        raise TypeError(f"yolo_seg_result должен быть списком, получен {type(yolo_seg_result)}")
    
    if word_bboxes.size == 0:
        return yolo_seg_result
    
    if word_bboxes.ndim != 2 or word_bboxes.shape[1] != 4:
        raise ValueError(f"word_bboxes должен иметь форму (N, 4), получен {word_bboxes.shape}")
    
    new_boxes = []
    
    for item in yolo_seg_result:
        if isinstance(item, dict) and 'bounds' in item:
            new_boxes.append(item['bounds'])
    
    for x1, y1, x2, y2 in word_bboxes:
        cx = (x1 + x2) / 2.0
        cy = (y1 + y2) / 2.0
        
        # 1) проверяем, внутри ли центр слова какой-нибудь блок
        is_inside = any(
            lx1 <= cx <= lx2 and ly1 <= cy <= ly2
            for lx1, ly1, lx2, ly2 in new_boxes
        )
        if is_inside:
            continue
        
        # 2) ищем ближайшее соприкосновение сверху (блоки выше центра слова)
        tops = [
            ly2 for lx1, ly1, lx2, ly2 in new_boxes
            if lx2 >= x1 and lx1 <= x2  # перекрытие по X
            and ly2 <= cy              # блок выше центра слова
        ]
        new_top = max(tops) if tops else y1
        
        # 3) снизу (блоки ниже центра слова)
        bottoms = [
            ly1 for lx1, ly1, lx2, ly2 in new_boxes
            if lx2 >= x1 and lx1 <= x2  # перекрытие по X
            and ly1 >= cy              # блок ниже центра слова
        ]
        new_bottom = min(bottoms) if bottoms else y2
        
        # 4) слева (блоки левее центра слова)
        lefts = [
            lx2 for lx1, ly1, lx2, ly2 in new_boxes
            if ly2 >= new_top and ly1 <= new_bottom  # перекрытие по Y
            and lx2 <= cx              # блок левее центра слова
        ]
        new_left = max(lefts) if lefts else x1
        
        # 5) справа (блоки правее центра слова)
        rights = [
            lx1 for lx1, ly1, lx2, ly2 in new_boxes
            if ly2 >= new_top and ly1 <= new_bottom  # перекрытие по Y
            and lx1 >= cx              # блок правее центра слова
        ]
        new_right = min(rights) if rights else x2
        
        # Добавляем расширенный bbox
        new_boxes.append([new_left, new_top, new_right, new_bottom])
        
        block_data = {
            "bounds": [new_left, new_top, new_right, new_bottom],
            "label": "Text",
            "confidence": 1.0,
            "id": 9
        }
        
        yolo_seg_result.append(block_data)
    
    return yolo_seg_result


def download_image(image_url: str, timeout: int = 60) -> Image.Image:
    """
    Загружает изображение по URL и возвращает объект PIL Image.
    
    Args:
        image_url (str): URL изображения для загрузки
        timeout (int): таймаут для запроса в секундах
        
    Returns:
        Image.Image: объект PIL Image
    """
    if not image_url:
        raise ValueError("URL изображения не может быть пустым")
    
    if not isinstance(image_url, str):
        raise TypeError(f"URL должен быть строкой, получен {type(image_url)}")
    
    response = requests.get(image_url, timeout=timeout)
    response.raise_for_status()
    
    img_bytes_io = io.BytesIO(response.content)
    img = Image.open(img_bytes_io)
    
    img.verify()
    img_bytes_io.seek(0)
    img = Image.open(img_bytes_io)
    
    return img


def detect_word_bboxes(pil_image: Image.Image, lang: str = 'ru', use_gpu: bool = True) -> np.ndarray:
    """
    Детектирует границы слов на изображении с помощью EasyOCR.
    
    Args:
        pil_image (Image.Image): входное изображение PIL
        lang (str): язык для распознавания
        use_gpu (bool): использовать ли GPU
        
    Returns:
        np.ndarray: массив нормализованных координат границ слов
    """
    if not isinstance(pil_image, Image.Image):
        raise TypeError(f"pil_image должно быть PIL Image, получен {type(pil_image)}")
    
    image_cv = cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2BGR)
    height, width, _ = image_cv.shape
    
    if use_gpu and not torch.cuda.is_available():
        use_gpu = False
    
    reader = easyocr.Reader([lang], gpu=use_gpu)
    horizontal_list, free_list = reader.detect(image_cv)
    
    if not horizontal_list or not horizontal_list[0]:
        return np.array([])
    
    bboxes = np.array([
        [x_min/width, y_min/height, x_max/width, y_max/height]
        for x_min, x_max, y_min, y_max in horizontal_list[0]
    ])
    
    return bboxes


def process_single_image(img: Image.Image, model: YOLOv10, imgsz: int = 1120, conf: float = 0.3, iou: float = 0.2, merge_with_ocr: bool = False) -> List[Dict]:
    """
    Обрабатывает одиночное изображение.
    
    Args:
        img (Image.Image): изображение PIL
        model (YOLOv10): модель YOLO
        imgsz (int): размер изображения
        conf (float): порог уверенности
        iou (float): порог IoU для NMS
        merge_with_ocr (bool): объединять ли с результатами OCR
        
    Returns:
        List[Dict]: список результатов детекции
    """
    if not isinstance(img, Image.Image):
        raise TypeError(f"img должно быть PIL Image, получен {type(img)}")
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    det_res = model.predict(
        img,
        imgsz=imgsz,
        conf=conf,
        device=device
    )
    
    if not det_res or len(det_res) == 0:
        return []
    
    result = det_res[0]
    
    # Проверяем, есть ли боксы
    if len(result.boxes) == 0:
        return []
    
    boxes_xyxyn = result.boxes.xyxyn
    scores = result.boxes.conf
    
    # Применяем NMS
    keep = torchvision.ops.nms(boxes_xyxyn, scores, iou)
    
    boxes = result.boxes[keep]
    names = result.names
    
    output_data = []
    
    for i in range(len(boxes)):
        x1n, y1n, x2n, y2n = map(float, boxes.xyxyn[i])
        confidence = float(boxes.conf[i])
        class_id = int(boxes.cls[i])
        label = names[class_id]

        block_data = {
            "bounds": [x1n, y1n, x2n, y2n],
            "label": label,
            "confidence": confidence,
            "id": class_id
        }

        output_data.append(block_data)
    
    if merge_with_ocr:
        try:
            word_bboxes = detect_word_bboxes(img)
            final_res = merge_word_into_layout(word_bboxes, output_data)
            return final_res
        except Exception as e:
            # В случае ошибки OCR возвращаем только результаты YOLO
            return output_data
    
    return output_data


def process_image_batch(
    image_sources: List[Union[str, Path, Image.Image]],
    model: YOLOv10,
    imgsz: int = 1120,
    conf: float = 0.3,
    iou: float = 0.2,
    merge_with_ocr: bool = False,
    max_workers: int = None,
    use_processes: bool = False
) -> Dict[str, List[Dict]]:
    """
    Параллельно обрабатывает несколько изображений.
    
    Args:
        image_sources: список источников изображений (URL, путей к файлам или PIL Images)
        model: модель YOLO
        imgsz: размер изображения для модели
        conf: порог уверенности
        iou: порог IoU для NMS
        merge_with_ocr: объединять ли с результатами OCR
        max_workers: максимальное количество потоков/процессов
        use_processes: использовать процессы вместо потоков
        
    Returns:
        Dict[str, List[Dict]]: словарь с результатами для каждого изображения
    """
    if max_workers is None:
        max_workers = min(32, cpu_count() * 4) if not use_processes else cpu_count()
    
    results = {}
    errors = {}
    
    # Функция для загрузки и обработки одного изображения
    def process_item(index: int, source: Union[str, Path, Image.Image]) -> Tuple[int, str, List[Dict]]:
        try:
            if isinstance(source, str):
                if source.startswith(('http://', 'https://')):
                    img = download_image(source)
                    source_id = source
                else:
                    img = Image.open(source)
                    source_id = str(source)
            elif isinstance(source, Path):
                img = Image.open(source)
                source_id = str(source)
            elif isinstance(source, Image.Image):
                img = source
                source_id = f"image_{index}"
            else:
                raise TypeError(f"Неподдерживаемый тип источника: {type(source)}")
            
            result = process_single_image(img, model, imgsz, conf, iou, merge_with_ocr)
            return index, source_id, result
        except Exception as e:
            return index, str(source) if hasattr(source, '__str__') else f"image_{index}", str(e)
    
    # Выбор executor'а
    Executor = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    
    with Executor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(process_item, i, source): i 
            for i, source in enumerate(image_sources)
        }
        
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                idx, source_id, result = future.result()
                if isinstance(result, str):  # Это ошибка
                    errors[source_id] = result
                else:
                    results[source_id] = result
            except Exception as e:
                errors[f"item_{index}"] = str(e)
    
    if errors:
        results['_errors'] = errors
    
    return results


def process_images_from_directory(
    directory: Union[str, Path],
    model: YOLOv10,
    imgsz: int = 1120,
    conf: float = 0.3,
    iou: float = 0.2,
    merge_with_ocr: bool = False,
    extensions: List[str] = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff'],
    max_workers: int = None,
    use_processes: bool = False
) -> Dict[str, List[Dict]]:
    """
    Обрабатывает все изображения в директории.
    
    Args:
        directory: путь к директории
        model: модель YOLO
        imgsz: размер изображения
        conf: порог уверенности
        iou: порог IoU для NMS
        merge_with_ocr: объединять ли с результатами OCR
        extensions: поддерживаемые расширения файлов
        max_workers: максимальное количество потоков/процессов
        use_processes: использовать процессы вместо потоков
        
    Returns:
        Dict[str, List[Dict]]: результаты для каждого изображения
    """
    directory = Path(directory)
    if not directory.exists():
        raise FileNotFoundError(f"Директория не найдена: {directory}")
    
    image_files = []
    for ext in extensions:
        image_files.extend(directory.glob(f"*{ext}"))
        image_files.extend(directory.glob(f"*{ext.upper()}"))
    
    return process_image_batch(
        image_files,
        model,
        imgsz,
        conf,
        iou,
        merge_with_ocr,
        max_workers,
        use_processes
    )

In [3]:
df = pd.read_json("/home/jupyter/datasphere/datasets/newspapers/newspapers1934-39.json")
df["newspaper_name"] = df["newspaper_title"].apply(lambda x: x.split('/')[0])
url_data_dict = df.groupby('newspaper_url').agg({
    'origin_img': list,
    'newspaper_name': 'first',
    'newspaper_date': 'first'
}).rename(columns={'origin_img': 'papers_url', 'newspaper_name': 'name', 'newspaper_date': 'date'}).to_dict('index')

In [14]:
model = YOLOv10('/home/jupyter/datasphere/project/best_the_best.pt')

# Пример 1: Параллельная обработка нескольких URL
image_urls = url_data_dict['https://electro.nekrasovka.ru/books/6147190']['papers_url']

results_urls = process_image_batch(
    image_urls,
    model,
    iou = 0.1,
    merge_with_ocr = True,
    max_workers=None,
    use_processes=False  # Используем потоки для I/O операций
)


0: 1120x832 1 Caption, 1 Picture, 22 Section-headers, 29 Texts, 149.3ms
Speed: 43.4ms preprocess, 149.3ms inference, 495.6ms postprocess per image at shape (1, 3, 1120, 832)

0: 1120x832 1 Caption, 1 Picture, 13 Section-headers, 2 Tables, 37 Texts, 141.7ms
Speed: 7.7ms preprocess, 141.7ms inference, 3.5ms postprocess per image at shape (1, 3, 1120, 832)

0: 1120x832 1 Caption, 1 Picture, 25 Section-headers, 37 Texts, 115.5ms
Speed: 7.0ms preprocess, 115.5ms inference, 2.5ms postprocess per image at shape (1, 3, 1120, 832)

0: 1120x832 1 Caption, 1 Picture, 12 Section-headers, 11 Texts, 94.5ms
Speed: 9.7ms preprocess, 94.5ms inference, 1.4ms postprocess per image at shape (1, 3, 1120, 832)

0: 1120x832 2 Captions, 2 Pictures, 24 Section-headers, 3 Tables, 45 Texts, 83.0ms
Speed: 7.8ms preprocess, 83.0ms inference, 2.3ms postprocess per image at shape (1, 3, 1120, 832)

0: 1120x832 1 Footnote, 2 Pictures, 12 Section-headers, 37 Texts, 149.8ms
Speed: 7.2ms preprocess, 149.8ms inference, 

In [15]:
folder_id = os.environ['folder_id']
api_key = os.environ['api_key']

client = Together(api_key='f1c23b53adacc2b699045900b66cbb88a1f58d882e6c54556a08606b249062e7')
sdk = YCloudML(folder_id=folder_id, auth=api_key)
model_gpt = sdk.models.completions("yandexgpt", model_version="rc")


prompt = """
Ты - газетный редактор, которому на вход подается распознанный с помощью OCR текст газетной полосы.
Твоя задача:
* вернуть исходный текст с минимальными изменениями
* исправить в этом тексте опечатки
"""

prompt_summ = """
Ты - газетный редактор, которому на вход подается распознанный с помощью OCR текст газетной полосы.
Твоя задача: прочитать весь текст и суммаризировать его. Если поданный текст - стих(только проверяй аккуратно), напиши просто "poem" и больше ничего 
"""

prompt_reconstructing = "Сгруппируй блоки текста из газеты по статьям. Твоя задача - не вносить своих дополнений к тексту. Куски текста разделены таким символом '===='. Обязательно используй все блоки."

def call_api(url, data):
    headers = { "Authorization" : f"Api-Key {api_key}" }
    return requests.post(url, json=data, headers=headers).json()

def call_api_get(url, data):
    headers = { "Authorization" : f"Api-Key {api_key}" }
    return requests.get(url, headers=headers).json()

def ocr(img):
    buffer = BytesIO()
    img.save(buffer,format="JPEG")
    myimage = buffer.getvalue()

    j = {
      "mimeType": "JPEG",
      "languageCodes": ["*"],
      "model": "page",
      "content": base64.b64encode(myimage).decode('utf-8')
    }
    res = call_api("https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText",j)
    return res

def clean_llama(ocr_result, prompt=prompt):
    completion = client.chat.completions.create(
      model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
      messages=[{
                "role" : "system",
                "content" : prompt
            },
            {
                "role" : "user",
                "content" : ocr_result
            }
      ])
    return completion.choices[0].message.content

def load_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)
    
def denorm_boxes(data, w=5075, h=7042):
    bounds = np.array(data)
    if (bounds <= 1).all():
        x1, y1, x2, y2 = bounds
        x1, x2 = int(x1 * w), int(x2 * w)
        y1, y2 = int(y1 * h), int(y2 * h)
    else:
        x1, y1, x2, y2 = map(int, bounds)
    return [x1, y1, x2, y2]

In [16]:
def texts_to_articles(texts, prompt_reconstructing=prompt_reconstructing):

    try:
        res = model_gpt.run(
                        messages=[
                                {
                                    # "max_tokens" : 6000
                                    "max_tokens" : 10000,
                                    "role" : "system",
                                    "text" : prompt_reconstructing
                                },
                                {
                                    "role" : "user",
                                    "max_tokens" : 10000,
                                    "text" : '===='.join(texts)}])
        res_text = res.text
        if res.usage['completion_tokens'] < (res.usage['total_tokens'] // 3):
            raise RuntimeError('needs more tokens')
    except:
        res1 = model_gpt.run(#max_tokens=6048, 
                        messages=[
                                {
                                    # "max_tokens" : 6000
                                    "max_tokens" : 10000,
                                    "role" : "system",
                                    "text" : prompt_reconstructing
                                },
                                {
                                    "role" : "user",
                                    "max_tokens" : 10000,
                                    "text" : '===='.join(texts[:len(texts)//2])}])
        res2 = model_gpt.run(#max_tokens=6048, 
                        messages=[
                                {
                                    # "max_tokens" : 6000
                                    "max_tokens" : 10000,
                                    "role" : "system",
                                    "text" : prompt_reconstructing
                                },
                                {
                                    "role" : "user",
                                    "max_tokens" : 10000,
                                    "text" : '===='.join(texts[len(texts)//2:])}])
        res_text = res1.text + res2.text
    return res_text

In [17]:
def clear_ocr_results(bounds_with_raw_texts):
    for block in bounds_with_raw_texts:
        text = block['text']
        if text:
            clear_text = model_gpt.run([
                {"role": "system", "text": prompt},
                {"role": "user", "text": text}
            ]).text
            if "Посмотрите, что нашлось в поиске" in clear_text:
                clear_text = clean_llama(text)
            block['clear_text'] = clear_text
    return bounds_with_raw_texts


def preprocess_json(data, img_url):
    bounds_with_raw_texts=[]    
    img_blocks = data[img_url]
    image = download_image(img_url)
    w, h = image.size[0], image.size[1]
    full_text = []
    for block in img_blocks:

        x1, y1, x2, y2 = denorm_boxes(block['bounds'], w=w, h=h)
        label = block['label']

        if label in ['Picture', 'Caption', 'Table']:
            continue

        roi = image.crop((x1, y1, x2, y2))

        ocr_res = ocr(roi)
        while ocr_res.get('error') is not None: 
            time.sleep(1.0)
            ocr_res = ocr(roi)

        full_text = (
        ocr_res
        .get("result", {})
        .get("text_annotation", {})
        or ocr_res
        .get("result", {})
        .get("textAnnotation", {})).get("fullText", "")

        bounds_with_raw_texts.append({
            "bounds": block['bounds'],
            "text": full_text,
            "label": label
        })
    
    bounds_with_raw_texts = [block for block in bounds_with_raw_texts if block.get('text')]
    
    bounds_with_texts = clear_ocr_results(bounds_with_raw_texts)
    
    return bounds_with_texts

In [18]:
def get_articles_from_json(json_data):
    """
    Processes a JSON file (either by path or as a loaded dictionary)
    to extract articles and related information.
    """
    if isinstance(json_data, str):
        print(f"Loading JSON from path: {json_data}")
        try:
            with open(json_data, 'r', encoding='utf-8') as f:
                loaded_json = json.load(f)
        except FileNotFoundError:
            print(f"Error: File not found at {json_data}")
            return [], [], []
        except json.JSONDecodeError:
            print(f"Error: Could not decode JSON from {json_data}")
            return [], [], []
        json_file = loaded_json
    elif isinstance(json_data, dict):
        print("Processing provided JSON dictionary.")
        json_file = json_data
    else:
        print("Error: Input must be a file path (string) or a JSON dictionary.")
        return [], [], []

    all_articles = []
    all_bounds_with_texts = []
    for page_num, img_url in enumerate(json_file.keys()):
        print(f'processing page number {page_num + 1} out of {len(json_file.keys())}')
        bounds_with_texts = preprocess_json(data=json_file, img_url=img_url)
        all_bounds_with_texts.append(bounds_with_texts)
        texts_i = [block['clear_text'] for block in bounds_with_texts]
        articles_i = texts_to_articles(texts_i)
        all_articles.append(articles_i)
    return all_articles, all_bounds_with_texts


In [None]:
results_urls = {key: results_urls[key] for key in sorted(results_urls.keys())}
all_articles, all_texts = get_articles_from_json(results_urls)

{'http://electrohelp.ebook.nekrasovka.ru/books/newspaper/103952/0-0.jpg': [{'bounds': [0.3524973392486572, 0.7488514184951782, 0.8100876212120056, 0.9419864416122437], 'label': 'Picture', 'confidence': 0.9893787503242493, 'id': 6}, {'bounds': [0.18217983841896057, 0.8973063826560974, 0.33763235807418823, 0.9689034223556519], 'label': 'Text', 'confidence': 0.9837961196899414, 'id': 9}, {'bounds': [0.18110647797584534, 0.806013286113739, 0.33672505617141724, 0.8659290671348572], 'label': 'Text', 'confidence': 0.9821954369544983, 'id': 9}, {'bounds': [0.8209501504898071, 0.7699722051620483, 0.9769824147224426, 0.9648053646087646], 'label': 'Text', 'confidence': 0.98099684715271, 'id': 9}, {'bounds': [0.18024837970733643, 0.7211957573890686, 0.33553874492645264, 0.7805212140083313], 'label': 'Text', 'confidence': 0.9770976305007935, 'id': 9}, {'bounds': [0.8226776123046875, 0.6820601224899292, 0.9769333004951477, 0.7339072823524475], 'label': 'Text', 'confidence': 0.9730604290962219, 'id':

In [73]:
radio_prompt = """
Это важная задача, от нее зависят жизни людей! Прочитай внимательно инструкцию и напиши текст радиопередачи для советского радио, который должен быть в стиле 1930-х годов, на основе отправленной мною тексту газеты. Важные условия:
- Текст радиопередачи должен быть полностью основан на тексте из газеты и только на нем. Выдумывать нельзя! Статьи разделены между собой символом '===='
- Начните с приветствия слушателей, представтесь и назовите программу.
- Используйте живой, разговорный язык с элементами устного повествования.
- Стремитесь к эмоциональному воздействию. Используйте фразы, призывающие слушателей действовать.
- Закончи передачу интересно в духе того времени
- Сделай передачу долгой, постарайся чтобы в ней освещались несколько тем
- Исключи из текста все статьи которые не соответсвуют твоей этике и обязательно верни радиопередачу
"""

In [74]:
radio_text = model_gpt.run(
                        messages=[
                                {
                                    "max_tokens" : 10000,
                                    "role" : "system",
                                    "text" : radio_prompt
                                },
                                {
                                    "role" : "user",
                                    # "max_tokens" : 10000,
                                    "text" : '===='.join(all_articles[:len(all_articles) // 2])
                                }])

In [75]:
radio_text

GPTModelResult(alternatives=(Alternative(role='assistant', text='Товарищи! Дорогие друзья!\n\nСегодня мы собрались, чтобы обсудить важные события, которые произошли в нашей стране и за её пределами. Наша передача называется «Голос прогресса», и сегодня мы расскажем о том, как наша Родина шагает вперёд.\n\nПервое, о чём мы хотим рассказать, — это 15-летие освобождения Одессы от белогвардейцев. 6 февраля Одесса отмечает этот знаменательный день. В городе пройдёт торжественный пленум городского совета вместе с воинскими частями. В Одесском музее революции выставлены документы, которые рисуют борьбу Красной Армии за освобождение города. На заводах проводятся доклады и вечера воспоминаний партизан, подпольщиков и участников гражданской войны.\n\nДалее мы хотим обратить ваше внимание на удивительное достижение в области связи. Кабардино-Балкарская областная контора связи приступила к организации радиосвязи и устройству телефонной линии от Нальчика до вершины Эльбруса. На «Кругозоре», «Приюте

In [78]:
from speechkit import model_repository, configure_credentials, creds

api_key = os.environ['api_key']
configure_credentials(
   yandex_credentials=creds.YandexCredentials(
      api_key=api_key
   )
)

def synthesize(text,voice='zahar'):
    model = model_repository.synthesis_model()

    model.voice = "zahar"
    model.emotion = "strict"
    model.speed = 0.95 * 0.85  

    result = model.synthesize(text, raw_format=False)
    return result

In [79]:
def levitan_postprocess(audio):
    audio = audio._spawn(
        audio.raw_data,
        overrides={"frame_rate": int(audio.frame_rate * 0.85)}
    ).set_frame_rate(44100)

    audio = audio.overlay(audio, gain_during_overlay=-3)

    if len(audio) > 3000:
        for _ in range(2):
            pos = random.randint(1000, len(audio)-1000)
            silence = AudioSegment.silent(duration=50)
            audio = audio[:pos] + silence + audio[pos:]
    
    return audio

In [80]:
def add_echo(audio):
    echo = audio - 18
    return audio.overlay(echo, position=200)

In [81]:
from pydub import AudioSegment, generators
import io
import random
import numpy as np
from IPython.display import Audio

def safe_audio_conversion(audio_data):
    if isinstance(audio_data, AudioSegment):
        return audio_data
    elif isinstance(audio_data, bytes):
        return AudioSegment.from_ogg(io.BytesIO(audio_data))
    else:
        raise ValueError("Unsupported audio input type")

def old_radio_effect(sound):
    try:
        noise = generators.WhiteNoise().to_audio_segment(
            duration=len(sound),
            volume=-40
        )
        
        sound = sound.overlay(noise)
        
        samples = np.array(sound.get_array_of_samples())

        samples = np.clip(samples * 1.6, -32767, 32767)

        samples = samples + 0.1 * samples**2 / 32767

        distorted = sound._spawn(samples.astype(np.int16))
        sound = sound.overlay(distorted - 4)
        
        if len(sound) > 3000:
            for i in range(3):
                pos = random.randint(1000, len(sound)-2000)
                sound = sound[:pos] + sound[pos:pos+400].apply_gain(-5) + sound[pos+400:]
        
        return sound
    except Exception as e:
        print(f"Effect error: {str(e)}")
        return sound

def levitan_voice(text, voice='zahar', max_text_length=5000):
    try:
        if len(text) > max_text_length:
            chunks = [text[i:i+max_text_length] for i in range(0, len(text), max_text_length)]
            audio_segments = []
            
            for chunk in chunks:
                chunk_audio = synthesize(chunk, voice=voice)
                audio_segments.append(safe_audio_conversion(chunk_audio))
            
            sound = sum(audio_segments[1:], audio_segments[0])
        else:
            audio_data = synthesize(text, voice=voice)
            sound = safe_audio_conversion(audio_data)
        
        chunk_size = 30000
        chunks = [sound[i:i+chunk_size] for i in range(0, len(sound), chunk_size)]
        
        processed_chunks = []
        for chunk in chunks:
            chunk = levitan_postprocess(chunk)
            processed = old_radio_effect(chunk)
            processed_chunks.append(processed)
        
        result = sum(processed_chunks[1:], processed_chunks[0])
        result = add_echo(result)
        result = result.normalize(headroom=5)
        output = io.BytesIO()
        result.export(output, format="ogg", bitrate="64k")
        
        return Audio(output.getvalue(), autoplay=False)
    
    except Exception as e:
        print(f"Critical error: {str(e)}")
        return Audio(b'', autoplay=False)

In [82]:
lev_res = levitan_voice(radio_text.text)
lev_res