In [None]:
from telethon import TelegramClient
import telethon
from telethon import functions, utils
from telethon.tl.types import Message, Channel
from telethon.tl.types.messages import DialogFilters
from telethon.tl.custom.file import File
from tqdm import tqdm
from dotenv import load_dotenv
from typing import List, Dict, Any
import os
import asyncio
import sqlite3
import json
from datetime import date, datetime
from telethon.tl.types import InputMessagesFilterDocument
from telethon.errors import FloodWaitError


load_dotenv()  # take environment variables from .env.
# Telegram API credentials; a missing variable raises KeyError right here.
api_id = os.environ['TG_API_ID']
api_hash = os.environ['TG_API_HASH']
phone = os.environ['TG_PHONE']  # NOTE(review): not referenced by any visible cell
# Passed as the first argument to TelegramClient(...) in later cells, i.e. it
# is used as the Telethon session name rather than a Telegram username.
username = 'Hex'


# Paths
MEDIA_PATH = './static/media/'  # where downloaded media files are written
DB_PATH = './GOTTH/goth.db'  # SQLite database holding the media_items table


# Ensure media directory exists
os.makedirs(MEDIA_PATH, exist_ok=True)

class DownloadProgressBar(tqdm):
    """tqdm bar whose ``update_to(current, total)`` fits Telethon's progress_callback.

    The bar is not left on screen after it finishes (``leave=False``) unless
    the caller passes ``leave`` explicitly.
    """

    def __init__(self, *args, **kwargs):
        # setdefault instead of a hard-coded keyword: passing leave=... used to
        # raise "got multiple values for keyword argument 'leave'".
        kwargs.setdefault('leave', False)
        super().__init__(*args, **kwargs)

    def update_to(self, current, total):
        """Move the bar to the absolute position ``current`` out of ``total``."""
        self.total = total
        # tqdm.update() takes a delta, so convert the absolute position.
        self.update(current - self.n)


"""
async def download_media(client, message: Message) -> str:
    with DownloadProgressBar(unit='B', unit_scale=True) as pb:
        return await client.download_media(message, MEDIA_PATH, progress_callback=pb.update_to)
"""
async def download_media(client, message: Message) -> str:
    return await client.download_media(message, MEDIA_PATH)


# Flood prevention: at most MAX_CONCURRENT_TASKS guarded Telegram operations
# may be in flight at the same time (see process_with_semaphore).
MAX_CONCURRENT_TASKS = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)


async def delay():
    """Pause for a fixed 0.1 s before each guarded operation."""
    pause_seconds = .1
    await asyncio.sleep(pause_seconds)

async def exponential_backoff(attempt):
    """Log and then sleep for ``2 ** attempt`` seconds (1, 2, 4, ...)."""
    seconds = pow(2, attempt)
    print(f"Rate limit hit. Waiting for {seconds} seconds before retrying.")
    await asyncio.sleep(seconds)

async def process_with_semaphore(callback: Any, task_id: int, max_attempts: int = 5) -> None:
    """Run one Telegram operation under the global semaphore, retrying flood waits.

    Args:
        callback: what to run. Either a zero-argument callable that returns a
            fresh awaitable on every call (retryable), or an awaitable such as a
            coroutine object (single-shot). A coroutine can only be awaited once,
            so a single-shot callback is skipped on a flood wait rather than
            retried. The previous loop re-awaited it and crashed with
            RuntimeError.
        task_id: identifier used only in log messages.
        max_attempts: total attempts for a retryable callback (default 5).

    A FloodWaitError is logged and swallowed once retrying is impossible or
    exhausted. Any other exception propagates to the caller.
    """
    retryable = callable(callback)
    async with semaphore:
        for attempt in range(max_attempts):
            try:
                await delay()
                # A retryable callback gets a brand-new awaitable per attempt.
                await (callback() if retryable else callback)
                return
            except FloodWaitError as e:
                if not retryable:
                    print(f"Flood wait on a non-retryable task. Skipping: {task_id}")
                    return
                if attempt < max_attempts - 1:
                    # Telegram reports the minimum wait in e.seconds. Honour it,
                    # then add the usual exponential backoff on top.
                    await asyncio.sleep(getattr(e, 'seconds', 0) or 0)
                    await exponential_backoff(attempt)
                else:
                    print(f"Max attempts reached. Skipping: {task_id}")
                    return



