In [None]:


# config/settings.py
from pydantic import BaseSettings
from typing import Optional
import os
from dotenv import load_dotenv

load_dotenv()

class Settings(BaseSettings):
    """Application configuration for the Zoom-to-Telegram summary bot.

    Credentials and chat ids default to the values found in the process
    environment (after ``load_dotenv()`` has run); the remaining fields are
    fixed defaults.
    """

    # Zoom API Settings
    # NOTE(review): an unset variable leaves these as None despite the `str`
    # annotation — confirm whether startup should fail fast instead.
    ZOOM_CLIENT_ID: str = os.getenv('ZOOM_CLIENT_ID')
    ZOOM_CLIENT_SECRET: str = os.getenv('ZOOM_CLIENT_SECRET')
    ZOOM_ACCOUNT_ID: str = os.getenv('ZOOM_ACCOUNT_ID')
    ZOOM_BASE_URL: str = "https://api.zoom.us/v2"

    # Telegram Settings
    TELEGRAM_BOT_TOKEN: str = os.getenv('TELEGRAM_BOT_TOKEN')
    # Comma-separated list of chat ids. Blank entries are dropped because
    # ''.split(',') yields [''], which would otherwise become a bogus chat id;
    # surrounding whitespace ("1, 2") is stripped for the same reason.
    # NOTE(review): BaseSettings may also read this name from the environment
    # itself — confirm a comma-separated value is accepted for a list field.
    TELEGRAM_CHAT_IDS: list = [
        chat_id.strip()
        for chat_id in os.getenv('TELEGRAM_CHAT_IDS', '').split(',')
        if chat_id.strip()
    ]

    # Application Settings
    CHECK_INTERVAL_MINUTES: int = 30  # minutes between scheduled runs
    MAX_RETRIES: int = 3              # attempts made by retry_on_failure
    RETRY_DELAY_SECONDS: int = 5      # pause between those attempts

    class Config:
        case_sensitive = True

# Module-level settings instance; other modules import it via
# `from config.settings import settings`.
settings = Settings()

# services/zoom_client.py
import requests
import time
import jwt
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from config.settings import settings
from utils.helpers import retry_on_failure

class ZoomClient:
    """Minimal Zoom REST client that lists meetings and fetches AI summaries.

    Access tokens are obtained with the ``account_credentials`` grant and
    cached on the instance until their reported expiry time.
    """

    # Upper bound (seconds) for every HTTP call. Without an explicit timeout
    # `requests` may wait indefinitely on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        self.base_url = settings.ZOOM_BASE_URL
        self.client_id = settings.ZOOM_CLIENT_ID
        self.client_secret = settings.ZOOM_CLIENT_SECRET
        self.account_id = settings.ZOOM_ACCOUNT_ID
        self.access_token = None
        self.token_expiry = None
        self.logger = logging.getLogger(__name__)

    def _generate_jwt_token(self) -> str:
        """Return an HS256 JWT issued by the client id, valid for 60 minutes.

        Kept for callers that still rely on it; the token refresh in this
        class authenticates with the client id/secret pair instead.
        """
        payload = {
            "iss": self.client_id,
            # time.time() is a true epoch value. The previous
            # datetime.utcnow().timestamp() treated the naive UTC datetime as
            # local time, shifting the expiry by the machine's UTC offset.
            "exp": int(time.time()) + 60 * 60,
        }

        return jwt.encode(
            payload,
            self.client_secret,
            algorithm='HS256'
        )

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token headers shared by all API requests."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

    def _ensure_access_token(self):
        """Fetch a new access token when none is cached or it has expired."""
        if (
            not self.access_token
            or self.token_expiry is None
            or datetime.now() >= self.token_expiry
        ):
            self._refresh_access_token()

    def _refresh_access_token(self):
        """Request a new access token and record when it expires.

        Raises:
            Exception: if the token endpoint answers with a non-200 status.
        """
        token_url = "https://zoom.us/oauth/token"

        response = requests.post(
            token_url,
            # The account_credentials grant expects HTTP Basic auth built from
            # "client_id:client_secret"; `auth=` lets requests encode it. A
            # self-signed JWT in a "Basic" header is not a valid credential.
            auth=(self.client_id, self.client_secret),
            params={
                "grant_type": "account_credentials",
                "account_id": self.account_id
            },
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )

        if response.status_code != 200:
            raise Exception(f"Failed to refresh access token: {response.text}")

        token_data = response.json()
        self.access_token = token_data["access_token"]
        self.token_expiry = datetime.now() + timedelta(seconds=token_data["expires_in"])

    @retry_on_failure
    def get_meetings_with_summaries(self, from_date: Optional[datetime] = None) -> List[Dict]:
        """Return the scheduled meetings that have a non-empty AI summary.

        Args:
            from_date: Earliest date to query; defaults to 24 hours ago.

        Returns:
            The meeting dicts whose summary lookup succeeded, or an empty list
            when the meetings request returns a non-200 status.

        Raises:
            Exception: any error raised while performing the request is
                logged and re-raised.
        """
        self._ensure_access_token()

        if not from_date:
            from_date = datetime.now() - timedelta(days=1)

        meetings_url = f"{self.base_url}/users/me/meetings"

        try:
            response = requests.get(
                meetings_url,
                headers=self._auth_headers(),
                params={
                    "type": "scheduled",
                    "from": from_date.strftime("%Y-%m-%d")
                },
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )

            if response.status_code != 200:
                self.logger.error(f"Failed to fetch meetings: {response.text}")
                return []

            # NOTE(review): only the first page of results is read — confirm
            # whether pagination matters for this account.
            meetings = response.json().get("meetings", [])
            return [
                meeting for meeting in meetings
                if self._has_ai_summary(meeting["id"])
            ]

        except Exception as e:
            self.logger.error(f"Error fetching meetings: {str(e)}")
            raise

    @retry_on_failure
    def get_ai_summary(self, meeting_id: str) -> Optional[Dict]:
        """Fetch the AI summary payload for one meeting.

        Args:
            meeting_id: Identifier of the meeting to look up.

        Returns:
            The decoded JSON body, or None when the API answers with a
            non-200 status.

        Raises:
            Exception: any error raised while performing the request is
                logged and re-raised.
        """
        self._ensure_access_token()

        # NOTE(review): confirm this path and the `summary_text` field used by
        # _has_ai_summary against the current Zoom API reference.
        summary_url = f"{self.base_url}/meetings/{meeting_id}/summary"

        try:
            response = requests.get(
                summary_url,
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )

            if response.status_code == 200:
                return response.json()

            self.logger.error(f"Failed to fetch AI summary: {response.text}")
            return None

        except Exception as e:
            self.logger.error(f"Error fetching AI summary: {str(e)}")
            raise

    def _has_ai_summary(self, meeting_id: str) -> bool:
        """Return True when the meeting has a summary with non-empty text.

        Lookup failures are treated as "no summary" so that one bad meeting
        does not abort the whole listing.
        """
        try:
            summary = self.get_ai_summary(meeting_id)
        except Exception:
            # Narrower than a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; get_ai_summary has already logged the error.
            return False
        return bool(summary and summary.get("summary_text"))

