Skip to content
Permalink
Browse files

WIP configuration screen

Some really nasty hacks in this...

* I am inspecting the stack to find variables needed to render a template
* I copied and pasted across classes so I can run unit tests against a full Datasette instance
  • Loading branch information
simonw committed Oct 3, 2019
1 parent 885d4e1 commit c0e3bd9556d7b31f253a8bf666d42205cd24f4fc
Showing with 194 additions and 16 deletions.
  1. +80 −9 datasette_atom/__init__.py
  2. +14 −0 datasette_atom/templates/configure_atom.html
  3. +2 −5 setup.py
  4. +8 −2 tests/test_atom.py
  5. +90 −0 tests/utils.py
@@ -2,6 +2,10 @@
from feedgen.feed import FeedGenerator
import hashlib
import html
import inspect


PARAMETER_NAMES = {"atom_title": "Entry title"}


@hookimpl
@@ -10,11 +14,14 @@ def register_output_renderer(datasette):


def render_atom(args, data, view_name):
# print(args)
# import json
# print(json.dumps(data, default=repr, indent=2))
request = get_variable_from_stack("request")
datasette = get_variable_from_stack("ds", True)
fg = FeedGenerator()
fg.generator(generator="Datasette", version=__version__, uri="https://github.com/simonw/datasette")
fg.generator(
generator="Datasette",
version=__version__,
uri="https://github.com/simonw/datasette",
)
sql = data["query"]["sql"]
fg.id(data["database"] + "/" + hashlib.sha256(sql.encode("utf8")).hexdigest())
fg.subtitle(sql)
@@ -24,18 +31,37 @@ def render_atom(args, data, view_name):
if data.get("human_description_en"):
title += ": " + data["human_description_en"]
fg.title(title)

import json

print(json.dumps(data, default=repr, indent=4))

# atom:id - for tables, this is the database/table/rowid - but for arbitrary queries
# we instead use a sha256 of the row contents unless ?_atom_id= is provided

# atom:updated - if there is an obvious candidate based on column name + content we use
# that, otherwise we require ?_atom_updated=

# atom:title - we require ?_atom_title= for this. Later we will try to autodetect it
atom_title = args.get("_atom_title")
if not atom_title or atom_title not in data["columns"]:
return prompt_for_parameters(datasette, ["atom_title"], data["columns"])

# atom:content - if ?_atom_content= is there, use it - otherwise HTML of all key/pairs

# And the rows
for row in data["rows"]:
entry = fg.add_entry()
entry.id(repr(list(row)))
entry.content(build_content(row), type="html")
entry.title(repr(list(row)))
entry.title(str(row[atom_title]))
if dict(args).get("_rss"):
# Link is required for RSS:
fg.link(href="https://example.com/")
fg.link(href=request.url)
body = fg.rss_str(pretty=True)
else:
body = fg.atom_str(pretty=True)

