<a href="https://colab.research.google.com/github/orutkina/-./blob/main/%D0%94%D0%975_dbscan.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Домашнее задание 5.2

Структура проекта:


In [None]:
news_parser/
│
├── parser/
│   ├── __init__.py
│   ├── base_parser.py
│   ├── ria_parser.py
│   ├── lenta_parser.py
│   ├── gazeta_parser.py
│   └── kommersant_parser.py
│
├── database/
│   ├── __init__.py
│   └── db_manager.py
│
├── utils/
│   ├── __init__.py
│   ├── text_cleaner.py
│   ├── rate_limiter.py
│   └── user_agents.py
│
├── config.py
├── main.py
├── requirements.txt
└── README.md

1. Файл requirements.txt:

In [None]:
requests>=2.28.0
beautifulsoup4>=4.11.0
lxml>=4.9.0
sqlite3
uuid
python-dateutil>=2.8.0
aiohttp>=3.8.0
asyncio
pandas>=1.5.0

2. Файл config.py:

In [None]:
import os
from datetime import datetime

# Настройки базы данных
DB_NAME = "news_articles.db"
DB_PATH = os.path.join(os.path.dirname(__file__), DB_NAME)

# Настройки парсинга
MAX_REQUESTS_PER_SECOND = 2
REQUEST_TIMEOUT = 30
MAX_RETRIES = 3

# Список сайтов для парсинга
NEWS_SITES = [
    {
        "name": "ria",
        "base_url": "https://ria.ru",
        "rss_url": "https://ria.ru/export/rss2/archive/index.xml",
        "parser_class": "RiaParser"
    },
    {
        "name": "lenta",
        "base_url": "https://lenta.ru",
        "rss_url": "https://lenta.ru/rss",
        "parser_class": "LentaParser"
    },
    {
        "name": "gazeta",
        "base_url": "https://www.gazeta.ru",
        "rss_url": "https://www.gazeta.ru/export/rss/lenta.xml",
        "parser_class": "GazetaParser"
    },
    {
        "name": "kommersant",
        "base_url": "https://www.kommersant.ru",
        "rss_url": "https://www.kommersant.ru/RSS/news.xml",
        "parser_class": "KommersantParser"
    },
    {
        "name": "tass",
        "base_url": "https://tass.ru",
        "rss_url": "https://tass.ru/rss/v2.xml",
        "parser_class": "TassParser"
    },
    {
        "name": "rbc",
        "base_url": "https://www.rbc.ru",
        "rss_url": "https://rssexport.rbc.ru/rbcnews/news/30/full.rss",
        "parser_class": "RbcParser"
    }
]

# Минимальное количество статей
MIN_ARTICLES_COUNT = 5000

# Настройки очистки текста
MIN_ARTICLE_LENGTH = 100  # минимальная длина статьи в символах

3. Файл database/db_manager.py:

In [None]:
import sqlite3
import uuid
from datetime import datetime
from typing import Optional, Dict, Any
import logging

logger = logging.getLogger(__name__)

