Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NGINX Support for Auth Listener #1

Open
tannertechnology opened this issue Jul 10, 2020 · 2 comments
Open

Add NGINX Support for Auth Listener #1

tannertechnology opened this issue Jul 10, 2020 · 2 comments

Comments

@tannertechnology
Copy link
Owner

No description provided.

@tannertechnology
Copy link
Owner Author

Have gone with email auth for now but we can add an http server later. I can't find any VSA webhooks anyway so there is only the auth to listen for.

@tutume
Copy link

tutume commented Dec 20, 2020

Hello again.

I don't know if it will be of any help for what you were planning to implement, but I did some Google searching, copy and paste, and tweaks, and here is a "sort of working" new VSA_Auth.py, run from ../PythonVSA.

I hope it will help... cheers

import configparser
from types import resolve_bases
import requests
import os,sys
import logging
import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from http.server import HTTPServer, BaseHTTPRequestHandler
from http import HTTPStatus
from time import sleep
import re
from imaplib import IMAP4_SSL
import email
import socket
from PythonVSA import Auth

from http.server import HTTPServer, BaseHTTPRequestHandler, SimpleHTTPRequestHandler
import ssl
import requests
import pathlib
import json
from urllib.parse import urlparse, parse_qs
from datetime import datetime, timedelta
import ipaddress

# def start_server():
#     # Setup stuff here...
#     server.serve_forever()

# # start the server in a background thread
# thread.start_new_thread(start_server)

# print('The server is running but my script is still executing!')


# https://gist.github.com/bloodearnest/9017111a313777b9cce5
def _generateSelfSignedCert(hostname, ip_addresses=None, key=None):
    """Generate a self-signed certificate and write it to the working directory.

    Two PEM files are (over)written relative to the current working directory:
    ``cert_key.pem`` (the unencrypted private key) and ``cert_fullchain.pem``
    (the certificate followed by the same private key). Because both files
    contain the private key, both are created with owner-only permissions
    where the platform supports it.

    Args:
        hostname: Name used for the subject/issuer common name and for the
            first DNS entry of the subject alternative name (SAN).
        ip_addresses: Optional iterable of IP address strings. Each one is
            added to the SAN both as a DNS name and as an IP address.
        key: Optional existing private key object from ``cryptography``.
            A new 2048-bit RSA key is generated when omitted.

    Returns:
        None. The result is only available through the two files written.
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    def _write_owner_only(path, text):
        """Write *text* to *path*, restricting the file to its owner (0600)."""
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as handle:
            # os.open() applies the mode only to newly created files, so
            # tighten a pre-existing file before key material is written.
            os.chmod(path, 0o600)
            handle.write(text)

    # Generate our key
    if key is None:
        key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend(),
        )

    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, hostname)
    ])

    # Best practice seems to be to include the hostname in the SAN, which
    # *SHOULD* mean COMMON_NAME is ignored.
    alt_names = [x509.DNSName(hostname)]

    # Allow addressing by IP, for when you don't have real DNS (common in
    # most testing scenarios).
    if ip_addresses:
        for addr in ip_addresses:
            # openssl wants DNSnames for ips...
            alt_names.append(x509.DNSName(addr))
            # ... whereas golang's crypto/tls is stricter, and needs IPAddresses
            # note: older versions of cryptography do not understand ip_address objects
            alt_names.append(x509.IPAddress(ipaddress.ip_address(addr)))

    san = x509.SubjectAlternativeName(alt_names)

    # path_length=0 means this cert can only sign itself, not other certs.
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
    now = datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(key.public_key())
        .serial_number(1000)
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=10*365))
        .add_extension(basic_constraints, False)
        .add_extension(san, False)
        .sign(key, hashes.SHA256(), default_backend())
    )
    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    key_text = key_pem.decode('UTF-8')
    _write_owner_only('cert_key.pem', key_text)
    _write_owner_only('cert_fullchain.pem', cert_pem.decode('UTF-8') + key_text)

class Callback_LocalHTTPS(BaseHTTPRequestHandler):
    """HTTP handler that receives the OAuth2 redirect and fetches the token.

    The handler depends on module-level names assigned by the ``__main__``
    section of this script: ``config``, ``fullpath``, ``vsa_uri``,
    ``client_id``, ``client_secret`` and ``redirect_uri``.
    """

    @staticmethod
    def _extract_code(path):
        """Return the ``code`` query parameter of *path*, or None if absent."""
        values = parse_qs(urlparse(path).query).get('code')
        return values[0] if values else None

    def _reply(self, status, text):
        """Send a complete HTML response with the given status and body text."""
        body = text.encode('utf-8')
        self.send_response(status)
        self.send_header("Content-Type", "text/html")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Exchange the ``code`` from the redirect for tokens and store them.

        Responds with 400 when the request carries no ``code``, with 502 when
        the token could not be obtained or saved, and with 200 once the
        tokens have been written to the config file at ``fullpath``.
        """
        print('GET Request received...')

        # self.path holds only the request target, so no HTTP-version suffix
        # has to be stripped from the parsed value.
        oauthcode = self._extract_code(self.path)
        if not oauthcode:
            print("No code found...\n")
            self._reply(HTTPStatus.BAD_REQUEST,
                        "No OAuth2 code found in the request.")
            return

        data = {
            "grant_type": "authorization_code",
            "code": oauthcode,
            "redirect_uri": redirect_uri,
            "client_id": client_id,
            "client_secret": client_secret,
        }

        try:
            # A timeout keeps a stalled VSA server from hanging the handler.
            response = requests.post(f"{vsa_uri}/api/v1.0/authorize",
                                     data=data, timeout=30)
            ktoken = json.loads(response.text)

            if not config.has_section('Auth'):
                config.add_section('Auth')
            # configparser only accepts string values.
            for field in ('access_token', 'token_type',
                          'expires_in', 'refresh_token'):
                config['Auth'][field] = str(ktoken[field])

            with open(fullpath, 'w') as configfile:
                config.write(configfile)
        except (requests.RequestException, ValueError, KeyError,
                TypeError, OSError) as e:
            print(f"No Token found... {e}")
            self._reply(HTTPStatus.BAD_GATEWAY,
                        "Could not obtain an OAuth2 token; "
                        "see the terminal for details.")
            return

        self._reply(HTTPStatus.OK,
                    "OAuth2 Token generated... you can go back to terminal...")
        # Do not block on input() here: the server handles one request at a
        # time, so waiting for the keyboard would stall every later request.
        print("Access Token written on 'config.ini', Press <CTRL>+<C> to exit")