## Experiments and scratch

In [None]:

async def main():
    """Scratch tour of the Telethon API against the logged-in account.

    Prints the account's own user object, username and phone. Sends two test
    messages to Saved Messages ('me') and replies to the second. Then prints
    the 10 most recent messages there. Reads the module-level ``client``.
    """
    # Every Telethon object can be pretty-printed with .stringify().
    myself = await client.get_me()
    print(myself.stringify())

    # Attributes are plain dotted access.
    username = myself.username
    print(username)
    print(myself.phone)

    # Other things worth trying:
    #   async for dialog in client.iter_dialogs(): print(dialog.name, 'has ID', dialog.id)
    #   await client.send_message(-100123456, 'Hello, group!')        # a chat ID
    #   await client.send_message('+34600123123', 'Hello, friend!')   # a contact
    #   await client.send_message('username', 'Testing Telethon!')    # any username
    #   await client.send_file('me', '/home/me/Pictures/holidays.jpg') # files, albums...
    await client.send_message('me', f'Hello, {username}!')

    # Markdown works; send_message returns the Message it sent.
    sent = await client.send_message(
        'me',
        'This message has **bold**, `code`, __italics__ and '
        'a [nice website](https://example.com)!',
        link_preview=False,
    )
    print(sent.raw_text)
    await sent.reply('Cool!')

    # Recent history of any chat.
    async for recent in client.iter_messages('me', limit=10):
        print(recent.id, recent.text)
        # Media download example (disabled):
        #   if recent.photo:
        #       path = await recent.download_media()
        #       print('File saved to', path)  # printed after download is done

async with client:
    await main()

In [None]:
# Long-lived client for the interactive cells below. `username` is the first
# TelegramClient argument, i.e. the session name.
# NOTE(review): connect() (unlike start()) presumably does not sign in, so
# this assumes an already-authorized session exists. Verify.
client = TelegramClient(username, api_id, api_hash)
await client.connect()

In [None]:
dialogs = await client.get_dialogs()

dialog_by_id = {}
for d in dialogs:
    real_id, peer_type = utils.resolve_id(d.id)
    print(real_id, peer_type)
    dialog_by_id[real_id] = d

# client.get_channels(dialog_by_id.keys())

In [None]:
# Find recent messages in a chat
filtered = filter(lambda dialog: dialog.name.find('Abrahams') > -1, dialogs)
f: telethon.tl.custom.dialog.Dialog = next(filtered)
last_messages = await client.get_messages(f, limit=20)

[print(m) for m in last_messages]

In [None]:
# Delete most recent message
# await client.delete_messages(f, last_messages[0], revoke=True)

In [None]:
# Fetch the account's chat folders (dialog filters). The following cells look
# for the one titled 'MediaView' in `chatFolders.filters`.
chatFolders: DialogFilters = await client(functions.messages.GetDialogFiltersRequest())
# print(chatFolders.stringify())

In [None]:
# Pick the chat folder titled 'MediaView', or None if there is none. Some
# entries in `filters` have no `title` attribute, hence the getattr default.
mediaFolder = next(
    (
        candidate
        for candidate in chatFolders.filters
        if getattr(candidate, 'title', None) == 'MediaView'
    ),
    None,
)

# print(mediaFolder.stringify())

In [None]:
# target_channels = [dialog_by_id[peer.channel_id] for peer in mediaFolder.include_peers]
# target_channels = [client.get_entity(peer) for peer in mediaFolder.include_peers]
# await client.get_entity(peer)

target_channels = await asyncio.gather(*[client.get_entity(peer) for peer in mediaFolder.include_peers])
for channel in target_channels:
    print(channel.stringify())

In [None]:
# Test messages in the first two channels
"""
for channel_id in target_channels[0:2]:
    async for message in client.iter_messages(channel_id, limit=5):
        print(message.stringify())
"""

# Grab most recent post @ self to test twitter embed
test_message = None
async for message in client.iter_messages('me', limit=1):
    test_message = message