# services/telegram_bot.py
from telegram import Bot
from telegram.error import TelegramError
import asyncio
import logging
from config.settings import settings
from utils.helpers import retry_on_failure

class TelegramBot:
    """Sends text messages to the Telegram chats listed in the settings."""

    def __init__(self):
        self.bot = Bot(token=settings.TELEGRAM_BOT_TOKEN)
        self.chat_ids = settings.TELEGRAM_CHAT_IDS
        self.logger = logging.getLogger(__name__)

    # NOTE(review): retry_on_failure wraps a coroutine function here; confirm
    # the decorator awaits the call, otherwise errors raised while awaiting
    # are never retried.
    @retry_on_failure
    async def send_message(self, chat_id: str, text: str):
        """Send ``text`` to one chat, split into chunks of at most 4096 chars.

        Args:
            chat_id: Target chat identifier.
            text: Message body, sent with ``parse_mode='HTML'``.

        Raises:
            TelegramError: logged and re-raised when a send fails.
        """
        try:
            # Split message if it exceeds Telegram's limit
            # NOTE(review): a chunk boundary can fall inside an HTML tag or
            # entity — confirm whether summaries can exceed one message.
            max_length = 4096

            for start in range(0, len(text), max_length):
                await self.bot.send_message(
                    chat_id=chat_id,
                    text=text[start:start + max_length],
                    parse_mode='HTML'
                )

        except TelegramError as e:
            self.logger.error(f"Failed to send message to chat {chat_id}: {str(e)}")
            raise

    async def broadcast_summary(self, summary: str):
        """Send ``summary`` to every configured chat concurrently.

        Every chat is attempted even if another one fails. Once all sends
        have finished, the first failure (if any) is re-raised so callers
        still see an error.

        Raises:
            Exception: the first exception raised by any individual send.
        """
        # Drop blank ids (e.g. from an empty TELEGRAM_CHAT_IDS variable) so
        # they are not sent to Telegram as chat ids.
        targets = [
            str(chat_id).strip()
            for chat_id in self.chat_ids
            if str(chat_id).strip()
        ]
        if not targets:
            self.logger.warning("No Telegram chat ids configured; summary not sent")
            return

        # return_exceptions=True waits for every send instead of returning on
        # the first error while the other sends are still in flight.
        results = await asyncio.gather(
            *(self.send_message(chat_id, summary) for chat_id in targets),
            return_exceptions=True
        )

        failures = [result for result in results if isinstance(result, BaseException)]
        if failures:
            self.logger.error(
                f"Failed to deliver summary to {len(failures)} of {len(targets)} chats"
            )
            raise failures[0]

