# Telegram Bot Messages From Twitter Users of Interest

## Setup Environment

In [1]:
import time
import feedparser
import requests
from telegram import Bot
from datetime import datetime,timedelta
from bs4 import BeautifulSoup
from translate import Translator
from dotenv import load_dotenv
import os
import asyncio
import aiohttp
from typing import List, Dict
from dateutil import tz, parser

### Environment Variables

In [2]:
load_dotenv()  # Load environment variables from .env file

# Telegram bot token, passed to Bot(token=...). os.getenv returns None when the variable is unset.
TOKEN = os.getenv("TOKEN")
# Chat that receives the messages, passed as chat_id to bot.send_message. None when unset.
target_chat_id = os.getenv("target_chat_id")
# Base URL handed to Bot(base_url=...); overridable through the environment.
TELEGRAM_API_BASE_URL=os.getenv("TELEGRAM_API_BASE_URL", "https://api.telegram.org/bot")
# Prefix for the per-user feed URLs built as f"{RSS_BASE_URL}/twitter/user/<id>".
# NOTE(review): the default uses plain http — confirm whether https should be preferred.
RSS_BASE_URL=os.getenv("RSS_BASE_URL", "http://rsshub.app")

### Paths, Constants, Globals

In [3]:
# Input file with one account per line. Lines are split on commas and only the
# first field (the Twitter username) is used; anything after it, such as a
# display name, is optional and ignored.
TWITTER_USER_LIST_FILE="twitter_list.txt"

# File holding the cutoff time. It is overwritten on every update and the last
# non-blank line is the value that gets read back.
CUTOFF_TIME_FILE="cutoff_time.txt"

# Log file; one line is appended per run.
LOG_FILE="log_file.txt"


- Globals

In [4]:
# 'published' string of the newest entry that passed the filter in the latest run.
# It is set in telegram_message_list_to_send and written to the cutoff file
# once the messages have been sent. Empty until a run finds new entries.
newest_time_str = ""

# Pause between two runs of the fetch-and-send cycle (3600 s = one hour).
wait_interval = 3600  # in seconds

# We are running async.
# NOTE(review): `loop` is not referenced anywhere else in the visible notebook.
loop = asyncio.get_event_loop()


## Read Input

### Read Twitter User List

In [5]:
def read_twitter_user_url_list():
    """
    Build the RSS feed URL for every account listed in TWITTER_USER_LIST_FILE.

    Each line of the file is split on commas; the first field is the Twitter
    username and any further fields are ignored. Lines whose first field is
    empty (for example blank lines) are skipped so they do not produce a URL
    without a user id.

    return: list of str, one feed URL per listed account
    """
    with open(TWITTER_USER_LIST_FILE, "r", encoding="utf-8") as file:
        lines = file.readlines()

    url_list = []
    for line in lines:
        twitter_id = line.split(',')[0].strip()
        if not twitter_id:
            # blank line or a line starting with a comma: nothing to fetch
            continue
        url_list.append(f"{RSS_BASE_URL}/twitter/user/{twitter_id}")

    # report the number of accounts actually read
    # (the original referenced an undefined name here and raised NameError)
    print(f"interested users: {len(url_list)}\n")

    return url_list

## Helper Function

### Cutoff Time Helper Function

In [6]:
def read_cutoff_time():
    """
    Read the cutoff time stored in CUTOFF_TIME_FILE.

    The value is taken from the last non-blank line of the file, so stray
    empty lines (e.g. from editing the file by hand) are tolerated. When the
    file is missing or holds no non-blank line, the fallback from
    use_yesterday_as_cutoff() is returned instead.

    return: datetime object
    raises: whatever dateutil's parser raises when the stored text cannot be
            parsed as a date
    """
    try:
        with open(CUTOFF_TIME_FILE, "r") as f:
            # keep only lines that contain something besides whitespace
            non_blank_rows = [row for row in f if row.strip()]
    except FileNotFoundError:
        return use_yesterday_as_cutoff()

    if not non_blank_rows:
        return use_yesterday_as_cutoff()

    # the newest record sits at the bottom of the file
    return parser.parse(non_blank_rows[-1])

def use_yesterday_as_cutoff():
    """
    Fallback cutoff for the first run, when no cutoff time has been stored yet.

    Despite the function name, the cutoff is currently one hour before now
    (adjust the timedelta to fetch from one day ago, etc.). The value is built
    in the local timezone so it is timezone-aware, which matters when comparing
    against tweets from users in other timezones.

    The cutoff is also written to the cutoff file via write_cutoff_time().

    return: datetime object
    """
    fallback_cutoff = datetime.now(tz.tzlocal()) - timedelta(hours=1)
    write_cutoff_time(fallback_cutoff)
    return fallback_cutoff