print(test_message.stringify())

# test_message.media = MessageMediaWebPage
print(test_message.document)

In [None]:
"""
async def collect_posts(client, target_channels, limit=5):
    post_collection = []
    for channel in target_channels:
        async for message in client.iter_messages(channel, limit=limit):
            post_collection.append(message)
    return post_collection
posts = await collect_posts(client, target_channels, limit=5)
"""

## Older version

In [None]:


async def get_media_info(client, message: Message, channel: Channel, downloaded_files: Dict[str, str]) -> Dict[str, Any]:
    """Describe the media attached to ``message``, downloading it if needed.

    Args:
        client: connected TelegramClient used for downloads.
        message: message to inspect.
        channel: channel the message came from. May be None (collect_media
            looks it up with dict.get), in which case 'channel' is None.
        downloaded_files: file id -> saved path cache, shared across calls so a
            file seen twice is only downloaded once. Mutated in place.

    Returns:
        A dict with 'id', 'date', 'channel' and 'text', plus either file
        details ('path', 'type', 'file_id', 'file_name', 'file_size') or
        web-preview details ('type', 'url'). None when the message has neither.
    """
    media_info = {
        'id': message.id,
        'date': str(message.date),  # Convert to string for JSON serialization
        'channel': channel.title if channel is not None else None,
        'text': message.text,  # Include message text
    }

    if message.file:
        file_id = message.file.id
        if file_id in downloaded_files:
            media_info['path'] = downloaded_files[file_id]
        else:
            media_info['path'] = await download_media(client, message)
            # Only cache real paths so a failed download is retried next time.
            if media_info['path']:
                downloaded_files[file_id] = media_info['path']

        if message.photo:
            media_type = 'photo'
        elif message.gif:
            media_type = 'gif'
        elif message.video:
            media_type = 'video'
        elif message.document:
            # mime_type can be missing; treat that as a generic document.
            mime_type = message.document.mime_type or ''
            media_type = mime_type.split('/')[-1] if 'image/' in mime_type else 'document'
        else:
            media_type = 'unknown'

        media_info.update({
            'type': media_type,
            'file_id': file_id,
            'file_name': message.file.name,
            'file_size': message.file.size,
        })
    elif message.web_preview:
        media_info.update({
            'type': 'web_preview',
            'url': message.web_preview.url,
        })
    else:
        return None

    return media_info

async def collect_media(client, message_collection: List[Message], channels: Dict[int, Channel]) -> List[Dict[str, Any]]:
    """Build media records for every message that carries a file or web preview.

    Args:
        client: connected TelegramClient (used for downloads).
        message_collection: messages to inspect, iterated with a tqdm bar.
        channels: channel id -> Channel, used to label each record. A message
            whose peer is not a channel, or whose channel is missing from the
            map, is passed to get_media_info with channel None.

    Returns:
        The non-empty results of get_media_info, in input order.
    """
    media_collection = []
    downloaded_files = {}  # file id -> saved path, shared so each file downloads once
    for message in tqdm(message_collection):
        # Only channel peers have channel_id. User/chat peers used to raise
        # AttributeError here.
        channel = channels.get(getattr(message.peer_id, 'channel_id', None))
        media_info = await get_media_info(client, message, channel, downloaded_files)
        if media_info:
            media_collection.append(media_info)
            print(f"Has {media_info['type']}: {message.id}")
        else:
            print(f"No media found: {message.id}")
    return media_collection