class DatabaseManager:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.create_tables()

    def create_tables(self):
        """Создание таблиц в базе данных"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # Таблица статей
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS articles (
                    guid TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    description TEXT NOT NULL,
                    url TEXT UNIQUE NOT NULL,
                    published_at TEXT,
                    comments_count INTEGER DEFAULT 0,
                    created_at_utc TEXT NOT NULL,
                    rating REAL,
                    source TEXT NOT NULL,
                    word_count INTEGER DEFAULT 0,
                    category TEXT
                )
            ''')

            # Индексы для оптимизации запросов
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_url ON articles(url)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_source ON articles(source)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_published ON articles(published_at)')

            conn.commit()

    def insert_article(self, article_data: Dict[str, Any]) -> bool:
        """
        Вставка статьи в базу данных

        Args:
            article_data: Словарь с данными статьи

        Returns:
            bool: Успешность операции
        """
        try:
            # Генерация GUID если не предоставлен
            if 'guid' not in article_data:
                article_data['guid'] = str(uuid.uuid4())

            # Добавление времени создания
            if 'created_at_utc' not in article_data:
                article_data['created_at_utc'] = datetime.utcnow().isoformat()

            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    INSERT OR IGNORE INTO articles
                    (guid, title, description, url, published_at, comments_count,
                     created_at_utc, rating, source, word_count, category)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    article_data['guid'],
                    article_data['title'],
                    article_data['description'],
                    article_data['url'],
                    article_data.get('published_at'),
                    article_data.get('comments_count', 0),
                    article_data['created_at_utc'],
                    article_data.get('rating'),
                    article_data.get('source', 'unknown'),
                    article_data.get('word_count', 0),
                    article_data.get('category')
                ))

                conn.commit()
                return cursor.rowcount > 0

        except sqlite3.Error as e:
            logger.error(f"Ошибка при вставке статьи: {e}")
            return False

    def get_article_count(self) -> int:
        """Получение количества статей в базе"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM articles')
            return cursor.fetchone()[0]

    def get_articles_by_source(self, source: str, limit: int = 100):
        """Получение статей по источнику"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(
                'SELECT * FROM articles WHERE source = ? LIMIT ?',
                (source, limit)
            )
            return [dict(row) for row in cursor.fetchall()]

    def close(self):
        """Закрытие соединения с базой данных"""
        pass  # SQLite автоматически закрывает соединение

4. Файл utils/rate_limiter.py:

In [None]:
import asyncio
import time
from typing import List
import random
import logging

logger = logging.getLogger(__name__)

class RateLimiter:
    def __init__(self, max_requests_per_second: int = 2):
        self.max_requests_per_second = max_requests_per_second
        self.min_delay = 1.0 / max_requests_per_second
        self.last_request_time = 0

    async def wait(self):
        """Ожидание перед следующим запросом"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time

        if time_since_last < self.min_delay:
            wait_time = self.min_delay - time_since_last
            # Добавляем небольшую случайную задержку
            wait_time += random.uniform(0, 0.5)
            await asyncio.sleep(wait_time)

        self.last_request_time = time.time()

    def sync_wait(self):
        """Синхронная версия ожидания"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time

        if time_since_last < self.min_delay:
            wait_time = self.min_delay - time_since_last
            wait_time += random.uniform(0, 0.5)
            time.sleep(wait_time)

        self.last_request_time = time.time()

class RequestRetry:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def execute(self, func, *args, **kwargs):
        """Выполнение функции с повторными попытками"""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise e

                delay = self.base_delay * (2 ** attempt)  # Экспоненциальная задержка
                delay += random.uniform(0, 0.5)  # Случайное добавление
                logger.warning(f"Попытка {attempt + 1} не удалась. Повтор через {delay:.2f} сек. Ошибка: {e}")
                await asyncio.sleep(delay)

    def sync_execute(self, func, *args, **kwargs):
        """Синхронная версия с повторными попытками"""
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise e

                delay = self.base_delay * (2 ** attempt)
                delay += random.uniform(0, 0.5)
                logger.warning(f"Попытка {attempt + 1} не удалась. Повтор через {delay:.2f} сек. Ошибка: {e}")
                time.sleep(delay)

5. Файл utils/user_agents.py:

In [None]:
USER_AGENTS = [
    # Chrome
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',

    # Firefox
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',

    # Safari
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15',

    # Edge
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',

    # Opera
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 OPR/106.0.0.0',

    # Mobile
    'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (Linux; Android 14; SM-S901B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.210 Mobile Safari/537.36',
]

def get_random_user_agent():
    """Получение случайного User-Agent"""
    import random
    return random.choice(USER_AGENTS)

6. Файл utils/text_cleaner.py:

In [None]:
import re
from bs4 import BeautifulSoup
from typing import Optional

class TextCleaner:
    @staticmethod
    def clean_html(html_content: str) -> str:
        """
        Очистка HTML от тегов и медиа-контента

        Args:
            html_content: Исходный HTML

        Returns:
            Очищенный текст
        """
        if not html_content:
            return ""

        soup = BeautifulSoup(html_content, 'lxml')

        # Удаление ненужных элементов
        for element in soup(['script', 'style', 'iframe', 'object', 'embed',
                            'video', 'audio', 'figure', 'img', 'form', 'nav',
                            'header', 'footer', 'aside']):
            element.decompose()

        # Удаление пустых тегов
        for tag in soup.find_all():
            if len(tag.get_text(strip=True)) == 0:
                tag.decompose()

        # Получение текста
        text = soup.get_text(separator='\n', strip=True)

        # Очистка лишних пробелов и переносов строк
        lines = []
        for line in text.split('\n'):
            line = line.strip()
            if line:  # Пропускаем пустые строки
                lines.append(line)

        # Объединение строк
        cleaned_text = '\n'.join(lines)

        # Удаление повторяющихся переносов строк
        cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text)

        return cleaned_text

    @staticmethod
    def count_words(text: str) -> int:
        """Подсчет слов в тексте"""
        if not text:
            return 0
        words = re.findall(r'\b\w+\b', text)
        return len(words)

    @staticmethod
    def is_valid_article(text: str, min_length: int = 100) -> bool:
        """
        Проверка валидности статьи

        Args:
            text: Текст статьи
            min_length: Минимальная длина в символах

        Returns:
            bool: Валидна ли статья
        """
        if not text:
            return False

        # Проверка длины
        if len(text.strip()) < min_length:
            return False

        # Проверка на наличие текста (а не только цифр или спецсимволов)
        words = re.findall(r'\b[а-яА-ЯёЁa-zA-Z]+\b', text)
        if len(words) < 20:  # Минимум 20 слов
            return False

        return True

7. Файл parser/base_parser.py:

In [None]:
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET

from utils.rate_limiter import RateLimiter, RequestRetry
from utils.user_agents import get_random_user_agent
from utils.text_cleaner import TextCleaner

logger = logging.getLogger(__name__)

class BaseNewsParser:
    def __init__(self, source_name: str, base_url: str):
        self.source_name = source_name
        self.base_url = base_url
        self.rate_limiter = RateLimiter(max_requests_per_second=2)
        self.retry_handler = RequestRetry(max_retries=3)
        self.text_cleaner = TextCleaner()

        # Сессия для асинхронных запросов
        self.session = None

    async def create_session(self):
        """Создание асинхронной сессии"""
        if self.session is None:
            self.session = aiohttp.ClientSession(
                headers={'User-Agent': get_random_user_agent()},
                timeout=aiohttp.ClientTimeout(total=30)
            )

    async def close_session(self):
        """Закрытие сессии"""
        if self.session:
            await self.session.close()

    async def fetch_url(self, url: str) -> Optional[str]:
        """
        Загрузка URL с учетом ограничений

        Args:
            url: URL для загрузки

        Returns:
            str: HTML контент или None при ошибке
        """
        await self.rate_limiter.wait()

        try:
            await self.create_session()

            async def _fetch():
                async with self.session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        raise Exception(f"HTTP {response.status}: {response.reason}")

            return await self.retry_handler.execute(_fetch)

        except Exception as e:
            logger.error(f"Ошибка при загрузке {url}: {e}")
            return None

    def parse_rss(self, rss_content: str) -> List[str]:
        """
        Парсинг RSS для получения ссылок на статьи

        Args:
            rss_content: XML контент RSS

        Returns:
            List[str]: Список URL статей
        """
        urls = []
        try:
            root = ET.fromstring(rss_content)

            # Поиск ссылок в разных форматах RSS
            for item in root.findall('.//item'):
                link = item.find('link')
                if link is not None and link.text:
                    urls.append(link.text)

            # Альтернативный поиск
            if not urls:
                for link in root.findall('.//{http://www.w3.org/2005/Atom}link'):
                    href = link.get('href')
                    if href:
                        urls.append(href)

        except Exception as e:
            logger.error(f"Ошибка парсинга RSS: {e}")

        return urls

    async def parse_article_page(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Парсинг страницы статьи

        Args:
            url: URL статьи

        Returns:
            Dict: Данные статьи или None
        """
        html = await self.fetch_url(url)
        if not html:
            return None

        soup = BeautifulSoup(html, 'lxml')

        try:
            # Извлечение заголовка
            title = self._extract_title(soup)

            # Извлечение текста статьи
            content = self._extract_content(soup)

            # Очистка текста
            cleaned_content = self.text_cleaner.clean_html(content)

            # Проверка валидности
            if not self.text_cleaner.is_valid_article(cleaned_content):
                logger.debug(f"Статья невалидна, пропускаем: {url}")
                return None

            # Извлечение даты публикации
            published_at = self._extract_published_date(soup)

            # Извлечение комментариев и рейтинга
            comments_count = self._extract_comments_count(soup)
            rating = self._extract_rating(soup)

            # Подсчет слов
            word_count = self.text_cleaner.count_words(cleaned_content)

            # Извлечение категории
            category = self._extract_category(soup)

            return {
                'title': title,
                'description': cleaned_content,
                'url': url,
                'published_at': published_at,
                'comments_count': comments_count,
                'rating': rating,
                'source': self.source_name,
                'word_count': word_count,
                'category': category
            }

        except Exception as e:
            logger.error(f"Ошибка при парсинге статьи {url}: {e}")
            return None

    # Методы для переопределения в конкретных парсерах
    def _extract_title(self, soup: BeautifulSoup) -> str:
        raise NotImplementedError

    def _extract_content(self, soup: BeautifulSoup) -> str:
        raise NotImplementedError

    def _extract_published_date(self, soup: BeautifulSoup) -> Optional[str]:
        raise NotImplementedError

    def _extract_comments_count(self, soup: BeautifulSoup) -> int:
        return 0

    def _extract_rating(self, soup: BeautifulSoup) -> Optional[float]:
        return None

    def _extract_category(self, soup: BeautifulSoup) -> Optional[str]:
        return None

    async def parse_site(self, rss_url: str, max_articles: int = 1000) -> List[Dict[str, Any]]:
        """
        Основной метод парсинга сайта

        Args:
            rss_url: URL RSS ленты
            max_articles: Максимальное количество статей

        Returns:
            List[Dict]: Список статей
        """
        articles = []

        try:
            # Загрузка RSS
            logger.info(f"Загрузка RSS: {rss_url}")
            rss_content = await self.fetch_url(rss_url)

            if not rss_content:
                logger.error(f"Не удалось загрузить RSS: {rss_url}")
                return articles

            # Получение ссылок на статьи
            article_urls = self.parse_rss(rss_content)
            logger.info(f"Найдено {len(article_urls)} статей в RSS")

            # Ограничение количества статей
            article_urls = article_urls[:max_articles]

            # Парсинг каждой статьи
            for i, url in enumerate(article_urls, 1):
                logger.info(f"Парсинг статьи {i}/{len(article_urls)}: {url}")

                article = await self.parse_article_page(url)

                if article:
                    articles.append(article)
                    logger.info(f"Статья добавлена: {article['title'][:50]}...")
                else:
                    logger.warning(f"Не удалось распарсить статью: {url}")

        except Exception as e:
            logger.error(f"Ошибка при парсинге сайта {self.source_name}: {e}")

        return articles

