In [None]:
# oauth_server.py

import hmac
import json
import os
import secrets
import time
from datetime import datetime, timedelta, timezone

from authlib.integrations.flask_oauth2 import AuthorizationServer, ResourceProtector, current_token
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant
from authlib.oauth2.rfc6750 import BearerTokenValidator, BearerToken
from flask import Flask, request, jsonify, redirect, session, render_template_string

app = Flask(__name__)
# The secret key signs the session cookie, so it must not be a literal that is
# committed with the source. Read it from the environment; when it is not set,
# fall back to a random per-process key (fine for local runs, but sessions are
# then invalidated on every restart and are not shared between workers).
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)


class Client:
    """In-memory OAuth 2.0 client registration.

    Holds the client's credentials and the redirect URIs, response types,
    token-endpoint authentication methods, grant types and scopes it may use,
    and exposes the ``check_*`` / ``get_*`` predicates that are called on a
    client object while an OAuth request is validated.
    """

    def __init__(self, client_id, client_secret, redirect_uris, response_types, auth_methods, grant_types, allowed_scope):
        """Store the registration data.

        Args:
            client_id: Public identifier of the client.
            client_secret: Shared secret used to authenticate the client.
            redirect_uris: Collection of redirect URIs the client may use.
            response_types: Collection of permitted ``response_type`` values.
            auth_methods: Collection of permitted authentication methods.
            grant_types: Collection of permitted ``grant_type`` values.
            allowed_scope: Space-separated string of scopes the client may request.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uris = redirect_uris
        self.response_types = response_types
        self.auth_methods = auth_methods
        self.grant_types = grant_types
        self.allowed_scope = allowed_scope

    def get_client_id(self):
        """Return the client's public identifier."""
        return self.client_id

    def get_default_redirect_uri(self):
        """Return the first registered redirect URI, or ``None`` if there is none.

        Used as the fallback when an authorization request omits ``redirect_uri``.
        """
        return self.redirect_uris[0] if self.redirect_uris else None

    def check_client_secret(self, client_secret):
        """Return ``True`` only if ``client_secret`` matches the stored secret.

        The comparison is constant-time so that response timing does not leak
        how many leading characters of a guess were correct. A missing or
        non-string secret on either side never authenticates.
        """
        if not isinstance(client_secret, str) or not isinstance(self.client_secret, str):
            return False
        # compare_digest() rejects non-ASCII ``str`` operands, so compare bytes.
        return hmac.compare_digest(
            self.client_secret.encode('utf-8'),
            client_secret.encode('utf-8'),
        )

    def check_redirect_uri(self, redirect_uri):
        """Return whether ``redirect_uri`` is one of the registered redirect URIs."""
        return redirect_uri in self.redirect_uris

    def check_response_type(self, response_type):
        """Return whether ``response_type`` is permitted for this client."""
        return response_type in self.response_types

    def check_endpoint_auth_method(self, method, endpoint):
        """Return whether ``method`` is a permitted authentication method.

        ``endpoint`` is accepted for interface compatibility but not consulted:
        the same set of methods applies to every endpoint.
        """
        return method in self.auth_methods

    def check_grant_type(self, grant_type):
        """Return whether ``grant_type`` is permitted for this client."""
        return grant_type in self.grant_types

    def get_allowed_scope(self, scope):
        """Restrict a requested scope string to the scopes this client may use.

        Args:
            scope: Space-separated requested scopes, or ``None``.

        Returns:
            ``None`` when ``scope`` is ``None``; otherwise the permitted subset
            as a space-separated string, de-duplicated and sorted alphabetically.
        """
        if scope is None:
            return None
        allowed = set(self.allowed_scope.split())
        return ' '.join(sorted(set(scope.split()) & allowed))


class Token:
    """Read-only view over one stored access-token record."""

    def __init__(self, token_data):
        """Build the token from a stored record.

        Args:
            token_data: Mapping with the keys ``user`` and ``expires_at``
                (a ``datetime``), and optionally ``revoked`` (defaults to
                ``False``) and ``scope`` (defaults to ``'profile'`` so that
                records written without a scope keep their previous meaning).
        """
        self.user = token_data['user']
        self.expires_at = token_data['expires_at']
        self.revoked = token_data.get('revoked', False)
        stored_scope = token_data.get('scope')
        self.scope = 'profile' if stored_scope is None else stored_scope

    def is_expired(self):
        """Return ``True`` once the expiry instant lies in the past.

        A naive ``expires_at`` is interpreted as UTC; a timezone-aware one is
        compared as-is, so both kinds of timestamp are handled.
        """
        expires_at = self.expires_at
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        return expires_at < datetime.now(timezone.utc)

    def get_scope(self):
        """Return the space-separated scope string granted to this token."""
        return self.scope

    def is_revoked(self):
        """Return the stored revocation flag."""
        return self.revoked