# Export to JSON
def json_serial(obj):
    """``default=`` hook for json.dump covering types json cannot encode natively.

    - datetime / date -> ISO-8601 string.
    - telethon File -> dict with its id, name, ext and mime_type, encoded as a
      nested JSON object. It used to be json.dumps'd into a string, which put
      double-encoded JSON in the output.
    - any object with ``stringify()`` (Telethon TL objects) -> that string.

    Raises:
        TypeError: for anything else, as json expects from a default hook.
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, telethon.tl.custom.file.File):
        return {'id': obj.id, 'name': obj.name, 'ext': obj.ext, 'mime_type': obj.mime_type}
    if hasattr(obj, 'stringify'):
        return obj.stringify()
    raise TypeError("Type %s not serializable" % type(obj))

def export_to_json(media_collection: List[Dict[str, Any]], filename: str = 'media_data.json') -> None:
    """Write ``media_collection`` to ``filename`` as indented UTF-8 JSON.

    Non-ASCII text is written as-is (ensure_ascii=False). Values json cannot
    encode are delegated to json_serial.
    """
    dump_options = dict(ensure_ascii=False, indent=2, default=json_serial)
    with open(filename, mode='w', encoding='utf-8') as handle:
        json.dump(media_collection, handle, **dump_options)
    print(f"Media data exported to {filename}")





### NEW CODE ###

# SQLite database setup (older version).
db_path = DB_PATH  # reuse the configured path instead of repeating the literal
# sqlite3.connect cannot create missing parent directories.
os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Create table if it doesn't exist. Besides the legacy `channel` (title)
# column, this also declares channel_id, message_id and seen, which the newer
# code writes/reads. Because of IF NOT EXISTS, a table first created here on a
# fresh database would otherwise lack them, and the newer INSERTs would fail.
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_items (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_id TEXT UNIQUE,
    date DATETIME,
    channel TEXT,
    channel_id INTEGER,
    message_id INTEGER,
    text TEXT,
    type TEXT,
    file_name TEXT,
    file_size INTEGER,
    url TEXT,
    seen BOOLEAN DEFAULT 0
)
''')
conn.commit()


async def process_channel(client, channel):
    """(Older version) Download media from the 10 newest messages in ``channel``.

    Each downloaded file is upserted into media_items (legacy columns, with the
    channel stored by title) through the module-level ``cursor``/``conn``.
    NOTE: process_channel is redefined in the "New version" section. That
    definition shadows this one once its cell runs.
    """
    download_folder = './static/media/'
    os.makedirs(download_folder, exist_ok=True)

    async for message in client.iter_messages(channel, limit=10):  # Adjust limit as needed
        if not message.media:
            continue
        # Ask Telethon directly so the file lands in download_folder.
        file_path = await client.download_media(message, download_folder)
        if not file_path:
            continue

        file = message.file
        file_name = os.path.basename(file_path)
        file_id = file.id if file else None
        # mime_type can be None; record 'unknown' instead of raising AttributeError.
        mime_type = file.mime_type if file else None
        media_type = mime_type.split('/')[0] if mime_type else 'unknown'

        cursor.execute('''
        INSERT OR REPLACE INTO media_items
        (file_id, date, channel, text, type, file_name, file_size, url)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            file_id,
            message.date,
            channel.title,
            message.text,
            media_type,
            file_name,
            file.size if file else 0,
            f'/media/{file_name}',
        ))
        conn.commit()

# Close the legacy connection and disconnect the client.
# NOTE(review): this closes `conn` in the same cell that opened it, so the
# process_channel defined above cannot be called afterwards (its module-level
# `cursor` belongs to the closed connection). It also needs a `client` from
# an earlier cell.
conn.close()
await client.disconnect()

## New version

In [28]:
# Fresh Telegram client and SQLite connection used by the "New version" cells.
# The module-level `conn` opened here is what process_channel and main use.
client = TelegramClient(username, api_id, api_hash)
await client.connect()
conn = sqlite3.connect(DB_PATH)


In [None]:
async def collect_posts(
    client, target_channels: List[Channel], limit: int = 10
) -> List[Message]:
    """Return up to ``limit`` recent messages per channel, grouped channel by channel."""
    posts = []
    for channel in target_channels:
        async for post in client.iter_messages(channel, limit=limit):
            posts.append(post)
    return posts


async def process_message(
    client, conn, message: Message, channel: Channel, backfill_ids: bool = True
) -> None:
    """Download one message's file (if any) and record it in media_items.

    Files over 50 MB and stickers are skipped. If the file id is already stored
    no download happens. When ``backfill_ids`` is true (the default, matching
    the previous always-on behaviour), the existing row's channel_id and
    message_id are first overwritten with this message's values.

    Args:
        client: connected TelegramClient.
        conn: open sqlite3 connection whose media_items table has channel_id
            and message_id columns.
        message: message to process.
        channel: channel the message belongs to (its id is stored).
        backfill_ids: pass False once the one-off channel_id/message_id
            backfill has been run (see the TODO it replaces).
    """
    if not message.file:
        if message.web_preview:
            print(f"Skipping web preview: {message.id}")
        else:
            print(f"No media found: {message.id}")
        return

    file: File = message.file
    file_id = file.media.id
    # File.size may be None for some media. Treat unknown as 0 instead of
    # letting the comparison raise TypeError and kill the task.
    if (file.size or 0) > 50_000_000:
        print(f"Skipping large file: {file_id}")
        return
    if file.sticker_set is not None:
        print(f"Skipping sticker: {file_id}")
        return

    # Check if file_id already exists in database
    cursor = conn.cursor()
    cursor.execute("SELECT file_id FROM media_items WHERE file_id = ?", (file_id,))
    if cursor.fetchone():
        if backfill_ids:
            # One-off backfill of channel_id/message_id, presumably for rows
            # saved before those columns existed. Leaving it on means a file
            # re-posted in another channel keeps moving the row's
            # channel_id/message_id to the latest sighting.
            cursor.execute(
                """
            UPDATE media_items
            set channel_id = ?, message_id = ?
            where file_id = ?
            """,
                (channel.id, message.id, file_id),
            )
            conn.commit()

        print(f"Skipping download for existing file_id: {file_id}")
        return

    file_path = await download_media(client, message)
    if not file_path:
        return
    file_name = os.path.basename(file_path)

    # Any .mp4 is classified as video (as fix_mp4_entries also enforces), so
    # this check intentionally comes before the gif branch.
    if message.video or file_name.lower().endswith(".mp4"):
        media_type = "video"
    elif message.gif:
        media_type = "gif"
    elif message.photo:
        media_type = "photo"
    elif message.document:
        mime_type = message.document.mime_type or ""
        media_type = mime_type.split("/")[-1] if "image/" in mime_type else "document"
    else:
        media_type = "unknown"

    cursor.execute(
        """
    INSERT OR REPLACE INTO media_items
    (file_id, date, channel_id, text, type, file_name, file_size, url, message_id)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
        (
            file_id,
            message.date,
            channel.id,
            message.text,
            media_type,
            file_name,
            file.size,
            f"/media/{file_name}",
            message.id,
        ),
    )
    conn.commit()
    print(f"Processed {media_type}: {file_id}")