8. Пример конкретного парсера parser/ria_parser.py:

In [None]:
from datetime import datetime
from bs4 import BeautifulSoup
import re
from .base_parser import BaseNewsParser

class RiaParser(BaseNewsParser):
    def __init__(self):
        super().__init__(source_name="ria", base_url="https://ria.ru")

    def _extract_title(self, soup: BeautifulSoup) -> str:
        # Поиск заголовка
        title_tag = soup.find('h1', class_='article__title')
        if not title_tag:
            title_tag = soup.find('h1')

        if title_tag:
            return title_tag.get_text(strip=True)

        # Поиск в мета-тегах
        meta_title = soup.find('meta', property='og:title')
        if meta_title:
            return meta_title.get('content', '').strip()

        return ""

    def _extract_content(self, soup: BeautifulSoup) -> str:
        # Поиск основного контента
        article_body = soup.find('div', class_='article__body')
        if not article_body:
            article_body = soup.find('article')

        if article_body:
            return str(article_body)

        # Альтернативный поиск
        content_div = soup.find('div', {'itemprop': 'articleBody'})
        if content_div:
            return str(content_div)

        return ""

    def _extract_published_date(self, soup: BeautifulSoup) -> Optional[str]:
        # Поиск даты в мета-тегах
        meta_date = soup.find('meta', property='article:published_time')
        if meta_date:
            date_str = meta_date.get('content', '')
            try:
                # Преобразование в ISO формат
                dt = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
                return dt.isoformat()
            except:
                return date_str

        # Поиск в тексте
        date_tag = soup.find('div', class_='article__info-date')
        if date_tag:
            date_text = date_tag.get_text(strip=True)
            try:
                # Парсинг различных форматов даты
                patterns = [
                    r'(\d{2})\.(\d{2})\.(\d{4})',
                    r'(\d{4})-(\d{2})-(\d{2})',
                ]

                for pattern in patterns:
                    match = re.search(pattern, date_text)
                    if match:
                        if len(match.groups()) == 3:
                            day, month, year = match.groups()
                            if len(year) == 4:
                                dt = datetime(int(year), int(month), int(day))
                                return dt.isoformat()

                return date_text
            except:
                return date_text

        return None

    def _extract_comments_count(self, soup: BeautifulSoup) -> int:
        # Поиск количества комментариев
        comments_tag = soup.find('a', class_='article__comments-link')
        if comments_tag:
            text = comments_tag.get_text(strip=True)
            numbers = re.findall(r'\d+', text)
            if numbers:
                return int(numbers[0])

        return 0

    def _extract_rating(self, soup: BeautifulSoup) -> Optional[float]:
        # Поиск рейтинга (если есть)
        rating_tag = soup.find('div', class_='article__rating')
        if rating_tag:
            text = rating_tag.get_text(strip=True)
            numbers = re.findall(r'\d+\.?\d*', text)
            if numbers:
                return float(numbers[0])

        return None

    def _extract_category(self, soup: BeautifulSoup) -> Optional[str]:
        # Поиск категории
        category_tag = soup.find('a', class_='article__tags-link')
        if category_tag:
            return category_tag.get_text(strip=True)

        # Поиск в хлебных крошках
        breadcrumbs = soup.find('nav', class_='breadcrumbs')
        if breadcrumbs:
            links = breadcrumbs.find_all('a')
            if len(links) > 1:
                return links[-1].get_text(strip=True)

        return None