def doInitialAuth(code, config):
    """Exchange an authorization code for tokens and persist them.

    Posts the code to ``<vsa_uri>/api/v1.0/authorize``, stores the JSON
    response as the ``Auth`` section of *config* together with a
    ``refreshed_at`` timestamp, writes the config to the module-level
    ``fullpath`` and finally calls ``Auth.doRefresh`` with the refresh token.

    Args:
        code: Authorization code taken from the OAuth redirect URL.
        config: ``configparser.ConfigParser`` holding the ``VSA`` and
            ``Listener`` sections.

    Raises:
        SystemExit: When the server answers with an error status or the
            response does not contain a refresh token.
    """
    # Imported locally under an alias: this module has both
    # ``import datetime`` and ``from datetime import datetime``, so the bare
    # name ``datetime`` ends up being the class and ``datetime.datetime``
    # raises AttributeError.
    from datetime import datetime as _datetime

    vsa_uri = config['VSA']['vsa_uri']
    client_id = config['VSA']['client_id']
    client_secret = config['VSA']['client_secret']
    authendpoint = vsa_uri + "/api/v1.0/authorize"
    redirect_uri = config['Listener']['redirect_uri']

    r = requests.post(authendpoint, json={
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "client_secret": client_secret})
    print("First refresh token:")
    print(r.text)

    # Treat every error status as fatal, not just 400.
    if not r.ok:
        print("An error has occurred")
        print("Response code:")
        print(str(r.status_code))
        sys.exit(1)

    try:
        tokens = r.json()
        refreshtoken = tokens['refresh_token']
    except (ValueError, KeyError, TypeError):
        print("An error has occurred")
        print(r.text)
        sys.exit(1)

    print("Got token:")
    print(refreshtoken)
    print("Response code:")
    print(str(r.status_code))

    config['Auth'] = tokens
    config['Auth']['refreshed_at'] = _datetime.now().strftime("%Y%m%d%H%M")
    with open(fullpath, 'w') as configfile:
        config.write(configfile)
    Auth.doRefresh(refreshtoken)