# services/summary_processor.py
from typing import Dict
import re
from datetime import datetime

class SummaryProcessor:
    """Turns Zoom meeting and summary payloads into Telegram-ready text."""

    @staticmethod
    def format_summary(meeting_data: Dict, summary_data: Dict) -> str:
        """Format the meeting summary for Telegram message

        Args:
            meeting_data: Meeting dict; reads ``topic`` and ``start_time``.
            summary_data: Summary dict; reads ``summary_text``, ``chapters``
                (dicts with ``chapter_text``) and ``highlights``.

        Returns:
            An HTML-formatted message. All dynamic text is HTML-escaped so
            characters such as ``<`` or ``&`` cannot break the markup.
        """
        # Function-scope imports keep this block self-contained.
        import html
        from datetime import timezone

        def esc(value) -> str:
            # quote=False: only &, < and > need escaping in element text.
            return html.escape(str(value), quote=False)

        topic = esc(meeting_data.get("topic") or "Untitled Meeting")
        summary_text = esc(summary_data.get("summary_text") or "No summary available")

        # A missing or malformed start_time must not abort formatting;
        # fromisoformat('') raises ValueError.
        raw_start = str(meeting_data.get("start_time") or "")
        try:
            start = datetime.fromisoformat(raw_start.replace('Z', '+00:00'))
        except ValueError:
            when = "Unknown time"
        else:
            # The label says UTC, so convert timestamps that carry another
            # offset; naive timestamps are shown unchanged.
            if start.tzinfo is not None:
                start = start.astimezone(timezone.utc)
            when = f"{start.strftime('%Y-%m-%d %H:%M')} UTC"

        parts = [
            f"🎯 <b>{topic}</b>\n"
            f"📅 {when}\n"
            "\n"
            "<b>Meeting Summary:</b>\n"
            f"{summary_text}\n"
            "\n"
        ]

        # Add chapters if available
        chapters = summary_data.get('chapters') or []
        if chapters:
            parts.append("\n<b>Key Points:</b>\n")
            parts.extend(
                f"• {esc(chapter.get('chapter_text', ''))}\n" for chapter in chapters
            )

        # Add highlights if available
        highlights = summary_data.get('highlights') or []
        if highlights:
            parts.append("\n<b>Highlights:</b>\n")
            parts.extend(f"⭐️ {esc(highlight)}\n" for highlight in highlights)

        return "".join(parts).strip()

    @staticmethod
    def clean_text(text: str) -> str:
        """Clean and format the text

        Collapses runs of blank lines into a single blank line, removes every
        character that is not a word character, whitespace or one of
        ``.,:;!?()-``, and strips leading/trailing whitespace.
        """
        # Remove multiple newlines
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Remove special characters
        text = re.sub(r'[^\w\s\n.,:;!?()-]', '', text)
        return text.strip()

# utils/helpers.py
import time
import logging
from functools import wraps
from config.settings import settings

logger = logging.getLogger(__name__)

def retry_on_failure(func):
    """Decorator that retries ``func`` when it raises an exception.

    The call is attempted up to ``settings.MAX_RETRIES`` times (at least
    once), pausing ``settings.RETRY_DELAY_SECONDS`` between attempts. The
    exception from the last attempt is re-raised.

    Both plain functions and coroutine functions are supported. For a
    coroutine function the retry happens around the ``await``, and the pause
    uses ``asyncio.sleep`` so the event loop is not blocked.
    """
    # Function-scope imports keep this block self-contained.
    import asyncio
    import inspect

    def _is_final_attempt(attempt, max_retries, error) -> bool:
        """Log the failure and report whether no attempts remain."""
        if attempt >= max_retries - 1:
            logger.error(f"Final retry attempt failed for {func.__name__}: {str(error)}")
            return True
        logger.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {str(error)}")
        return False

    if inspect.iscoroutinefunction(func):
        # Calling a coroutine function never raises by itself; the error only
        # appears when the coroutine is awaited, so the await must sit inside
        # the try block for retries to happen at all.
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            # max(1, ...) guarantees the function runs at least once even if
            # MAX_RETRIES is configured as 0 or a negative number.
            max_retries = max(1, settings.MAX_RETRIES)
            retry_delay = settings.RETRY_DELAY_SECONDS

            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if _is_final_attempt(attempt, max_retries, e):
                        raise
                    await asyncio.sleep(retry_delay)

        return async_wrapper

    @wraps(func)
    def wrapper(*args, **kwargs):
        max_retries = max(1, settings.MAX_RETRIES)
        retry_delay = settings.RETRY_DELAY_SECONDS

        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if _is_final_attempt(attempt, max_retries, e):
                    raise
                time.sleep(retry_delay)

    return wrapper

