/
__init__.py
101 lines (90 loc) · 2.89 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import bleach
from datasette import hookimpl, __version__
from feedgen.feed import FeedGenerator
import hashlib
import html
# Columns a SQL query has to return before its results can be turned into
# an Atom feed.
REQUIRED_COLUMNS = {
    "atom_id",
    "atom_title",
    "atom_updated",
}
@hookimpl
def register_output_renderer():
    """Describe the ``atom`` output renderer provided by this module.

    Returns:
        A dict naming the ``atom`` extension together with the callables
        used to render a result and to decide whether it can be rendered.
    """
    renderer = {
        "extension": "atom",
        "render": render_atom,
        "can_render": can_render_atom,
    }
    return renderer
def _add_entry(fg, row, columns):
    """Append one feed entry built from a single result row to ``fg``."""
    entry = fg.add_entry()
    entry.id(str(row["atom_id"]))
    if "atom_content_html" in columns:
        content_html = row["atom_content_html"]
        # A NULL value cannot be sanitized; leave the entry without content.
        if content_html is not None:
            entry.content(clean(content_html), type="html")
    elif "atom_content" in columns:
        entry.content(row["atom_content"], type="text")
    entry.updated(row["atom_updated"])
    entry.title(str(row["atom_title"]))
    # atom_link is optional, and may be NULL/empty for individual rows
    if "atom_link" in columns and row["atom_link"]:
        entry.link(href=row["atom_link"])
    if "atom_author_name" in columns and row["atom_author_name"]:
        author = {
            "name": row["atom_author_name"],
        }
        for key in ("uri", "email"):
            colname = "atom_author_{}".format(key)
            if colname in columns and row[colname]:
                author[key] = row[colname]
        entry.author(author)


def render_atom(args, data, view_name):
    """Render the results of a SQL query as an Atom feed.

    Args:
        args: Request arguments; ``_feed_title`` overrides the default feed
            title, which is the SQL text.
        data: Result dictionary. This function reads ``columns``, ``rows``,
            ``database`` and ``query["sql"]``, plus the optional ``table``
            and ``human_description_en`` keys.
        view_name: Not used by this renderer.

    Returns:
        A dict with ``body`` (the serialized feed), ``content_type`` and
        ``status_code`` keys.

    Raises:
        DatasetteError: With status 400 when the query does not return
            every column listed in ``REQUIRED_COLUMNS``.
    """
    from datasette.views.base import DatasetteError

    columns = set(data["columns"])
    if not REQUIRED_COLUMNS.issubset(columns):
        raise DatasetteError(
            # Sorted so the message is stable; set iteration order is not.
            "SQL query must return columns {}".format(
                ", ".join(sorted(REQUIRED_COLUMNS))
            ),
            status=400,
        )
    rows = data["rows"]
    fg = FeedGenerator()
    fg.generator(
        generator="Datasette",
        version=__version__,
        uri="https://github.com/simonw/datasette",
    )
    sql = data["query"]["sql"]
    # The feed id is derived from the database name and a hash of the SQL.
    fg.id(data["database"] + "/" + hashlib.sha256(sql.encode("utf8")).hexdigest())
    # max() raises ValueError on an empty sequence, so a query that matches
    # no rows must not reach it. In that case the feed-level timestamp is left
    # to feedgen (NOTE(review): presumably defaults to the current time).
    latest = max((row["atom_updated"] for row in rows), default=None)
    if latest is not None:
        fg.updated(latest)
    title = args.get("_feed_title", sql)
    if data.get("table"):
        title += "/" + data["table"]
    if data.get("human_description_en"):
        title += ": " + data["human_description_en"]
    fg.title(title)
    # And the rows, added in reverse of the order the query returned them
    for row in reversed(rows):
        _add_entry(fg, row, columns)
    return {
        "body": fg.atom_str(pretty=True),
        "content_type": "application/xml; charset=utf-8",
        "status_code": 200,
    }
def can_render_atom(columns):
    """Report whether a result set can be rendered as an Atom feed.

    Args:
        columns: Iterable of the column names returned by the query.

    Returns:
        True when every name in ``REQUIRED_COLUMNS`` appears in ``columns``,
        otherwise False.
    """
    # Use the shared constant rather than a second copy of the column names,
    # so this check cannot drift from the validation in render_atom().
    return REQUIRED_COLUMNS.issubset(columns)
def clean(html):
    """Run an HTML fragment through ``bleach.clean`` with a fixed allow-list.

    Args:
        html: The markup to sanitize.

    Returns:
        The result of ``bleach.clean`` restricted to the tags and attributes
        listed below.
    """
    allowed_tags = [
        "a", "abbr", "acronym", "b", "blockquote", "code", "em", "i",
        "li", "ol", "strong", "ul", "pre", "p",
        "h1", "h2", "h3", "h4", "h5", "h6",
        "img",
    ]
    # Only links and images keep any attributes.
    allowed_attributes = {
        "a": ["href", "title"],
        "img": ["alt", "src"],
    }
    return bleach.clean(html, tags=allowed_tags, attributes=allowed_attributes)