def write_cutoff_time(cutoff_time):
    """
    Overwrite CUTOFF_TIME_FILE with the given cutoff time.

    cutoff_time: str or datetime object
        - str: must be parseable by dateutil's parser; it is written unchanged.
        - datetime: written as "YYYY-MM-DD HH:MM:SS" followed by the numeric
          UTC offset (e.g. "+0800"). The numeric offset is used rather than a
          timezone abbreviation because abbreviations such as "CST" are
          ambiguous and may not parse back to a timezone-aware value.
    return: None
    raises: TypeError if cutoff_time is neither str nor datetime;
            the parser's own error if a str cannot be parsed
    """
    if isinstance(cutoff_time, str):
        # parse only to validate: a bad string raises before the file is touched
        parser.parse(cutoff_time)
        time_str = cutoff_time
    elif isinstance(cutoff_time, datetime):
        # %z is empty for a naive datetime, so callers should pass an aware one
        TIME_RECORD_FORMAT = "%Y-%m-%d %H:%M:%S %z"
        time_str = cutoff_time.strftime(TIME_RECORD_FORMAT)
    else:
        # raise a real exception type; raising a bare string is itself an error
        raise TypeError("not str or datetime.datetime")

    # overwrite everything in the file
    with open(CUTOFF_TIME_FILE, "w") as f:
        f.write(time_str)


### Time Format Helper Function

In [7]:
def twitter_rss_time_converter(datetime_string:str) -> datetime:
    """
    Convert the timestamp text of an RSS entry into a datetime using dateutil's parser.

    NOTE(review): the result is timezone-aware only if the input string carries
    timezone information — confirm the feed always provides it.
    """
    return parser.parse(datetime_string)

### Filter Function

In [8]:
def filter_sort_twitter_entries(entries):
    """
    Keep the entries published after the stored cutoff time, newest first.

    entries: iterable of feed entries exposing a 'published' timestamp string
    return: list of the entries strictly newer than the cutoff, sorted by
            publication time in descending order
    """
    cutoff_time = read_cutoff_time()

    def published_at(entry):
        # publication time of one entry as a datetime
        return twitter_rss_time_converter(entry['published'])

    newer_entries = []
    for entry in entries:
        if published_at(entry) > cutoff_time:
            newer_entries.append(entry)

    print(f"Cutoff Time: {cutoff_time}\n")
    if newer_entries:
        print(f"After Filter, {len(newer_entries)} items will be sent to bot.\n")

    # newest entry ends up at index 0
    newer_entries.sort(key=published_at, reverse=True)
    return newer_entries


### Logging

In [9]:
def log_time():
    """
    Append one line to LOG_FILE with the current local time and the current
    value of newest_time_str.

    TODO: might log more info.
    """
    log_line = f"now is {datetime.now()}; newest cutoff time is {newest_time_str}\n"
    with open(LOG_FILE, "a") as f:
        f.write(log_line)


## Async Fetching URLs

### Async Fetch

In [10]:
# with this, able to run event loop in Jupyter
%autoawait

async def fetch(session, url):
    """
    GET `url` through `session` and return the response body as text.

    session: object whose get(url) returns an async context manager yielding a
             response with an awaitable text() (here an aiohttp client session)
    url: address to request
    return: str, the response body
    """
    async with session.get(url) as response:
        body = await response.text()
    return body

async def fetch_all(urls):
    """
    Download all `urls` concurrently over one shared aiohttp session.

    urls: iterable of URL strings
    return: list of response bodies, in the same order as `urls`
    """
    async with aiohttp.ClientSession() as session:
        # schedule every download up front so they run concurrently
        pending = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        return await asyncio.gather(*pending)
    
async def fetch_twitter_entries():
    """
    Fetch the RSS feed of every listed Twitter user and collect all entries.

    The feed URLs come from read_twitter_user_url_list(); each downloaded body
    is parsed with feedparser and the entries of all feeds are concatenated.

    return: list of feed entries from all users
    """
    urls = read_twitter_user_url_list()

    print("Fetching ...\n")

    responses = await fetch_all(urls)

    entries = []
    for response in responses:
        entries.extend(feedparser.parse(response).entries)

    return entries



IPython autoawait is `on`, and set to use `asyncio`


## Format Telegram Bot Messages

### Bot Message Formatting

In [11]:
def bot_message_from_entrie(item):
    """
    Format one feed entry as the text of a Telegram message.

    The message has three parts: a header line with the author and the parsed
    publication time, the entry description converted from HTML to plain text,
    and the link to the entry.

    item: feed entry providing "author", "link", "published" and "description"
    return: str, the message text
    """
    # the entry title is not part of the message, so it is not looked up;
    # an entry without a title must not fail here
    author = item["author"]
    link = item["link"]
    pub_date_parsed = parser.parse(item["published"])
    description = parse_html_from_rss(item["description"])

    return (
        f"{author}  {pub_date_parsed}\n"
        f"{description}\n"
        f"{link}"
    )

def parse_html_from_rss(description_html):
    """
    Convert the HTML description of a feed entry into plain text.

    Quoted tweets (div elements with class "rsshub-quote") are collapsed into
    a single line prefixed with "> ". All remaining text fragments are
    whitespace-stripped and joined with newlines.

    description_html: str, HTML markup of the entry description
    return: str, plain-text description
    """
    soup = BeautifulSoup(description_html, 'html.parser')

    # Collapse each quote block into one "> ..." line.
    # A literal ">" is used: the result is sent as plain text, where an HTML
    # entity such as "&gt;" would show up verbatim. Surrounding newlines are
    # not added because stripped_strings removes leading/trailing whitespace.
    for rsshub_quote in soup.find_all('div', class_='rsshub-quote'):
        quote_text = rsshub_quote.get_text(separator=' ', strip=True)
        rsshub_quote.string = f"> {quote_text}"

    for br in soup.find_all('br'):
        br.replace_with('\n')

    description = "\n".join(soup.stripped_strings)

    return description


## Filter Content and Send Messages

### Filter Twitter Entries

In [12]:
async def telegram_message_list_to_send():
    """
    Fetch all feeds and build the list of message texts for entries newer than
    the cutoff time.

    Side effects: appends a line to the log file via log_time() before
    fetching, and, when new entries exist, stores the 'published' string of
    the newest one in the global newest_time_str.

    return: list of str, newest entry first; empty list when nothing is new
    """
    global newest_time_str

    # todo, maybe log twitter account as well
    log_time()

    entries = await fetch_twitter_entries()
    print(f"Fetched twitter: {len(entries)}\n")

    fresh_entries = filter_sort_twitter_entries(entries)
    if not fresh_entries:
        return []

    # entries are sorted newest first, so index 0 carries the newest timestamp
    newest_time_str = fresh_entries[0]['published']
    return [bot_message_from_entrie(entry) for entry in fresh_entries]


### Send Bot Message

In [13]:
async def send_to_telegram_bot():
    """
    Run one fetch-and-send cycle: collect the new messages, send each one to
    target_chat_id, then store newest_time_str as the new cutoff time.

    The cutoff is only written after every message has been sent; if a send
    raises, the exception propagates and the cutoff file is left unchanged.

    NOTE(review): bot.send_message is called without await, which presumes a
    synchronous Bot API — confirm against the installed telegram library version.

    return: None
    """
    bot = Bot(
        token=TOKEN,
        base_url=TELEGRAM_API_BASE_URL,
    )

    messages = await telegram_message_list_to_send()
    sleep_time_msg = f"Now sleep time, next run will be after {wait_interval} seconds.\n"

    if not messages:
        # nothing to do
        print("No Messages to Send.\n")
        print(sleep_time_msg)
        return

    print("Sending Telegram Bot Messages\n")

    # todo, record to log file and later send to AI

    for message in messages:
        bot.send_message(chat_id=target_chat_id, text=message, timeout=10)

    # update cutoff time
    write_cutoff_time(newest_time_str)
    print(f"Cutoff time updated to: {newest_time_str}\n")
    print(sleep_time_msg)
    


## Task Management

### Start Task

In [14]:
# In JupyterLab, do not use asyncio.run: it conflicts with the event loop
# Jupyter is already running. Enabling %autoawait and awaiting directly in the
# cell is the solution used here.

# Event checked by main() at the top of every cycle; the loop stops once it is set.
# NOTE(review): nothing in the visible notebook ever sets this event — stopping
# is done by cancelling the task instead.
cancel_event = asyncio.Event()

async def main(cancel_event):
    """
    Repeat the fetch-and-send cycle until stopped.

    Each iteration runs send_to_telegram_bot() and then sleeps for
    wait_interval seconds. The loop ends when cancel_event is set (checked at
    the start of every iteration) or when the task is cancelled; cancellation
    is caught and reported rather than re-raised.

    cancel_event: asyncio.Event signalling that the loop should stop
    return: None
    """
    try:
        while True:
            if cancel_event.is_set():
                break
            await send_to_telegram_bot()
            await asyncio.sleep(wait_interval)
    except asyncio.CancelledError:
        print("Coroutine cancelled.")
    finally:
        print("Coroutine stopped. 程序已结束.")


### Cancel Task

- 在下面出现的输入框中敲击回车，即可停止程序运行
- 或者在输入框中输入任何字符后回车，也可停止

In [15]:
async def cancel_on_keypress(task):
    """
    Wait for the user to submit a line of input, then cancel `task`.

    input() blocks, so it is run in a worker thread through asyncio.to_thread
    to keep the event loop free while waiting.

    task: the asyncio task to cancel once input arrives
    return: None
    """
    # print("Press Enter to cancel the task.")
    await asyncio.to_thread(input)
    task.cancel()

task = asyncio.create_task(main(cancel_event))
cancel_task = asyncio.create_task(cancel_on_keypress(task))

try:
    await asyncio.gather(task, cancel_task, return_exceptions=True)
except asyncio.CancelledError:
    pass

Fetching Content ...

Fetched Users: 111

Fetched Entries: 1980

Cutoff Time: 2023-04-18 11:04:09+00:00

After Filter, 417 items will be sent to bot.

Sending Telegram Bot Messages

Cutoff time updated to: Wed, 19 Apr 2023 06:23:25 GMT

Now sleep time, next run will be after 3600 seconds.



 


Coroutine cancelled.
Coroutine stopped. 程序已结束.