async def process_channel(client: TelegramClient, channel: Channel):
    """Fetch up to 100 messages from ``channel`` and process them concurrently.

    If media_items already holds message ids for this channel, fetching
    resumes below the lowest stored id (offset_id returns only older
    messages), so repeated runs walk further back in history. Otherwise it
    starts from the newest message. Uses the module-level ``conn``. Each
    message goes through process_message behind process_with_semaphore.
    """
    print(f"Processing channel: {channel.title}")
    row = conn.cursor().execute(
        """
        SELECT message_id FROM media_items
        WHERE media_items.channel_id == ?
        AND media_items.message_id IS NOT NULL
        ORDER BY message_id ASC
        """,
        (channel.id,),
    ).fetchone()

    resume = {"offset_id": row[0]} if row else {}
    history = client.iter_messages(channel, limit=100, **resume)

    # To only handle recent posts, stop once
    # (datetime.now().astimezone() - msg.date).days > 7.
    pending = [
        asyncio.create_task(
            process_with_semaphore(
                process_message(client, conn, msg, channel), msg.id
            )
        )
        async for msg in history
    ]
    await asyncio.gather(*pending)


# PROBLEMS
# 1. How to handle media that is not a file (e.g. web previews)?
# 2. Check what happens to Twitter embeds
# 3. Paginate / search by date?
async def main(conn: sqlite3.Connection):
    chat_folders: DialogFilters = await client(
        functions.messages.GetDialogFiltersRequest()
    )
    media_folder = next(
        (
            folder
            for folder in chat_folders.filters
            if hasattr(folder, "title") and folder.title == "MediaView"
        ),
        None,
    )

    if not media_folder:
        print("MediaView folder not found")
        return

    target_channels = await asyncio.gather(
        *[client.get_entity(peer) for peer in media_folder.include_peers]
    )
    # Keep only channels where channel.title is not in the database
    # cursor = conn.cursor()
    # cursor.execute('SELECT DISTINCT channel FROM media_items')
    # existing_channels = set(row[0] for row in cursor.fetchall())
    # target_channels = [channel for channel in target_channels if channel.title not in existing_channels]

    # target_channels = [channel for channel in target_channels if channel.title.find("Macro") > -1]
    print([channel.title for channel in target_channels])
    await asyncio.gather(
        *[process_channel(client, channel) for channel in target_channels]
    )

    print("Processing complete")