class User:
    """Minimal resource-owner record: an identifier plus a display name."""

    def __init__(self, user_id, username):
        """Remember the identifier and the username of the user."""
        self.user_id, self.username = user_id, username

    def get_user_id(self):
        """Return the identifier this user was created with."""
        identifier = self.user_id
        return identifier


# Process-local stand-in for a datastore. It starts with one registered
# client and one user; authorization codes and tokens are added at runtime.
db = dict(
    auth_codes={},
    tokens={},
    clients={
        'client_1': Client(
            client_id='client_1',
            client_secret='secret_1',
            redirect_uris=['http://localhost/callback'],
            response_types=['code'],
            auth_methods=['client_secret_basic'],
            grant_types=['authorization_code'],
            allowed_scope='profile',
        ),
    },
    users={
        'user_1': User(user_id='user_1', username='testuser'),
    },
)


def query_client(client_id):
    """Look up a registered client; returns ``None`` for an unknown ``client_id``."""
    registered_clients = db['clients']
    return registered_clients.get(client_id)

def save_token(token, request):
    """Persist a freshly issued access token in ``db['tokens']``.

    Args:
        token: Mapping describing the issued token. ``access_token`` and
            ``expires_in`` (seconds) are required; ``scope`` is optional and
            stored as an empty string when absent, because a token issued
            without any scope carries no ``scope`` entry.
        request: The OAuth request being served; its ``user`` and
            ``client.client_id`` are recorded alongside the token.
    """
    # Stored as a naive UTC datetime (the same representation
    # ``datetime.utcnow()`` produced) without using that deprecated call.
    issued_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db['tokens'][token['access_token']] = {
        'user': request.user,
        'client_id': request.client.client_id,
        'scope': token.get('scope', ''),
        'expires_at': issued_at + timedelta(seconds=token['expires_in']),
        'revoked': False,
    }

# Authorization server bound to the Flask app; it looks clients up through
# query_client and persists issued tokens through save_token.
server = AuthorizationServer(
    app,
    query_client=query_client,
    save_token=save_token,
)
# Decorator factory that guards resource endpoints with bearer tokens.
require_oauth = ResourceProtector()

class MyAuthorizationCodeGrant(AuthorizationCodeGrant):
    """Authorization-code grant whose codes live in the in-memory ``db``.

    Codes are stored in ``db['auth_codes']`` keyed by the code string and are
    only valid for ``AUTHORIZATION_CODE_LIFETIME`` seconds after being issued.
    """

    # RFC 6749 section 4.1.2 recommends a maximum lifetime of 10 minutes.
    AUTHORIZATION_CODE_LIFETIME = 600

    class AuthorizationCode:
        """Value object describing one stored authorization code."""

        def __init__(self, code, client_id, redirect_uri, scope, user_id):
            self.code = code
            self.client_id = client_id
            self.redirect_uri = redirect_uri
            self.scope = scope
            self.user_id = user_id

        def get_redirect_uri(self):
            """Return the redirect URI the code was issued for."""
            return self.redirect_uri

        def get_scope(self):
            """Return the scope that was granted with the code."""
            return self.scope

    def save_authorization_code(self, code, request):
        """Store ``code`` together with the request's user, client, redirect URI and scope.

        An ``expires_at`` Unix timestamp is recorded so that the code can be
        rejected once its lifetime has elapsed.

        Returns:
            The ``code`` that was passed in.
        """
        db['auth_codes'][code] = {
            'user_id': request.user.get_user_id(),
            'client_id': request.client.client_id,
            'redirect_uri': request.redirect_uri,
            'scope': request.scope,
            'expires_at': time.time() + self.AUTHORIZATION_CODE_LIFETIME,
        }
        return code

    def query_authorization_code(self, code, client):
        """Return the stored code if it exists, belongs to ``client`` and is still valid.

        Returns:
            An ``AuthorizationCode`` instance, or ``None`` when the code is
            unknown, was issued to a different client, or has expired. An
            expired code is removed from the store as a side effect.
        """
        item = db['auth_codes'].get(code)
        if not item or item['client_id'] != client.client_id:
            return None

        # Records without a timestamp are treated as not expiring.
        expires_at = item.get('expires_at')
        if expires_at is not None and expires_at < time.time():
            db['auth_codes'].pop(code, None)
            return None

        return self.AuthorizationCode(
            code=code,
            client_id=item['client_id'],
            redirect_uri=item['redirect_uri'],
            scope=item['scope'],
            user_id=item['user_id'],
        )

    def delete_authorization_code(self, authorization_code):
        """Remove the code from the store; a code that is already gone is ignored."""
        db['auth_codes'].pop(authorization_code.code, None)

    def authenticate_user(self, authorization_code):
        """Return the user the code was issued for, or ``None`` if that user is unknown."""
        return db['users'].get(authorization_code.user_id)