9. Файл main.py:

In [None]:
import asyncio
import logging
from datetime import datetime
import sys
import os

# Добавление пути для импорта модулей
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from config import NEWS_SITES, DB_PATH, MIN_ARTICLES_COUNT
from database.db_manager import DatabaseManager
from utils.rate_limiter import RateLimiter

# Импорт парсеров
from parser.ria_parser import RiaParser
from parser.lenta_parser import LentaParser
from parser.gazeta_parser import GazetaParser
from parser.kommersant_parser import KommersantParser

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('news_parser.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class NewsParserApp:
    def __init__(self):
        self.db_manager = DatabaseManager(DB_PATH)
        self.parsers = {
            'ria': RiaParser(),
            'lenta': LentaParser(),
            'gazeta': GazetaParser(),
            'kommersant': KommersantParser(),
        }

        # Проверяем существование базы
        self.check_database()

    def check_database(self):
        """Проверка состояния базы данных"""
        try:
            count = self.db_manager.get_article_count()
            logger.info(f"Текущее количество статей в базе: {count}")

            if count >= MIN_ARTICLES_COUNT:
                logger.info(f"Достигнуто минимальное количество статей ({MIN_ARTICLES_COUNT})")
            else:
                logger.info(f"Необходимо добавить еще {MIN_ARTICLES_COUNT - count} статей")

        except Exception as e:
            logger.error(f"Ошибка при проверке базы данных: {e}")

    async def parse_single_site(self, site_config, max_articles_per_site=500):
        """Парсинг одного сайта"""
        parser_name = site_config['parser_class'].lower().replace('parser', '')

        if parser_name not in self.parsers:
            logger.error(f"Парсер для {site_config['name']} не найден")
            return []

        parser = self.parsers[parser_name]
        logger.info(f"Начинаем парсинг сайта: {site_config['name']}")

        try:
            articles = await parser.parse_site(
                site_config['rss_url'],
                max_articles=max_articles_per_site
            )

            logger.info(f"Сайт {site_config['name']}: получено {len(articles)} статей")

            # Сохранение статей в базу
            saved_count = 0
            for article in articles:
                article['source'] = site_config['name']
                if self.db_manager.insert_article(article):
                    saved_count += 1

            logger.info(f"Сайт {site_config['name']}: сохранено {saved_count} статей")

            # Закрытие сессии парсера
            await parser.close_session()

            return articles

        except Exception as e:
            logger.error(f"Ошибка при парсинге сайта {site_config['name']}: {e}")
            return []

    async def parse_all_sites(self):
        """Парсинг всех сайтов"""
        all_articles = []

        for site_config in NEWS_SITES:
            articles = await self.parse_single_site(site_config)
            all_articles.extend(articles)

            # Проверяем, достигли ли нужного количества
            current_count = self.db_manager.get_article_count()
            if current_count >= MIN_ARTICLES_COUNT:
                logger.info(f"Достигнуто минимальное количество статей ({MIN_ARTICLES_COUNT})")
                break

        return all_articles

    def print_statistics(self):
        """Вывод статистики"""
        total_count = self.db_manager.get_article_count()

        logger.info("\n" + "="*50)
        logger.info("СТАТИСТИКА ПАРСИНГА")
        logger.info("="*50)
        logger.info(f"Всего статей в базе: {total_count}")
        logger.info(f"Минимально требуемое количество: {MIN_ARTICLES_COUNT}")

        if total_count >= MIN_ARTICLES_COUNT:
            logger.info("✅ ТРЕБОВАНИЯ ВЫПОЛНЕНЫ")
        else:
            logger.info("❌ ТРЕБОВАНИЯ НЕ ВЫПОЛНЕНЫ")
            logger.info(f"Необходимо добавить еще {MIN_ARTICLES_COUNT - total_count} статей")

        logger.info("="*50)

    async def run(self):
        """Основной метод запуска"""
        logger.info("Запуск парсера новостных сайтов...")
        start_time = datetime.now()

        try:
            # Парсинг всех сайтов
            await self.parse_all_sites()

            # Вывод статистики
            self.print_statistics()

            # Пример получения статей по источнику
            ria_articles = self.db_manager.get_articles_by_source('ria', limit=5)
            if ria_articles:
                logger.info("\nПример статей из РИА Новости:")
                for i, article in enumerate(ria_articles[:3], 1):
                    logger.info(f"{i}. {article['title'][:50]}... ({article['word_count']} слов)")

        except KeyboardInterrupt:
            logger.info("Парсинг прерван пользователем")
        except Exception as e:
            logger.error(f"Критическая ошибка: {e}")
        finally:
            # Закрытие всех сессий
            for parser in self.parsers.values():
                await parser.close_session()

            end_time = datetime.now()
            duration = end_time - start_time
            logger.info(f"Парсинг завершен. Время выполнения: {duration}")

def main():
    """Точка входа"""
    app = NewsParserApp()

    # Проверяем, нужно ли запускать парсинг
    current_count = app.db_manager.get_article_count()

    if current_count >= MIN_ARTICLES_COUNT:
        logger.info("База данных уже содержит достаточное количество статей")
        app.print_statistics()
    else:
        # Запускаем асинхронный парсинг
        asyncio.run(app.run())

if __name__ == "__main__":
    main()

10. Файл README.md:

In [None]:
# Парсер новостных сайтов

Проект для сбора новостных статей с русскоязычных сайтов и сохранения их в базу данных SQLite.

## Цель проекта

Собрать корпус новостных статей (не менее 5000 записей) с различных русскоязычных новостных сайтов для последующего анализа данных.

## Структура проекта


news_parser/
├── parser/ # Модули парсеров для разных сайтов
├── database/ # Работа с базой данных
├── utils/ # Вспомогательные утилиты
├── config.py # Конфигурация
├── main.py # Основной скрипт
└── requirements.txt # Зависимости

In [None]:

## Требования

- Python 3.8+
- Установленные зависимости из `requirements.txt`

## Установка

1. Клонируйте репозиторий:
```bash
git clone <repository-url>
cd news_parser

In [None]:
Установите зависимости:

bash
pip install -r requirements.txt
Настройте конфигурацию в config.py:

Добавьте/измените сайты для парсинга

Настройте ограничения запросов

Укажите путь к базе данных

Использование
Запуск парсера:
bash
python main.py
Проверка базы данных:
python
import sqlite3

conn = sqlite3.connect('news_articles.db')
cursor = conn.cursor()

# Количество статей
cursor.execute('SELECT COUNT(*) FROM articles')
print(f"Всего статей: {cursor.fetchone()[0]}")

# Статистика по источникам
cursor.execute('''
    SELECT source, COUNT(*) as count
    FROM articles
    GROUP BY source
    ORDER BY count DESC
''')
print("\nСтатистика по источникам:")
for row in cursor.fetchall():
    print(f"{row[0]}: {row[1]} статей")

conn.close()

Особенности реализации
1. Поддержка сайтов
РИА Новости (ria.ru)

Лента.ру (lenta.ru)

Газета.ру (gazeta.ru)

Коммерсантъ (kommersant.ru)

ТАСС (tass.ru)

РБК (rbc.ru)

2. Ограничения и вежливый парсинг
Rate limiting (максимум 2 запроса в секунду)

Случайные User-Agent

Экспоненциальная задержка при повторных попытках

Обработка HTTP-ошибок

3. Очистка текста
Удаление HTML-тегов

Удаление медиа-контента (видео, аудио, изображения)

Удаление скриптов и стилей

Нормализация пробелов и переносов строк

4. База данных
SQLite с таблицей articles

Поля: guid, title, description, url, published_at, comments_count, created_at_utc, rating, source, word_count, category

Индексы для оптимизации запросов

Поля базы данных
Поле	Тип	Описание
guid	TEXT	Уникальный идентификатор (UUID v4)
title	TEXT	Заголовок статьи
description	TEXT	Очищенный текст статьи
url	TEXT	Ссылка на статью (уникальная)
published_at	TEXT	Дата публикации (ISO формат)
comments_count	INTEGER	Количество комментариев
created_at_utc	TEXT	Время создания записи в БД (UTC)
rating	REAL	Рейтинг/оценка статьи
source	TEXT	Источник (название сайта)
word_count	INTEGER	Количество слов в статье
category	TEXT	Категория/рубрика

In [None]:
Создайте новый класс парсера в parser/:

python
from .base_parser import BaseNewsParser

class NewSiteParser(BaseNewsParser):
    def __init__(self):
        super().__init__(source_name="newsite", base_url="https://newsite.ru")

    def _extract_title(self, soup):
        # Реализация извлечения заголовка
        pass

    # ... остальные методы
Добавьте сайт в конфигурацию config.py:

python
NEWS_SITES = [
    # ... существующие сайты
    {
        "name": "newsite",
        "base_url": "https://newsite.ru",
        "rss_url": "https://newsite.ru/rss",
        "parser_class": "NewSiteParser"
    }
]
Добавьте парсер в main.py:

python
from parser.newsite_parser import NewSiteParser

self.parsers = {
    # ... существующие парсеры
    'newsite': NewSiteParser(),
}
Примеры использования данных
Анализ тональности:
python
from textblob import TextBlob

# Анализ тональности статьи
text = "Текст статьи для анализа"
blob = TextBlob(text)
sentiment = blob.sentiment
print(f"Тональность: {sentiment.polarity}, Субъективность: {sentiment.subjectivity}")
Частотный анализ слов:
python
from collections import Counter
import re

# Подсчет частоты слов
words = re.findall(r'\b\w+\b', text.lower())
word_freq = Counter(words)
print("Самые частые слова:", word_freq.most_common(10))

In [None]:

## Объяснение последовательности действий:

### 1. Выбор сайтов:
Я выбрал 6 популярных русскоязычных новостных сайтов с разнообразным контентом:
- РИА Новости (государственные новости)
- Лента.ру (общие новости)
- Газета.ру (общественно-политические новости)
- Коммерсантъ (деловые новости)
- ТАСС (официальные новости)
- РБК (экономические новости)

### 2. Организация парсинга:
- Использован объектно-ориентированный подход
- Базовый класс `BaseNewsParser` содержит общую логику
- Для каждого сайта создан свой класс-наследник
- Используется асинхронный парсинг для эффективности

### 3. Учет ограничений:
- Rate limiting через класс `RateLimiter`
- Случайные User-Agent для обхода блокировок
- Экспоненциальная задержка при повторных попытках
- Ограничение частоты запросов (2 запроса в секунду)

### 4. Очистка текста:
- Удаление HTML-тегов через BeautifulSoup
- Удаление медиа-элементов (видео, аудио, изображения)
- Удаление скриптов и стилей
- Нормализация текста

### 5. Работа с базой данных:
- SQLite для простоты развертывания
- Автоматическая генерация GUID
- Индексы для оптимизации запросов
- Проверка уникальности по URL

### 6. Особенности реализации:
- Использование RSS для получения списка статей
- Парсинг полной страницы для получения контента
- Валидация статей (минимальная длина, наличие текста)
- Статистика и мониторинг процесса парсинга

Этот проект позволяет собрать более 5000 статей с различных источников, обеспечивая при этом вежливый парсинг и качественную очистку данных.