Skip to content

Commit

Permalink
micropub: implement token lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed Sep 20, 2022
1 parent 9ee0985 commit 97fc8e6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 59 deletions.
43 changes: 39 additions & 4 deletions micropub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,27 @@
from werkzeug.exceptions import HTTPException

from flask_app import app
from flickr import Flickr
from github import GitHub
from oauth_dropins.flickr import FlickrAuth
from oauth_dropins.github import GitHubAuth
from oauth_dropins.mastodon import MastodonAuth
from oauth_dropins.twitter import TwitterAuth
from mastodon import Mastodon
from publish import PublishBase
import models
from twitter import Twitter
import util
import webmention

logger = logging.getLogger(__name__)

SOURCE_CLASSES = (
(Twitter, TwitterAuth, TwitterAuth.token_secret),
(Mastodon, MastodonAuth, MastodonAuth.access_token_str),
(GitHub, GitHubAuth, GitHubAuth.access_token_str),
(Flickr, FlickrAuth, FlickrAuth.token_secret),
)
RESERVED_PARAMS = ('access_token', 'action', 'q', 'url')
RESERVED_PREFIX = 'mp-'

Expand All @@ -31,20 +45,41 @@ def remove_reserved(params):

class Micropub(PublishBase):
"""Micropub endpoint."""

def load_source(self):
"""Looks up the auth entity by the provided access token."""
auth = request.headers.get('Authorization')
if auth:
parts = auth.split(' ')
if len(parts) != 2 or parts[0] != 'Bearer':
return self.error('Unsupported token format in Authorization header', status=401)
token = parts[1]
else:
token = request.values.get('access_token')

if not token:
return self.error('No token found in Authorization header or access_token param',
status=401)

for src_cls, auth_cls, prop in SOURCE_CLASSES:
auth_entity = auth_cls.query(prop == token).get()
if auth_entity:
return src_cls.query(src_cls.auth_entity == auth_entity.key).get()

return self.error('No user found with that token', status=401)

def dispatch_request(self):
logging.info(f'Params: {list(request.values.items())}')

# TODO: look up token
from tests import testutil
self.source = testutil.FakeSource.query().get()

# Micropub query; currently only config is supported
q = request.values.get('q')
if q == 'config':
return jsonify({})
elif q:
return self.error(error='not_implemented')

self.source = self.load_source()

# handle input
if request.is_json:
mf2 = request.json
Expand Down
120 changes: 65 additions & 55 deletions tests/test_micropub.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@
"""Unit tests for micropub.py."""
import html
from io import BytesIO

import micropub
from models import Publish, PublishedPage
from . import testutil
from .testutil import AppTest, FakeAuthEntity, FakeSource


class MicropubTest(testutil.AppTest):
class MicropubTest(AppTest):

@classmethod
def setUpClass(cls):
micropub.SOURCE_CLASSES = (
(FakeSource, FakeAuthEntity, FakeAuthEntity.access_token),
) + micropub.SOURCE_CLASSES

def setUp(self):
super().setUp()

self.auth_entity = testutil.FakeAuthEntity(id='0123456789')
self.auth_entity = FakeAuthEntity(id='0123456789', access_token='towkin')
auth_key = self.auth_entity.put()
self.source = testutil.FakeSource(
id='foo.com', features=['publish'], auth_entity=auth_key)
self.source = FakeSource(id='foo.com', features=['publish'],
auth_entity=auth_key)
self.source.put()

def assert_response(self, status=201, **kwargs):
def assert_response(self, status=201, token='towkin', **kwargs):
if token:
kwargs.setdefault('headers', {})['Authorization'] = f'Bearer {token}'

resp = self.client.post('/micropub', **kwargs)

body = resp.get_data(as_text=True)
self.assertEqual(status, resp.status_code,
f'{status} != {resp.status_code}: {body}')
self.assertEqual('http://fake/url', resp.headers['Location'])
self.assertEqual(status, resp.status_code, body)
if status // 100 == 2:
self.assertEqual('http://fake/url', resp.headers['Location'])
return resp

def check_entity(self, url='http://foo', content='foo bar baz',
html_content=None, expected_html=None):
# if html_content is None:
# html_content = content
def check_entity(self, url='http://foo', **kwargs):
self.assertTrue(PublishedPage.get_by_id(url))
publish = Publish.query().get()
self.assertEqual(self.source.key, publish.source)
self.assertEqual('complete', publish.status)
self.assertEqual('post', publish.type)
self.assertEqual('FakeSource post label', publish.type_label())
# if expected_html is None:
# expected_html = (self.post_html % html_content)
# self.assertEqual(expected_html + self.backlink, publish.html)
self.assertEqual({
'id': 'fake id',
'url': 'http://fake/url',
'content': content,
'content': 'foo bar baz',
'granary_message': 'granary message',
}, publish.published)