class MyBearerTokenValidator(BearerTokenValidator):
    """Bearer-token validator that resolves tokens from ``db['tokens']``."""

    def authenticate_token(self, token_string):
        """Return the ``Token`` stored under ``token_string``.

        ``None`` is returned when no record exists for the string or when the
        stored token is flagged as revoked.
        """
        record = db['tokens'].get(token_string)
        if not record:
            return None
        candidate = Token(record)
        return None if self.token_revoked(candidate) else candidate

    def token_revoked(self, token):
        """Report the revocation state carried by ``token``."""
        return token.is_revoked()


# Register the token validator: a MyBearerTokenValidator instance is attached
# to require_oauth, the protector used to decorate resource endpoints.
require_oauth.register_token_validator(MyBearerTokenValidator())

@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Serve the consent form and, once it is submitted, issue the authorization response.

    POST: the user pressed "Authorize"; the authorization server builds the
    response (normally a redirect back to the client carrying the code).

    Any other method: the request parameters are remembered in the session and
    the consent form is rendered. This deliberately includes HEAD, which Flask
    adds to every route that accepts GET; handling only GET and POST
    explicitly would leave HEAD without a response and produce a 500.

    ``client_id`` is mandatory (a missing value results in a 400 response);
    ``redirect_uri``, ``scope`` and ``state`` are optional in an authorization
    request and are stored as ``None`` when absent.
    """
    # Demo shortcut: the resource owner is assumed to be logged in as user_1.
    user = db['users'].get('user_1')

    if request.method == 'POST':
        # User has authorized
        return server.create_authorization_response(grant_user=user)

    session['client_id'] = request.args['client_id']
    session['redirect_uri'] = request.args.get('redirect_uri')
    session['scope'] = request.args.get('scope')
    session['state'] = request.args.get('state')
    return render_template_string('''
            <form method="post">
                <label>Authorize?</label>
                <input type="submit" value="Authorize">
            </form>
        ''')

@app.route('/token', methods=['POST'])
def issue_token():
    """Token endpoint: delegate the incoming request to the authorization server."""
    token_response = server.create_token_response(request)
    return token_response

@app.route('/api')
@require_oauth('profile')
def api_me():
    """Protected resource: requires a bearer token with the 'profile' scope."""
    payload = {'user': 'me'}
    return jsonify(payload)

# Register the grant at import time so that the authorization-code flow also
# works when this module is served by a WSGI server or `flask run`; inside the
# __main__ guard it would only be registered when the file is executed directly.
server.register_grant(MyAuthorizationCodeGrant)

if __name__ == '__main__':
    app.run()


 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [06/May/2024 13:36:50] "GET /authorize?response_type=code&client_id=client_1&redirect_uri=http://localhost/callback&scope=profile HTTP/1.1" 200 -
127.0.0.1 - - [06/May/2024 13:36:52] "POST /authorize?response_type=code&client_id=client_1&redirect_uri=http://localhost/callback&scope=profile HTTP/1.1" 302 -
127.0.0.1 - - [06/May/2024 13:37:06] "POST /token HTTP/1.1" 200 -
127.0.0.1 - - [06/May/2024 13:37:19] "GET /api HTTP/1.1" 200 -


In [None]:
pip install importlib_metadata --force-reinstall

In [None]:
pip list installed