def setup_logging():
    """Configure root logging at INFO level.

    Records are emitted through a StreamHandler and appended to ``app.log``
    through a FileHandler, both using the same timestamped format.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    stream_handler = logging.StreamHandler()
    file_handler = logging.FileHandler('app.log')

    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[stream_handler, file_handler],
    )

# main.py
import asyncio
import schedule
import time
import logging
from datetime import datetime, timedelta
from config.settings import settings
from services.zoom_client import ZoomClient
from services.telegram_bot import TelegramBot
from services.summary_processor import SummaryProcessor
from utils.helpers import setup_logging

class ZoomTelegramBot:
    """Fetches recent Zoom AI summaries and broadcasts them to Telegram."""

    # Keys of meetings already broadcast. Stored on the class so it persists
    # across the new instance created for each scheduled run; otherwise every
    # run would re-send all summaries from the last 24 hours. The set lives
    # in memory only and is lost when the process restarts.
    _processed_meetings: set = set()

    def __init__(self):
        self.zoom_client = ZoomClient()
        self.telegram_bot = TelegramBot()
        self.summary_processor = SummaryProcessor()
        self.logger = logging.getLogger(__name__)

    async def process_meetings(self):
        """Broadcast the summary of each not-yet-processed recent meeting.

        Errors are logged, never raised. A failure on one meeting does not
        stop the remaining meetings, and that meeting is retried on the next
        run because it is only marked as processed after a successful
        broadcast.
        """
        # Get meetings from the last 24 hours
        from_date = datetime.now() - timedelta(days=1)
        try:
            # The Zoom client is synchronous, so this call blocks the event
            # loop until it returns.
            meetings = self.zoom_client.get_meetings_with_summaries(from_date)
        except Exception as e:
            self.logger.error(f"Error processing meetings: {str(e)}")
            return

        for meeting in meetings:
            try:
                # uuid/start_time are included so entries that share an id
                # but differ in those fields are not collapsed into one.
                key = (meeting["id"], meeting.get("uuid"), meeting.get("start_time"))
                if key in self._processed_meetings:
                    continue

                summary_data = self.zoom_client.get_ai_summary(meeting["id"])
                if not summary_data:
                    continue

                formatted_summary = self.summary_processor.format_summary(
                    meeting,
                    summary_data
                )

                await self.telegram_bot.broadcast_summary(formatted_summary)
                self._processed_meetings.add(key)
                self.logger.info(f"Successfully processed meeting: {meeting['id']}")

            except Exception as e:
                self.logger.error(f"Error processing meetings: {str(e)}")

async def run_bot():
    """Build a fresh ZoomTelegramBot and run a single processing pass."""
    await ZoomTelegramBot().process_meetings()

def schedule_job():
    """Synchronous entry point for the scheduler: run one bot pass to completion."""
    pending_run = run_bot()
    asyncio.run(pending_run)

def main():
    """Configure logging, run one pass immediately, then poll the scheduler forever."""
    setup_logging()
    log = logging.getLogger(__name__)

    # Register the recurring job; the interval is taken from the settings.
    interval_minutes = settings.CHECK_INTERVAL_MINUTES
    schedule.every(interval_minutes).minutes.do(schedule_job)

    log.info("Starting Zoom-Telegram Summary Bot...")

    # Do the first pass right away instead of waiting a full interval.
    schedule_job()

    # Check once per second whether a scheduled run is due.
    while True:
        schedule.run_pending()
        time.sleep(1)

# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

In [None]:
def main():
    """Run a single pass: fetch recent meeting summaries and broadcast them.

    Uses the ZoomClient, TelegramBot and SummaryProcessor classes as they are
    defined in this file: both clients read their configuration from
    ``settings`` (their constructors take no arguments), meetings are dicts,
    and sending is asynchronous.
    """
    # Initialize configs and clients
    zoom_client = ZoomClient()
    telegram_bot = TelegramBot()

    async def _broadcast_all():
        # Get meetings with summaries
        meetings = zoom_client.get_meetings_with_summaries()

        # Process each meeting
        for meeting in meetings:
            summary = zoom_client.get_ai_summary(meeting["id"])
            if not summary:
                # get_ai_summary returns None on a non-200 response.
                continue
            formatted_summary = SummaryProcessor.format_summary(meeting, summary)
            await telegram_bot.broadcast_summary(formatted_summary)

    # One event loop for the whole pass, so every send shares the same loop.
    asyncio.run(_broadcast_all())