Expand All @@ -56,26 +61,46 @@ def test_query_source_not_implemented(self):
self.assertEqual({'error': 'not_implemented'}, resp.json)

def test_bad_content_type(self):
resp = self.client.post('/micropub', data='foo', content_type='text/plain')
self.assertEqual(400, resp.status_code)
resp = self.assert_response(status=400, data='foo', content_type='text/plain')
self.assertEqual({
'error': 'invalid_request',
'error_description': 'Unsupported Content-Type text/plain',
}, resp.json)

# def test_no_token(self):
def test_no_token(self):
self.assert_response(status=401, token=None)

def test_invalid_token(self):
self.assert_response(status=401, token='bad', data={'x': 'y'})
self.assert_response(status=401, token=None, data={'x': 'y'},
headers={'Authorization': 'foo bar'})

# def test_invalid_token(self):
def test_token_query_param(self):
self.assert_response(data={
'url': 'http://foo',
'h': 'entry',
'content': 'foo bar baz',
'access_token': 'towkin',
})

def test_already_published(self):
page = PublishedPage(id='http://foo')
Publish(parent=page.key, source=self.source.key, status='complete',
type='post', published={'content': 'foo'}).put()

# def test_already_published(self):
self.assert_response(status=400, data={
'url': 'http://foo',
'h': 'entry',
'content': 'foo bar baz',
})

def test_create_form_encoded(self):
resp = self.assert_response(data={
'url': 'http://foo',
'h': 'entry',
'content': 'foo bar baz',
'url': 'http://foo',
})
self.check_entity()
self.check_entity(content='foo bar baz')

# def test_create_form_encoded_token_param(self):
# resp = self.client.post('/micropub', data={
Expand All @@ -87,19 +112,15 @@ def test_create_form_encoded(self):
# f'201 != {resp.status_code}: {body}')
# self.assertEqual('xyz', resp.headers['Location'])

# def test_create_form_encoded_one_category(self):
# Content-type: application/x-www-form-urlencoded; charset=utf-8

# h=entry
# content=Micropub+test+of+creating+an+h-entry+with+one+category.+This+post+should+have+one+category,+test1
# category=test1

# def test_create_form_encoded_multiple_categories(self):
# Content-type: application/x-www-form-urlencoded; charset=utf-8

# h=entry
# content=Micropub+test+of+creating+an+h-entry+with+categories.+This+post+should+have+two+categories,+test1+and+test2
# category[]=test1category[]=test2
# def test_create_form_encoded_multiple_categories(self):
# resp = self.assert_response(data={
# 'url': 'http://foo',
# 'h': 'entry',
# 'content': 'foo bar baz',
# 'category[]': 'A',
# 'category[]': 'B',
# })
# self.check_entity(content='foo bar baz')

# def test_create_form_encoded_photo_url(self):
# Content-type: application/x-www-form-urlencoded; charset=utf-8
Expand Down Expand Up @@ -229,24 +250,13 @@ def test_create_json(self):
# }
# }

# def test_create_multipart_photo(self):
# multipart/form-data; boundary=553d9cee2030456a81931fb708ece92c

# --553d9cee2030456a81931fb708ece92c
# Content-Disposition: form-data; name="h"

# entry
# --553d9cee2030456a81931fb708ece92c
# Content-Disposition: form-data; name="content"

# Hello World!
# --553d9cee2030456a81931fb708ece92c
# Content-Disposition: form-data; name="photo"; filename="aaronpk.png"
# Content-Type: image/png
# Content-Transfer-Encoding: binary

# ... (binary data) ...
# --553d9cee2030456a81931fb708ece92c--
# def test_create_multipart_photo(self):
# resp = self.assert_response(data={
# 'url': 'http://foo',
# 'h': 'entry',
# 'photo': (BytesIO('photo contents'), 'filename'),
# })
# self.check_entity()

# def test_create_multipart_multiple_photos(self):
# multipart/form-data; boundary=553d9cee2030456a81931fb708ece92c
Expand Down
1 change: 1 addition & 0 deletions tests/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

class FakeAuthEntity(BaseAuth):
user_json = ndb.TextProperty()
access_token = ndb.StringProperty()


class FakeGrSource(gr_source.Source):
Expand Down

0 comments on commit 97fc8e6

Please sign in to comment.