In [1]:
import os
import json
import logging
import asyncio 
import nest_asyncio
nest_asyncio.apply()
from dotenv import load_dotenv
from datetime import datetime
from pathlib import Path
from telethon import TelegramClient, events, errors
load_dotenv()

# Telegram API credentials, looked up in the process environment.
# Either value is None when its variable is not set.
API_ID, API_HASH = (os.getenv(var_name) for var_name in ("API_ID", "API_HASH"))

In [2]:
# Channels to scrape. Entries are either full t.me links or bare usernames.
CHANNELS = [
    'https://t.me/lobelia4cosmetics',
    'https://t.me/tikvahpharma',
    'chemedtelegram',  # Username for Chemed
]

# Path Setup: all locations are relative to the current working directory.
LOG_PATH = Path("logs")
BASE_DATA_PATH = Path("data/raw")
# Downloaded photos and the JSON message dumps both live under the raw-data root.
IMAGE_BASE_PATH = BASE_DATA_PATH.joinpath("images")
JSON_BASE_PATH = BASE_DATA_PATH.joinpath("telegram_messages")

In [3]:
# Create every output directory up front so later writes cannot fail on a
# missing parent; exist_ok makes re-running this cell harmless.
for path in [IMAGE_BASE_PATH, JSON_BASE_PATH, LOG_PATH]:
    path.mkdir(parents=True, exist_ok=True)

# Logging Setup
# Records at INFO and above go to LOG_PATH / "scraping.log" and to the
# console/notebook output stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: log lines interpolate channel names and exception
        # text, which may be non-ASCII and can fail to encode under the
        # platform's default file encoding.
        logging.FileHandler(LOG_PATH / "scraping.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

In [None]:
def _safe_path_component(name, fallback="unknown_channel"):
    """Reduce ``name`` to something usable as a single file or folder name.

    Letters and digits, ``-``, ``_`` and ``.`` are kept; every other
    character (path separators, spaces, ...) is replaced by ``_``. Leading
    and trailing dots are stripped so the result can never be ``.`` or
    ``..``. ``fallback`` is returned when nothing usable is left.
    """
    cleaned = "".join(
        ch if ch.isalnum() or ch in "-_." else "_" for ch in str(name or "")
    ).strip(".")
    return cleaned or fallback


async def scrape_channel(client, channel_username, limit=100):
    """Scrape messages and photos from a single channel.

    Args:
        client: Connected client object providing ``get_entity``,
            ``iter_messages`` and ``download_media``.
        channel_username: Channel link or username, passed to
            ``client.get_entity``.
        limit: Forwarded unchanged to ``client.iter_messages``. Defaults to
            100, the value that was previously hard-coded for testing.

    Side effects:
        * Photos are written to ``IMAGE_BASE_PATH/<channel>/<message_id>.jpg``.
        * All collected messages are written as one JSON list to
          ``JSON_BASE_PATH/<today>/<channel>.json`` where ``<today>`` is the
          local date of the run (not the message date).

    Returns:
        None. Errors are logged, never raised: on ``FloodWaitError`` the
        coroutine sleeps for the requested time and then returns WITHOUT
        retrying or saving what was collected so far; any other exception is
        logged with its traceback.
    """
    try:
        entity = await client.get_entity(channel_username)
        channel_name = entity.username or entity.title

        # Clean channel name for folder usage. The title fallback is free text
        # and may contain path separators or other characters that are not
        # valid in a file name, so it is never used as a path piece directly.
        folder_name = _safe_path_component(
            channel_name, fallback=f"channel_{getattr(entity, 'id', 'unknown')}"
        )
        logger.info(f"Starting scrape for: {channel_name}")

        # Format: data/raw/images/{channel_name}/{message_id}.jpg
        image_folder = IMAGE_BASE_PATH / folder_name
        messages_data = []

        async for message in client.iter_messages(entity, limit=limit):
            msg_id = message.id

            # 1. Download Image if present
            image_path = None
            if message.photo:
                # Created lazily so channels without photos leave no empty folder.
                image_folder.mkdir(parents=True, exist_ok=True)
                save_path = image_folder / f"{msg_id}.jpg"

                image_path = await client.download_media(message.photo, file=str(save_path))
                if image_path:
                    logger.info(f"Downloaded image for msg {msg_id}")
                else:
                    # A falsy result is already stored as image_path=None
                    # below; do not claim success in the log.
                    logger.warning(f"Image download returned no file for msg {msg_id}")

            # 2. Extract Data structure
            messages_data.append({
                "message_id": msg_id,
                "date": message.date.isoformat(),
                "text": message.text,
                "views": message.views,
                "forwards": message.forwards,
                "image_path": str(image_path) if image_path else None,
                "channel": channel_name
            })

        # 3. Store in Data Lake (Partitioned by Date)
        today_str = datetime.now().strftime("%Y-%m-%d")
        partition_path = JSON_BASE_PATH / today_str
        partition_path.mkdir(parents=True, exist_ok=True)

        json_file = partition_path / f"{folder_name}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII message text readable in the file.
            json.dump(messages_data, f, ensure_ascii=False, indent=4)

        logger.info(f"Successfully saved {len(messages_data)} messages for {channel_name}")

    except errors.FloodWaitError as e:
        # Honour the server-requested pause so the next call is not rejected
        # too. NOTE: this channel is not retried afterwards.
        logger.error(f"Rate limit exceeded. Wait for {e.seconds} seconds.")
        await asyncio.sleep(e.seconds)
    except Exception as e:
        # exc_info keeps the traceback in the log; the message text is unchanged.
        logger.error(f"Error scraping {channel_username}: {str(e)}", exc_info=True)
async def main():
    """Open one Telegram session and scrape each entry of CHANNELS in order."""
    session = TelegramClient('scraper_session', API_ID, API_HASH)
    # The client is used as an async context manager so it is closed again
    # once every channel has been processed (or an error propagates).
    async with session as client:
        for target in CHANNELS:
            await scrape_channel(client, target)

# Entry point: runs when this code is executed directly, not when imported.
if __name__ == "__main__":
    scrape_job = main()
    asyncio.run(scrape_job)

2026-01-22 13:08:10,242 - INFO - Connecting to 149.154.167.91:443/TcpFull...
2026-01-22 13:08:10,360 - INFO - Connection to 149.154.167.91:443/TcpFull complete!