# Run the main function
await main(conn)

In [27]:

# Tear down the "New version" session: close the module-level SQLite
# connection and disconnect the Telegram client. Any later code that still uses
# the module-level `conn` will hit a closed connection.
conn.close()
await client.disconnect()

## Extra cleanup scripts

In [None]:
import sqlite3
import os

# Re-declared (same values as the first cell), presumably so these cleanup
# helpers can be run without re-running the setup cell. Keep them in sync.
DB_PATH = './GOTTH/goth.db'
MEDIA_PATH = './static/media/'

def create_db(db=None):
    """Create the media_items table (current schema) if it does not exist.

    Args:
        db: optional open sqlite3 connection. When omitted, a connection to
            DB_PATH is opened and closed here, like the other cleanup helpers
            in this cell. The module-level ``conn`` is closed by the preceding
            teardown cell, so relying on it failed under Run All.
    """
    own_connection = db is None
    if own_connection:
        db = sqlite3.connect(DB_PATH)
    try:
        # message_id is written by process_message and read by process_channel,
        # so a fresh database needs it. It was missing from this schema.
        db.execute('''
        CREATE TABLE IF NOT EXISTS media_items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_id INTEGER UNIQUE,
            date DATETIME,
            channel_id INTEGER,
            message_id INTEGER,
            text TEXT,
            type TEXT,
            file_name TEXT,
            file_size INTEGER,
            url TEXT,
            seen BOOLEAN DEFAULT 0
        )
        ''')
        db.commit()
    finally:
        if own_connection:
            db.close()
# create_db()

def create_channels_db(db=None):
    """Create the ``channels`` lookup table (channel id -> title) if missing.

    Args:
        db: optional open sqlite3 connection. When omitted, a connection to
            DB_PATH is opened and closed here instead of using the module-level
            ``conn`` (closed by the preceding teardown cell).
    """
    own_connection = db is None
    if own_connection:
        db = sqlite3.connect(DB_PATH)
    try:
        # "long" is not a SQLite type name (it yields NUMERIC affinity).
        # INTEGER matches media_items.channel_id.
        db.execute('''
        CREATE TABLE IF NOT EXISTS channels (
            id INTEGER PRIMARY KEY,
            title TEXT
        )
        ''')
        db.commit()
    finally:
        if own_connection:
            db.close()
# create_channels_db()

def populate_channels_db(dialog_list=None, db=None):
    """Upsert (id, title) for every Channel among the dialogs into ``channels``.

    Args:
        dialog_list: dialogs to scan. Defaults to the module-level ``dialogs``
            fetched earlier with client.get_dialogs().
        db: optional open sqlite3 connection. When omitted, a connection to
            DB_PATH is opened and closed here. The module-level ``conn`` is
            already closed by the preceding teardown cell when this cell runs,
            so using it failed under Run All.

    Assumes the channels table already exists (see create_channels_db).
    """
    if dialog_list is None:
        dialog_list = dialogs
    own_connection = db is None
    if own_connection:
        db = sqlite3.connect(DB_PATH)
    try:
        cursor = db.cursor()
        for d in dialog_list:
            if isinstance(d.entity, Channel):
                print(d.entity.title)
                cursor.execute('''
                INSERT OR REPLACE INTO channels
                (id, title)
                VALUES (?, ?)
                ''', (d.entity.id, d.entity.title))
        # One commit for the whole batch instead of one per row.
        db.commit()
    finally:
        if own_connection:
            db.close()
