diff --git a/ash.py b/ash.py
index bb3aae2..205b602 100644
--- a/ash.py
+++ b/ash.py
@@ -10,6 +10,8 @@
from urllib.parse import urlsplit
from collections.abc import Mapping
+from collections.abc import Iterator
+
import flask
import requests
from elasticsearch import Elasticsearch
@@ -22,10 +24,7 @@ class DefaultConfig:
T_EXTERNAL_TWEETS = False
-app = flask.Flask(
- __name__,
- static_url_path='/tweet/static'
-)
+app = flask.Flask(__name__, static_url_path='/tweet/static')
app.config.from_object(DefaultConfig)
try:
app.config.from_object('config.Config')
@@ -35,7 +34,6 @@ class DefaultConfig:
# Set up external Tweets support
if app.config.get('T_EXTERNAL_TWEETS'):
-
# https://developer.twitter.com/en/docs/basics/authentication/api-reference/token
resp = requests.post(
'https://api.twitter.com/oauth2/token',
@@ -51,7 +49,7 @@ class DefaultConfig:
app.config['T_TWITTER_TOKEN'] = bearer_token
-def toot_to_tweet(status):
+def toot_to_tweet(status: dict) -> dict:
'''Transform toot to be compatible with tweet-interface'''
# Status is a tweet
if status.get('user'):
@@ -82,36 +80,33 @@ def toot_to_tweet(status):
class TweetsDatabase(Mapping):
- def __init__(self, es_host, es_index):
+ def __init__(self, es_host: str, es_index: str) -> None:
self.es = Elasticsearch(es_host)
self.es_index = es_index
- def _search(self, **kwargs):
+ def _search(self, **kwargs) -> Iterator[dict]:
if not kwargs.get('index'):
kwargs['index'] = self.es_index
hits = self.es.search(**kwargs)['hits']['hits']
- tweets = []
for hit in hits:
tweet = hit['_source']
tweet['@index'] = hit['_index']
tweet = toot_to_tweet(tweet)
- tweets.append(tweet)
- return tweets
+ yield tweet
- def __getitem__(self, tweet_id):
+ def __getitem__(self, tweet_id: str | int) -> dict:
resp = self._search(
query={
'term': {
'_id': tweet_id
}
})
- if len(resp) == 0:
- raise KeyError(f'Tweet ID {tweet_id} not found')
- else:
- tweet = resp[0]
- return tweet
+ try:
+ return next(resp)
+ except StopIteration:
+ raise KeyError(f'Tweet ID {tweet_id} not found') from None
- def __iter__(self):
+ def __iter__(self) -> Iterator[int]:
resp = self._search(
sort=['@timestamp'],
#size=1000,
@@ -119,7 +114,7 @@ def __iter__(self):
for tweet in resp:
yield tweet['id']
- def __reversed__(self):
+ def __reversed__(self) -> Iterator[int]:
resp = self._search(
sort=[{
'@timestamp': {'order': 'desc'}
@@ -129,10 +124,10 @@ def __reversed__(self):
for tweet in resp:
yield tweet['id']
- def __len__(self):
+ def __len__(self) -> int:
return self.es.count(index=self.es_index)['count']
- def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
+ def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100) -> Iterator[dict]:
keyword_query = {
'simple_query_string': {
'query': keyword,
@@ -142,7 +137,7 @@ def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
}
if user_screen_name and '@' in user_screen_name: # Mastodon
screen_name_field = 'account.fqn.keyword'
- else:
+ else: # Twitter
screen_name_field = 'user.screen_name.keyword'
user_query = {
'term': {
@@ -166,7 +161,7 @@ def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
)
return resp
- def get_users(self):
+ def get_users(self) -> Iterator[dict]:
agg_name_twitter = 'user_screen_names'
agg_name_mastodon = 'account_fqn'
resp = self.es.search(
@@ -186,16 +181,14 @@ def get_users(self):
},
)
buckets = resp['aggregations'][agg_name_twitter]['buckets'] + resp['aggregations'][agg_name_mastodon]['buckets']
- users = [
- {
+ for bucket in buckets:
+ user = {
'screen_name': bucket['key'],
'tweets_count': bucket['doc_count']
}
- for bucket in buckets
- ]
- return users
+ yield user
- def get_indexes(self):
+ def get_indexes(self) -> Iterator[dict]:
agg_name = 'index_names'
resp = self.es.search(
index=self.es_index,
@@ -208,17 +201,15 @@ def get_indexes(self):
}
},
)
- indexes = [
- {
+ for bucket in resp['aggregations'][agg_name]['buckets']:
+ index = {
'name': bucket['key'],
'tweets_count': bucket['doc_count']
}
- for bucket in resp['aggregations'][agg_name]['buckets']
- ]
- return indexes
+ yield index
-def get_tdb():
+def get_tdb() -> TweetsDatabase:
if not hasattr(flask.g, 'tdb'):
flask.g.tdb = TweetsDatabase(
app.config['T_ES_HOST'],
@@ -228,7 +219,7 @@ def get_tdb():
@app.template_global('get_tweet_link')
-def get_tweet_link(screen_name, tweet_id, original_link=False):
+def get_tweet_link(screen_name: str, tweet_id: str | int, original_link: bool = False) -> str:
if original_link:
return f'https://twitter.com/{screen_name}/status/{tweet_id}'
else:
@@ -236,8 +227,7 @@ def get_tweet_link(screen_name, tweet_id, original_link=False):
@app.template_filter('format_tweet_text')
-def format_tweet_text(tweet):
-
+def format_tweet_text(tweet: dict) -> str:
try:
tweet_text = tweet['full_text']
except KeyError:
@@ -286,15 +276,13 @@ def format_tweet_text(tweet):
# true and has a valid "retweeted_status". Tweets that are ingested via
# Twitter Archive always has "retweeted" set to false (identical to a
# "traditional" RT.
- retweeted_status = tweet.get('retweeted_status')
- if retweeted_status:
+ if retweeted_status := tweet.get('retweeted_status'):
link = get_tweet_link('status', retweeted_status['id'])
a = f'RT'
tweet_text = tweet_text.replace('RT', a, 1)
# Format reblogged toot
- reblogged_status = tweet.get('reblog')
- if reblogged_status:
+ if reblogged_status := tweet.get('reblog'):
status_link = reblogged_status['url']
author = reblogged_status['account']['fqn']
author_link = reblogged_status['account']['url']
@@ -305,7 +293,7 @@ def format_tweet_text(tweet):
@app.template_filter('format_created_at')
-def format_created_at(timestamp, fmt):
+def format_created_at(timestamp: str, fmt: str) -> str:
try:
dt = datetime.strptime(timestamp, '%a %b %d %H:%M:%S %z %Y')
except ValueError:
@@ -317,7 +305,7 @@ def format_created_at(timestamp, fmt):
@app.template_filter('in_reply_to_link')
-def in_reply_to_link(tweet):
+def in_reply_to_link(tweet: dict) -> str:
if tweet.get('account'): # Mastodon
# If this is a self-thread, return local link
if tweet['in_reply_to_account_id'] == tweet['account']['id']:
@@ -329,7 +317,7 @@ def in_reply_to_link(tweet):
return get_tweet_link('status', tweet['in_reply_to_status_id'])
-def replace_media_url(url):
+def replace_media_url(url: str) -> str:
if app.config['T_MEDIA_FROM'] == 'direct':
return url
elif app.config['T_MEDIA_FROM'] == 'filesystem':
@@ -343,6 +331,8 @@ def replace_media_url(url):
return url.replace(orig, repl)
else:
return url
+ else:
+ return url
@app.route('/')
@@ -352,11 +342,9 @@ def root():
@app.route('/tweet/')
def index():
-
tdb = get_tdb()
total_tweets = len(tdb)
- default_user = app.config.get('T_DEFAULT_USER')
- if default_user:
+ if default_user := app.config.get('T_DEFAULT_USER'):
latest_tweets = tdb.search(keyword='*', user_screen_name=default_user, limit=10)
else:
latest_tweets = [tdb[tid] for tid in itertools.islice(reversed(tdb), 10)]
@@ -372,10 +360,8 @@ def index():
@lru_cache(maxsize=1024)
-def fetch_tweet(tweet_id):
-
+def fetch_tweet(tweet_id: int | str) -> dict:
token = app.config['T_TWITTER_TOKEN']
-
resp = requests.get(
'https://api.twitter.com/1.1/statuses/show.json',
headers={
@@ -395,7 +381,6 @@ def fetch_tweet(tweet_id):
@app.route('/tweet/.')
def get_tweet(tweet_id, ext):
-
if ext not in ('txt', 'json', 'html'):
flask.abort(404)
@@ -424,7 +409,7 @@ def get_tweet(tweet_id, ext):
# HTML output
- # Extract list images
+ # Extract media
images = []
videos = []
try:
@@ -434,7 +419,7 @@ def get_tweet(tweet_id, ext):
entities = tweet['entities']
media = entities.get('media', [])
for m in media:
- # type = video
+ # type is video
if m.get('type') == 'video':
variants = m['video_info']['variants']
hq_variant = max(variants, key=lambda v: v.get('bitrate', -1))
@@ -444,7 +429,7 @@ def get_tweet(tweet_id, ext):
videos.append({
'url': media_url,
})
- # type = photo
+ # type is photo
elif m.get('type') == 'photo':
media_url = m['media_url_https']
if not _is_external_tweet:
@@ -453,7 +438,7 @@ def get_tweet(tweet_id, ext):
'url': media_url,
'description': m.get('description', '')
})
- # type = unknown
+ # type is unknown
else:
pass
@@ -470,12 +455,12 @@ def get_tweet(tweet_id, ext):
@app.route('/tweet/media/')
-def get_media_from_filesystem(fs_path):
+def get_media_from_filesystem(fs_path: str):
return flask.send_from_directory(app.config['T_MEDIA_FS_PATH'], fs_path)
@app.route('/tweet/search.')
-def search_tweet(ext):
+def search_tweet(ext: str):
if ext not in ('html', 'txt', 'json'):
flask.abort(404)
@@ -491,9 +476,8 @@ def search_tweet(ext):
indexes = tdb.get_indexes()
user = flask.request.args.get('u', '')
- keyword = flask.request.args.get('q', '')
index = flask.request.args.get('i', '')
- if keyword:
+ if keyword := flask.request.args.get('q', ''):
tweets = tdb.search(
keyword=keyword,
user_screen_name=user,