def _extractAuthCode(payload):
    """Return the OAuth code contained in an e-mail body, or None.

    Two shapes are recognised: a quoted-printable body, where ``=`` is
    encoded as ``=3D`` (34 characters after ``code=``), and a raw body
    (32 characters after ``code=``).
    """
    quoted = re.search(r'https://.*\/\?code(=[\w\d]{34})', payload)
    if quoted:
        return quoted.group(1).replace("=3D", "").lstrip("=")
    raw = re.search(r'https://.*\/\?code(=[\w\d]{32})', payload)
    if raw:
        # Drop the captured leading "=" so only the code itself remains.
        return raw.group(1)[1:]
    return None


def getInfobyMail():
    """Obtain the OAuth code by e-mail and run the initial authentication.

    Sends the authorization link to ``smtp_emailto``, waits, then polls the
    IMAP inbox for unseen messages (up to five extra times). The first
    message containing the redirect URL supplies the code that is passed to
    ``doInitialAuth``.

    All settings (``smtp_*``, ``imap_*``, ``urlforuser``, ``config``) are
    read from module-level names assigned by the ``__main__`` section.

    Returns:
        None.
    """
    msg = MIMEMultipart('mixed')
    msg['Subject'] = "PythonVSA Authentication"
    msg['From'] = smtp_emailfrom
    msg['To'] = smtp_emailto
    text = f"""\
        Please follow this link to authenticate your new integration:  {urlforuser}

        Once you have authorized you will be redirected to a page that doesn't load/resolve. Copy the address from your address bar and reply to this email with it."""
    msg.attach(MIMEText(text))

    print(f"Trying to connect to {urlforuser}...")
    # Use a distinct local name: assigning to ``smtp_server`` here would make
    # it local to the function and the read of the global setting would fail.
    with smtplib.SMTP(smtp_server, smtp_port) as mailer:
        mailer.ehlo()
        mailer.starttls()
        mailer.ehlo()
        mailer.login(smtp_username, smtp_password)
        mailer.sendmail(smtp_emailfrom, smtp_emailto, msg.as_string())

    print(f"Waiting {imap_refresh_interval} seconds to give a chance to respond.")
    sleep(imap_refresh_interval)

    code = None
    connection = IMAP4_SSL(imap_server, imap_port)
    try:
        connection.login(imap_username, imap_password)
        connection.select('INBOX')
        typ, data = connection.search(None, '(UNSEEN)')

        if data == [b'']:
            print(f"No emails found. Checking again every {imap_refresh_interval} seconds.")
            for remaining in range(4, -1, -1):
                print(f"Checking {remaining} more times.")
                sleep(imap_refresh_interval)
                typ, data = connection.search(None, '(UNSEEN)')
                if data[0] != b'':
                    break

        for num in (data[0] or b'').split():
            typ, fetched = connection.fetch(num, '(RFC822)')
            connection.store(num, '+FLAGS', '\\Seen')

            # The message bytes are the second item of the tuple part of the
            # fetch response; the other parts are plain bytes.
            raw_message = next(
                (part[1] for part in fetched if isinstance(part, tuple)), None)
            payload = None
            if raw_message is not None:
                payload = email.message_from_bytes(raw_message).get_payload()
            if not isinstance(payload, str):
                # Multipart bodies arrive as a list and cannot be searched.
                print("It appears we have a message with a format we can't understand. Deleting.")
                continue

            code = _extractAuthCode(payload)
            if code is not None:
                break
            print("Didn't find URL. Deleting.")
    finally:
        # Always release the mailbox, including on errors and when no code
        # was found; close() is only valid after a successful select().
        try:
            connection.close()
        except (connection.error, OSError):
            pass
        try:
            connection.logout()
        except (connection.error, OSError):
            pass

    if code is not None:
        doInitialAuth(code, config)

