In [1]:
%load_ext autoreload
%autoreload 2
# default_exp plugin.authenticators.password

In [2]:
# export 
# hide

import abc
from time import sleep
from pymemri.plugin.states import RUN_USER_ACTION_NEEDED, RUN_USER_ACTION_COMPLETED
from pymemri.cvu.utils import get_cvu
from pymemri.data.schema import CVUStoredDefinition
import time

In [3]:
# export
LOGIN_CVU = "passwordAuth.cvu"

## Password Authenticator

Password Authenticator provides an easy interface to setup standard username-password authentication with 3rd party services.

Simply calling `authenticate()` should load the related account with required credentials.

Inheriting class should implement:
- get_token() that tests username-password combination or gets a new session token to be used for future calls


In [4]:
# export
# hide

class PasswordAuthenticator:
    MAX_LOGIN_ATTEMPTS = 10
    SLEEP_INTERVAL = 1.0
    MAX_POLL_TIME = 10

    def __init__(self, client, pluginRun):
        self.client = client
        self.pluginRun = pluginRun
        self.isTest = False

    def authenticate(self, plugin):
        self.request_user_credentials()

        login_success = False
        for _ in range(self.MAX_LOGIN_ATTEMPTS):
            username, password = self.poll_credentials()
            try:
                plugin.login(username, password)
                login_success = True
                break
            except Exception:
                print("Login failed, invalid credentials.")
                login_success = False
                
        if not login_success:
            self.pluginRun.status = "error"
            self.client.update_item(self.pluginRun)
            raise RuntimeError("Reached max login attempts.")
            
    def request_user_credentials(self):
        cvu = get_cvu(LOGIN_CVU)
        self.client.create(cvu)
        self.pluginRun.add_edge("view", cvu)
        self.client.create_edge(self.pluginRun.get_edges("view")[0])
        self.pluginRun.status = "userActionNeeded"
        self.client.update_item(self.pluginRun)

    def poll_credentials(self):
        # request username and password from the user client
        # WAIT HERE = BLOCK
        start_time = time.time()
        while True:
            if time.time() - start_time > self.MAX_POLL_TIME:
                raise RuntimeError("Stop polling, max time reached.")
            print("polling for credentials...")
            sleep(self.SLEEP_INTERVAL)
            self.pluginRun = self.client.get(self.pluginRun.id)
            if self.pluginRun.status == RUN_USER_ACTION_COMPLETED:
                account = self.pluginRun.account[0]
                return account.identifier, account.secret


In [5]:
from pymemri.plugin.pluginbase import PluginBase
from pymemri.plugin.schema import PluginRun, Account
from pymemri.pod.client import PodClient
import threading

class MyAuthenticatedPlugin(PluginBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logged_in = False
        self.authenticator = PasswordAuthenticator(kwargs["client"], kwargs["pluginRun"])
        
    def login(self, username, password):
        if not (username=="username" and password=="password"):
            raise ValueError("Wrong credentials.")
            
    def run(self):
        self.authenticator.authenticate(self)
        self.logged_in = True
        print("done!")
    
    def add_to_schema(self):
        pass

pod_client = PodClient()

run = PluginRun("", "", "")
account = Account(service="myAuthenticatedPlugin")
run.add_edge("account", account)
run.status = "start"

pod_client.create(run)
pod_client.create(account)
pod_client.create_edge(run.get_edges("account")[0])

True

In [6]:
# Create Plugin
plugin = MyAuthenticatedPlugin(client=pod_client, pluginRun=run)   

In [7]:
# Start plugin in background thread
def run_thread():
    plugin.run()
    assert plugin.logged_in
    
thread = threading.Thread(target=run_thread)
thread.start()

In [8]:
# Enter password and check if plugin is authenticated

def simulate_enter_password(pod_client, run_id):
    run = pod_client.get(run_id)
    account = run.account[0]

    username = "username"
    password = "password"
    account.identifier = username
    account.secret = password
    run.status = "ready"

    pod_client.update_item(account)
    pod_client.update_item(run)

simulate_enter_password(pod_client, run.id)    
time.sleep(4)
assert plugin.logged_in

polling for credentials...
done!