populate_channels_db()

def fix_mp4_entries(db_path=None, media_path=None):
    """Make media_items consistent about MP4 videos.

    For each row:
    * a file_name ending in .mp4 (any case) whose type is not 'video' gets
      type 'video';
    * a row typed 'video' whose file exists without a .mp4 extension is renamed
      on disk to <stem>.mp4, and its file_name/url are updated to match. The
      rename is skipped if <stem>.mp4 already exists, because os.rename would
      replace it silently on POSIX.
    Rows with no file_name are reported and skipped (they used to crash on
    .lower() before anything was committed).

    Args:
        db_path: SQLite database; defaults to DB_PATH.
        media_path: directory holding the media files; defaults to MEDIA_PATH.
    """
    db_path = DB_PATH if db_path is None else db_path
    media_path = MEDIA_PATH if media_path is None else media_path
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Get all entries
        cursor.execute('SELECT id, file_name, type FROM media_items')
        entries = cursor.fetchall()
        updated_count = 0
        for entry_id, file_name, current_type in entries:
            if not file_name:
                print(f"Skipping entry {entry_id}: no file name")
                continue
            if file_name.lower().endswith('.mp4') and current_type != 'video':
                cursor.execute('UPDATE media_items SET type = ? WHERE id = ?', ('video', entry_id))
                updated_count += 1
                print(f"Updated entry {entry_id}: {file_name} from {current_type} to video")
            # Check if the file exists and has the correct extension
            file_path = os.path.join(media_path, file_name)
            if not os.path.exists(file_path):
                continue
            stem, file_extension = os.path.splitext(file_name)
            if file_extension.lower() != '.mp4' and current_type == 'video':
                new_file_name = f"{stem}.mp4"
                new_file_path = os.path.join(media_path, new_file_name)
                if os.path.exists(new_file_path):
                    print(f"Not renaming entry {entry_id}: {new_file_name} already exists")
                    continue
                os.rename(file_path, new_file_path)
                cursor.execute('UPDATE media_items SET file_name = ?, url = ? WHERE id = ?',
                               (new_file_name, f'/media/{new_file_name}', entry_id))
                updated_count += 1
                print(f"Renamed file and updated entry {entry_id}: {file_name} to {new_file_name}")
        conn.commit()
    finally:
        conn.close()
    print(f"Updated {updated_count} entries")
# fix_mp4_entries()


def add_column():
    """One-off migration: add an INTEGER channel_id column to media_items in DB_PATH.

    SQLite refuses to add a column that already exists, so this fails if run twice.
    """
    db = sqlite3.connect(DB_PATH)
    db.execute('ALTER TABLE media_items ADD COLUMN channel_id INTEGER')
    db.commit()
    db.close()
# add_column()

def set_viewed_true():
    """Mark every media_items row in DB_PATH as seen (seen = 1)."""
    db = sqlite3.connect(DB_PATH)
    db.execute('UPDATE media_items SET seen = 1')
    db.commit()
    db.close()
# set_viewed_true()




def fix_unavailable_entries(db_path=None, media_path=None):
    """Delete media_items rows whose file no longer exists in the media directory.

    Rows without a file_name are reported and kept. Previously they crashed
    os.path.join before anything was committed.

    Args:
        db_path: SQLite database; defaults to DB_PATH.
        media_path: directory holding the media files; defaults to MEDIA_PATH.
    """
    db_path = DB_PATH if db_path is None else db_path
    media_path = MEDIA_PATH if media_path is None else media_path
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT id, file_name FROM media_items')
        entries = cursor.fetchall()
        updated_count = 0
        for entry_id, file_name in entries:
            if not file_name:
                print(f"Skipping entry {entry_id}: no file name")
                continue
            if not os.path.exists(os.path.join(media_path, file_name)):
                # Delete the entry
                cursor.execute('DELETE FROM media_items WHERE id = ?', (entry_id,))
                updated_count += 1
                print(f"Removed entry {entry_id} - file {file_name} ")
        conn.commit()
    finally:
        conn.close()
    print(f"Updated {updated_count} entries")
# fix_unavailable_entries()