return {
"body": body,
"content_type": "application/xml; charset=utf-8",
@@ -45,12 +71,57 @@ def render_atom(args, data, view_name):

def build_content(row):
bits = []
for key, value in row.items():
for key, value in dict(row).items():
if isinstance(value, dict) and {"value", "label"} == set(value.keys()):
value = '{} <span style="color: #666; font-size: 0.8em">({})</span>'.format(
html.escape(value["label"]), html.escape(str(value["value"]))
)
else:
value = html.escape(str(value))
bits.append('<p><strong>{}</strong>: {}</p>'.format(html.escape(str(key)), value))
return repr('\n'.join(bits))
bits.append(
"<p><strong>{}</strong>: {}</p>".format(html.escape(str(key)), value)
)
return repr("\n".join(bits))


def get_variable_from_stack(name, try_on_self=False):
# This is a work-around until Datasette is updated to make these
# objects available to the register_output_renderer render callback
frame = inspect.currentframe()
while frame:
print(frame.f_locals.keys())
if "self" in frame.f_locals:
print(" self = ", frame.f_locals["self"])
if name in frame.f_locals:
return frame.f_locals[name]
elif (
try_on_self
and "self" in frame.f_locals
and hasattr(frame.f_locals["self"], name)
):
return getattr(frame.f_locals["self"], name)
else:
frame = frame.f_back
return None


def prompt_for_parameters(datasette, parameters, columns):
return {
"body": render_template(
datasette,
"configure_atom.html",
{
"parameters": parameters,
"columns": columns,
"PARAMETER_NAMES": PARAMETER_NAMES,
},
),
"content_type": "text/html; charset=utf-8",
"status_code": 400,
}


def render_template(datasette, template_name, context=None):
context = context or {}
template = datasette.jinja_env.select_template([template_name])
return template.render(context)
@@ -0,0 +1,14 @@
{% extends "base.html" %}

{% block content %}
<h1>Configure this Atom feed</h1>

{% for parameter in parameters %}
<h2>{{ PARAMETER_NAMES[parameter] }}</h2>
{% for column in columns %}
<a href="?_{{ parameter }}={{ column|urlencode }}">{{ column }}</a>{% if not loop.last %}, {% endif %}
{% endfor %}

{% endfor %}

{% endblock %}
@@ -24,10 +24,7 @@ def get_long_description():
packages=["datasette_atom"],
entry_points={"datasette": ["atom = datasette_atom"]},
install_requires=["datasette", "feedgen"],
extras_require={
"test": [
"pytest"
]
},
extras_require={"test": ["pytest"]},
tests_require=["datasette-atom[test]"],
package_data={"datasette_atom": ["templates/*.html"]},
)
@@ -1,2 +1,8 @@
def test_atom():
assert False
from .utils import make_app_client


def test_missing_parameters_produces_400_page():
app = make_app_client()
response = app.get("/:memory:.atom?sql=select+sqlite_version()")
assert 400 == response.status
assert "text/html; charset=utf-8" == response.headers["content-type"]
@@ -0,0 +1,90 @@
import json
from urllib.parse import quote, unquote

from asgiref.sync import async_to_sync
from asgiref.testing import ApplicationCommunicator
from datasette.app import Datasette


class Response:
def __init__(self, status, headers, body):
self.status = status
self.headers = headers
self.body = body

@property
def json(self):
return json.loads(self.text)

@property
def text(self):
return self.body.decode("utf8")


class Client:
max_redirects = 5

def __init__(self, asgi_app):
self.asgi_app = asgi_app

@async_to_sync
async def get(self, path, allow_redirects=True, redirect_count=0, method="GET"):
return await self._get(path, allow_redirects, redirect_count, method)

async def _get(self, path, allow_redirects=True, redirect_count=0, method="GET"):
query_string = b""
if "?" in path:
path, _, query_string = path.partition("?")
query_string = query_string.encode("utf8")
if "%" in path:
raw_path = path.encode("latin-1")
else:
raw_path = quote(path, safe="/:,").encode("latin-1")
scope = {
"type": "http",
"http_version": "1.0",
"method": method,
"path": unquote(path),
"raw_path": raw_path,
"query_string": query_string,
"headers": [[b"host", b"localhost"]],
}
instance = ApplicationCommunicator(self.asgi_app, scope)
await instance.send_input({"type": "http.request"})
# First message back should be response.start with headers and status
messages = []
start = await instance.receive_output(2)
messages.append(start)
assert start["type"] == "http.response.start"
headers = dict(
[(k.decode("utf8"), v.decode("utf8")) for k, v in start["headers"]]
)
status = start["status"]
# Now loop until we run out of response.body
body = b""
while True:
message = await instance.receive_output(2)
messages.append(message)
assert message["type"] == "http.response.body"
body += message["body"]
if not message.get("more_body"):
break
response = Response(status, headers, body)
if allow_redirects and response.status in (301, 302):
assert (
redirect_count < self.max_redirects
), "Redirected {} times, max_redirects={}".format(
redirect_count, self.max_redirects
)
location = response.headers["Location"]
return await self._get(
location, allow_redirects=True, redirect_count=redirect_count + 1
)
return response


def make_app_client():
ds = Datasette([], immutables=[], memory=True)
client = Client(ds.app())
client.ds = ds
return client

0 comments on commit c0e3bd9

Please sign in to comment.
You can’t perform that action at this time.