# from somewhere I forgot and didn't save found on google
def startLocalHTTPSCallback(config):
    """Serve the OAuth callback over HTTPS until interrupted.

    Loads ``./cert_fullchain.pem`` and ``./cert_key.pem`` from the current
    working directory, binds to the address given by the ``Listener``
    section of *config* and handles requests with ``Callback_LocalHTTPS``.

    Args:
        config: Mapping with ``config['Listener']['listen_ip']`` and
            ``config['Listener']['listen_port']``.

    Returns:
        None. The call blocks in ``serve_forever()``.
    """
    ssl_key = pathlib.Path("./cert_key.pem").expanduser()
    ssl_cert = pathlib.Path("./cert_fullchain.pem").expanduser()

    print("Setting Up Local Callback Server...")

    # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is the
    # supported way to wrap the listening socket. Load the certificate
    # first so a bad file fails before the port is bound.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=str(ssl_cert), keyfile=str(ssl_key))

    listen_address = (config['Listener']['listen_ip'],
                      int(config['Listener']['listen_port']))
    # The context manager closes the server socket when serving stops.
    with HTTPServer(listen_address, Callback_LocalHTTPS) as httpd:
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
        print(f"Server started at localhost: {config['Listener']['listen_ip']}")
        httpd.serve_forever()

if __name__ == "__main__":
    # Script entry point: load PythonVSA/config.ini, print setup instructions,
    # generate a temporary certificate and start the local HTTPS callback.
    # The names assigned below are module globals that the handler and the
    # e-mail helpers read directly.

    # Dev var (NOTE(review): presumably read by oauthlib; nothing in this
    # script uses it directly - confirm it is still needed).
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    logging.getLogger().setLevel(logging.DEBUG)

    # Init config
    config = configparser.ConfigParser()
    fullpath = os.getcwd() + "/PythonVSA/config.ini"
    config.read(fullpath, encoding='utf-8')
    try:
        client_id = config['VSA']['client_id']
        client_secret = config['VSA']['client_secret']
        vsa_uri = config['VSA']['vsa_uri']

        redirect_uri = config['Listener']['redirect_uri']
        listen_ip = config['Listener']['listen_ip']
        listen_port = config['Listener']['listen_port']

        smtp_username = config['Email']['smtp_username']
        smtp_password = config['Email']['smtp_password']
        smtp_emailfrom = config['Email']['smtp_emailfrom']
        smtp_emailto = config['Email']['smtp_emailto']
        smtp_server = config['Email']['smtp_server']
        smtp_port = int(config['Email']['smtp_port'])
        imap_username = config['Email']['imap_username']
        imap_password = config['Email']['imap_password']
        imap_email = config['Email']['imap_email']
        imap_server = config['Email']['imap_server']
        imap_port = int(config['Email']['imap_port'])
        imap_refresh_interval = int(config['Email']['imap_refresh_interval'])

    except KeyError as missing:
        # Stop here: continuing would only fail later with a NameError on
        # whichever setting could not be read.
        print("A required variable is missing from the configuration. Please check config.ini. ")
        print(f"Missing entry: {missing}")
        sys.exit(1)

    print("Go to your VSA under System > Server Configuration > OAuth Clients and [+Register Client]")
    print("- Choose anything to your 'Client Name', prefer something that will make sense like PythonVSA")
    print("- By now, it will work only if you run VSA_Auth.py from your computer, so the 'Redirect_Url' should be https://localhost:8443")
    print("- For eMail, use something that is working and you have access and configured in the `config.ini`")
    input("When you are ready, please, press <ENTER>")

    urlforuser = vsa_uri + "/vsapres/web20/core/login.aspx?response_type=code&redirect_uri=" + redirect_uri + "&client_id=" + client_id
    print("")
    print("Now, please visit this link and copy the entire resulting URL.")
    print(urlforuser)

    # TODO: Test this and configure options to use argparse
    # getInfobyMail()

    # prepare
    print("Generating Temporary SSL Certificates for the local server...")
    _generateSelfSignedCert('localhost')

    startLocalHTTPSCallback(config)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants