Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
travislee89 committed Aug 18, 2022
1 parent aaba1dd commit f89f496
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
!.gitignore

# Editors (Vim swap files, JetBrains IDE settings)
*.swp
.idea

# Byte-compiled / optimized / DLL files
__pycache__
*.py[cod]

# Database
*.db

# Config
/conf/config.yaml

# venv
/venv/

# macOS
._*
.DS_Store
9 changes: 9 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-

from stackernews import StackerNews


# Run one scrape-and-notify cycle, but only when executed as a script
# (importing this module must not trigger any network or database work).
if __name__ == '__main__':
    StackerNews().run()
8 changes: 8 additions & 0 deletions conf/config-sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

db_name: sqlite:///db/stacker.db

log_name: StackerNews
log_file: log/stacker-news-top.log

tg_chat_id: -1000000000001
tg_token: 123456789:TELEGRAM_BOT_TOKEN_SAMPLE
Empty file added db/.gitkeep
Empty file.
5 changes: 5 additions & 0 deletions lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-

from .utils import session, logger, config, replace
from .tg_bot import telegram_bot_send_text
from .db import Database
78 changes: 78 additions & 0 deletions lib/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-

import datetime
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from lib import config

Base = declarative_base()


class Stacker(Base):
    """
    ORM mapping for one row of the ``threads`` table.

    Each row keeps the thread id, title, sats and comment count handed to the
    constructor, together with a caller-supplied creation timestamp.
    """
    __tablename__ = 'threads'

    id = Column(Integer, primary_key=True)  # surrogate primary key
    thread_id = Column(Integer)
    title = Column(String)
    sats = Column(Integer)
    comments = Column(Integer)
    create_at = Column(DateTime)

    def __init__(self, thread_id, title, sats, comments, create_at):
        """
        Build a row from positional values.
        :param thread_id: The thread id.
        :param title: The thread title.
        :param sats: The thread sats.
        :param comments: The thread comments count.
        :param create_at: Timestamp stored in the ``create_at`` column.
        """
        # The declarative base supplies a keyword constructor that assigns
        # each mapped attribute; this wrapper only keeps the positional form.
        super().__init__(thread_id=thread_id,
                         title=title,
                         sats=sats,
                         comments=comments,
                         create_at=create_at)


class Database(object):
    """
    Small wrapper around one SQLAlchemy session for the ``threads`` table.
    """

    def __init__(self):
        """
        Create the engine from ``config['db_name']``, create any missing
        tables and open the session used by all other methods.
        """
        self.engine = create_engine(config['db_name'], echo=False)
        # create tables
        Base.metadata.create_all(self.engine)

        # create a Session
        session_made = sessionmaker(bind=self.engine)
        self.session = session_made()

    def _commit(self):
        """
        Commit the current transaction, rolling back if the commit fails.

        Without the rollback a failed commit leaves the shared session in a
        failed state and every later query on it raises as well.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def add_thread(self, thread_id: int, title: str, sats: int, comments: int):
        """
        Add a thread to the database. Does nothing if the thread id is already stored.
        :param thread_id: The thread id to add.
        :param title: The thread title to add.
        :param sats: The thread sats to add.
        :param comments: The thread comments to add.
        """
        if self.get_thread(thread_id):
            return
        thread = Stacker(thread_id, title, sats, comments, datetime.datetime.now())
        self.session.add(thread)
        self._commit()

    def get_thread(self, thread_id: int) -> 'Stacker | None':
        """
        Get a thread from the database.
        :param thread_id: The thread id to get.
        :return: The first matching row, or None when the id is not stored.
        """
        return self.session.query(Stacker).filter_by(thread_id=thread_id).first()

    def del_thread(self, thread_id: int):
        """
        Delete a thread from the database. Does nothing if the thread id is not stored.
        :param thread_id: The thread id to delete.
        """
        thread = self.get_thread(thread_id)
        if not thread:
            return
        self.session.delete(thread)
        self._commit()
16 changes: 16 additions & 0 deletions lib/tg_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-

from lib import session, config

# sendMessage endpoint of the Telegram Bot API for the configured bot token.
url = f"https://api.telegram.org/bot{config['tg_token']}/sendMessage"


def telegram_bot_send_text(message, timeout=60):
    """
    Send message to telegram channel/chat_id/group via telegram bot.
    :param message: Message text, already escaped for MarkdownV2.
    :param timeout: Seconds to wait for Telegram before giving up. Without a
        timeout a stalled connection would block the caller indefinitely.
    :return: The decoded JSON body of Telegram's response.
    """
    data = {'chat_id': config['tg_chat_id'],
            'text': message,
            'parse_mode': 'MarkdownV2'}
    response = session.post(url, json=data, headers={'Accept': 'application/json'}, timeout=timeout)
    return response.json()
84 changes: 84 additions & 0 deletions lib/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-

import yaml
import logging
import requests
import urllib3
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Maps every character in the string below to the same character preceded by
# a backslash. Insertion order follows the order of the string.
replace_map = {character: '\\' + character for character in '_*[]()~`>#+-=|{}.!'}


def create_session(connections=20, retries=5, backoff_factor=2,
                   status_forcelist=None, disable_warnings=False) -> requests.Session:
    """
    Build a requests session whose HTTPS requests are retried automatically.
    :param connections: Value passed to the adapter as ``pool_connections``.
    :param retries: Total number of retries per request.
    :param backoff_factor: Backoff factor handed to the retry policy.
    :param status_forcelist: HTTP status codes that trigger a retry;
        defaults to (429, 500, 502, 503, 504) when empty or None.
    :param disable_warnings: Pass exactly ``True`` to silence urllib3 warnings.
    """
    if disable_warnings is True:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        urllib3.disable_warnings()

    if not status_forcelist:
        status_forcelist = (429, 500, 502, 503, 504)
    retry_policy = Retry(total=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist)

    http_session = requests.Session()
    # Only the https:// prefix gets the retrying adapter.
    http_session.mount('https://', HTTPAdapter(pool_connections=connections, max_retries=retry_policy))
    return http_session


def create_logger(logger_name: str, log_file: str) -> logging.Logger:
    """
    Return the named logger, set to INFO and writing to ``log_file``.

    Calling this again with the same name and file returns the same logger
    without attaching a second handler, so records are never written twice.
    :param logger_name: Name passed to ``logging.getLogger``.
    :param log_file: Path of the log file; it is written as UTF-8.
    """
    import os  # local import: this module does not import os at file level

    _logger = logging.getLogger(logger_name)
    _logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in baseFilename, so compare against that.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == target
        for handler in _logger.handlers
    )
    if not already_attached:
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # Explicit encoding: log messages may contain non-ASCII text.
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        _logger.addHandler(file_handler)
    return _logger


def replace(string: str) -> str:
    r"""
    Escape a string so it can be used as plain text in a Telegram MarkdownV2 message.

    Every character that is a key of the module-level ``replace_map`` is
    replaced by its mapped value (the character preceded by a backslash).
    For example, "Hello world." becomes "Hello world\.".
    Backslashes already present in the input are doubled first; otherwise a
    literal backslash would be read as the start of an escape sequence.
    ref: https://core.telegram.org/bots/api#formatting-options
    :param string: string to replace characters in.
    :return: the escaped string.
    """
    # Must run before the loop: the loop itself inserts backslashes, and those
    # must not be doubled.
    string = string.replace('\\', '\\\\')
    for character, replacement in replace_map.items():
        string = string.replace(character, replacement)
    return string


def load_config(path=None):
    """
    Load config from conf/config.yaml
    :param path: Optional path of the YAML file. When omitted, the file
        ``conf/config.yaml`` one directory above this module's directory is
        used, so the result does not depend on the current working directory.
    :return: The parsed YAML document.
    """
    from pathlib import Path  # local import: this module does not import pathlib at file level

    if path is None:
        path = Path(__file__).resolve().parent.parent / 'conf' / 'config.yaml'
    with open(path, 'r', encoding='utf-8') as f:
        _config = yaml.safe_load(f)
    return _config


# Module-level singletons created at import time and re-exported by lib/__init__.py.
# The config has to be loaded first because the logger is built from its
# 'log_name' and 'log_file' entries.
config = load_config()
session = create_session()
logger = create_logger(config['log_name'], config['log_file'])
Empty file added log/.gitkeep
Empty file.
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
requests>=2.26.0
urllib3>=1.26.9
sqlalchemy>=1.4.40
beautifulsoup4>=4.11.1
PyYAML>=6.0
3 changes: 3 additions & 0 deletions stackernews/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-

from .stacker import StackerNews
100 changes: 100 additions & 0 deletions stackernews/stacker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-

from bs4 import BeautifulSoup
from lib import Database, session, logger, replace, telegram_bot_send_text


class StackerNews(object):
    """
    Fetch the daily top posts page of Stacker News and send every thread that
    is not yet stored in the database to Telegram.

    ``run()`` performs one complete cycle: fetch, parse, filter, send.
    """

    def __init__(self):
        self.domain = 'https://stacker.news'
        self.url = f'{self.domain}/top/posts/day'
        self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
                                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36',
                        'Referer': self.domain
                        }
        self.message_template = '*{title}*\n_{sats} sats, {comments} comments_\n{url}'  # Markdown format
        self.session = session
        self.db = Database()

        self.soup_top = None  # BeautifulSoup object
        self.threads = []  # BeautifulSoup objects of threads
        self.threads_list = []  # List of threads, dicts with keys: id, title, sats, comments
        self.threads_send = []  # List of threads to be sent, dicts with keys: id, title, sats, comments

    @staticmethod
    def _parse_count(text):
        """
        Return the leading integer of a label such as "123 sats" or "1 comment".
        :param text: Label text whose first whitespace-separated token is the number.
        :raises IndexError: if the text is empty.
        :raises ValueError: if the first token is not an integer.
        """
        # Taking the first token works for singular and plural labels alike;
        # thousands separators are dropped before the conversion.
        return int(text.split()[0].replace(',', ''))

    def get_top(self):
        """
        Get top page of Stacker News and save it to self.soup_top.
        On any failure the error is logged and self.soup_top is left as None.
        """
        logger.info('Getting html from: {}'.format(self.url))
        # Reset first so a failed fetch can never leave a page from an earlier run behind.
        self.soup_top = None
        try:
            response = self.session.get(self.url, headers=self.headers, timeout=60)
            response.raise_for_status()
            response.encoding = 'utf-8'
            self.soup_top = BeautifulSoup(response.text, 'html.parser')
        except Exception as e:
            logger.error('Getting html failed: {}'.format(e))

    def get_threads(self):
        """
        Get threads from top page of Stacker News and save them to self.threads.
        Leaves self.threads empty when no page was fetched.
        """
        logger.info('Getting threads.')
        if self.soup_top is None:
            # get_top() failed; there is nothing to parse.
            logger.error('No html available, got 0 threads.')
            self.threads = []
            return
        self.threads = self.soup_top.find_all('div', class_='item_hunk__12-LR')
        logger.info('Got {} threads.'.format(len(self.threads)))

    def get_thread(self):
        """
        Decode each thread from its BeautifulSoup object and save it to self.threads_list.
        A thread that cannot be decoded is logged and skipped so the rest are still processed.
        """
        for item in self.threads:
            try:
                title_link = item.find('a', class_='item_title__3l-8a text-reset mr-2')
                # href looks like /items/59757
                thread_id = int(title_link.get('href').split('/')[-1])
                thread_title = title_link.text
                thread_sats = self._parse_count(item.find('div', class_='item_other__2N34Y').find('span').text)
                thread_comments = self._parse_count(item.find_all('a', class_='text-reset')[1].text)
            except (AttributeError, IndexError, TypeError, ValueError) as e:
                # A missing element yields None (AttributeError/TypeError), a missing
                # link IndexError, an unexpected label ValueError.
                logger.error('Failed to decode thread, skipped: {}'.format(e))
                continue
            self.threads_list.append({'id': thread_id, 'title': thread_title,
                                      'sats': thread_sats, 'comments': thread_comments})
        logger.info('Decode {} threads.'.format(len(self.threads_list)))

    def check_sent(self):
        """
        Check if each thread is already in the database; if not, add it to self.threads_send.
        """
        for thread in self.threads_list:
            thread_db = self.db.get_thread(thread['id'])
            if thread_db:
                logger.info('Thread "{}" exist in db, skip sending.'.format(thread['id']))
                continue
            logger.info('Thread "{}" not exist in db, will be sent.'.format(thread['id']))
            self.threads_send.append(thread)

    def send_threads(self):
        """
        Send threads from self.threads_send to telegram.
        A thread is stored in the database only after Telegram confirmed the
        message, so a failed send is retried on the next run instead of being lost.
        """
        logger.info('There are {} threads to be sent.'.format(len(self.threads_send)))
        for thread in self.threads_send:
            logger.info('Sending thread "{}".'.format(thread['id']))
            url = '{domain}/items/{thread}'.format(domain=self.domain, thread=thread['id'])
            message = self.message_template.format(title=replace(thread['title']),
                                                   sats=thread['sats'],
                                                   comments=thread['comments'],
                                                   url=replace(url))
            logger.info('Message "{}" will be sent.'.format(message))
            try:
                result = telegram_bot_send_text(message)
            except Exception as e:
                # Keep going: one failed request must not block the remaining threads.
                logger.error('Sending thread "{}" failed: {}'.format(thread['id'], e))
                continue
            logger.info('Sending result: {}'.format(result))
            # Telegram responses carry a boolean "ok" field that reports success.
            if not (isinstance(result, dict) and result.get('ok')):
                logger.error('Thread "{}" was rejected by telegram, not added to db.'.format(thread['id']))
                continue
            logger.info('Adding thread "{}" to db.'.format(thread['id']))
            self.db.add_thread(thread['id'], thread['title'], thread['sats'], thread['comments'])

    def init(self):
        """
        Reset all per-run state so run() can be called repeatedly on one instance.
        """
        self.soup_top = None
        self.threads = []
        self.threads_list = []
        self.threads_send = []

    def run(self):
        """
        Run one full cycle: reset state, fetch the page, decode threads, filter out sent ones, send the rest.
        """
        self.init()
        self.get_top()
        self.get_threads()
        self.get_thread()
        self.check_sent()
        self.send_threads()

0 comments on commit f89f496

Please sign in to comment.