In [1]:
from flask import Flask, json, request, render_template, current_app, url_for
from flask import abort, Blueprint,  redirect, session
from flask_cors import CORS, cross_origin
import cx_Oracle
import requests
import datetime
import time
#import getpass

In [2]:

from saml2 import (
    BINDING_HTTP_POST,
    BINDING_HTTP_REDIRECT
)
from saml2.client import Saml2Client
from saml2.config import Config as Saml2Config


app = Flask(__name__)


class SAML_SP:
    last_metadata_load = 0
    client = None

    def __init__(self, entity_id, metadata_url, acs_func, xmlsec_path = None):
        self.entity_id = entity_id
        self.metadata_url = metadata_url
        self.acs_func = acs_func
        self.xmlsec_path = xmlsec_path

    def _get_saml_client(self):
        """
        Given the name of an IdP, return a configuation.
        The configuration is a hash for use by saml2.config.Config
        """

        acs_url = url_for(
            self.acs_func,
            _external=True)

        https_acs_url = url_for(
            self.acs_func,
            _external=True,
            _scheme='https')

        # SAML metadata changes very rarely. On a production system,
        # this data should be cached as approprate for your production system.
        rv = requests.get(self.metadata_url)

        current_app.logger.debug('rv.rext: %s', rv.text)

        settings = {
            "logging": {
                "version": 1,
                "formatters": {
                    "simple": {
                        "format": "[%(asctime)s] [%(levelname)s] [%(name)s.%(funcName)s] %(message)s",
                    },
                },
                "handlers": {
                    "stdout": {
                        "class": "logging.StreamHandler",
                        "stream": "ext://sys.stdout",
                        "level": "DEBUG",
                        "formatter": "simple",
                    },
                },
                "loggers": {
                    "saml2": {
                        "level": "DEBUG"
                    },
                },
                "root": {
                    "level": "DEBUG",
                    "handlers": [
                        "stdout",
                    ],
                },
            },
            'entityid': self.entity_id,
            'metadata': {
                'inline': [rv.text],
            },
            'service': {
                'sp': {
                    'endpoints': {
                        'assertion_consumer_service': [
                            (acs_url, BINDING_HTTP_REDIRECT),
                            (acs_url, BINDING_HTTP_POST),
                            (https_acs_url, BINDING_HTTP_REDIRECT),
                            (https_acs_url, BINDING_HTTP_POST)
                        ],
                    },
                    # Don't verify that the incoming requests originate from us via
                    # the built-in cache for authn request ids in pysaml2
                    'allow_unsolicited': True,
                    # Don't sign authn requests, since signed requests only make
                    # sense in a situation where you control both the SP and IdP
                    'authn_requests_signed': False,
                    'logout_requests_signed': True,
                    'want_assertions_signed': True,
                    'want_response_signed': False
                }
            },
        }

        if self.xmlsec_path:
            settings['xmlsec_path'] = self.xmlsec_path
            print (self.xmlsec_path)

        current_app.logger.info('settings: %s', settings)

        saml2_config = Saml2Config()
        saml2_config.load(settings)
        saml2_config.allow_unknown_attributes = True

        saml2_client = Saml2Client(config=saml2_config)

        return saml2_client

    def get_saml_client(self):
        if time.time() > self.last_metadata_load or self.client is None:
            self.client = self._get_saml_client()
        return self.client


    def acs(self):
        try:
            client = self.get_saml_client()

            current_app.logger.debug('request.form: %s', request.form)

            authn_response = client.parse_authn_request_response(
                request.form['SAMLResponse'],
                BINDING_HTTP_POST
            )

            current_app.logger.info('authn_response: %s', authn_response)

            authn_response.get_identity()

            subject = authn_response.get_subject()

            current_app.logger.info('subject: %s', subject)

            user_id = subject.text

            return user_id, authn_response.ava
        except Exception as e:
            current_app.logger.exception('Exception raised during SAML SSO login')
            return None

    def login(self):
        client = self.get_saml_client()
        reqid, info = client.prepare_for_authenticate()

        current_app.logger.info('reqid: %s', reqid)
        current_app.logger.info('info: %s', info)

        redirect_url = None

        # Select the IdP URL to send the AuthN request to
        _, redirect_url = next(filter(lambda k_v: k_v[0] == 'Location', info['headers']))

        current_app.logger.info('redirect_url: %s', redirect_url)

        return redirect_url


app.config.from_mapping(
    SECRET_KEY='dev',
    SAML_CLIENT = SAML_SP(
        'https://hello.avc.edu', # Entity ID
        'https://portalguard.test.avc.edu/sso/Metadata.ashx', # Metadata URL
        'saml_sso' # return from login function
    )
)

cors = CORS(app, resources={
    r"/*": {
        "origins": "*"
    }
})




class OpeningDayReg:
    def __init__(self):

        self.user = 'opening_day_tracking' # getpass.getpass('user:')  # 'generic_user'
        self.password = 'N07h1ng1553cur3bu7l0ngp455!' # getpass.getpass('password:')   # 'N07h1ng1553cur3bu7l0ngp455!'
        self.dsn_tns = cx_Oracle.makedsn('bandb-at1.test.avc.edu', '1521', 'avcprep')
        self.engine_connection = self.connection()

    def connection(self):
        try:
            _connection = cx_Oracle.connect(self.user, self.password,
                                            dsn=self.dsn_tns,
                                            encoding="UTF-8")
            return _connection

        except Exception as e:
            print(e)

    def execute_sql_query(self, _query):
        print(_query)
        with self.engine_connection.cursor() as _cur:
            _cur.execute(_query)
            app_data = _cur.fetchall()

        return app_data

    def execute_sql_insert(self, insert_statement):
        print(insert_statement)
        with self.engine_connection.cursor() as _cur:
            try:
                _cur.execute(insert_statement)
                _result = self.engine_connection.commit()
            except Exception as e:
                self.engine_connection.rollback()
                return e

        return _result

    def has_user_logged_this_year(self, user_name):
        results_logged_in_this_year = self.execute_sql_query(f"select USER_NAME,LOGIN_TIME from AVC.OPENING_DAY_LOGIN where TO_CHAR(SYSDATE, 'YYYY') = TO_CHAR(LOGIN_TIME, 'YYYY') and USER_NAME = '{user_name.lower()}'")
        #print(results_logged_in_this_year)
        is_there = (True if results_logged_in_this_year else False)
        #print(is_there)
        return results_logged_in_this_year, is_there

    def check_user_in(self, user_name):
        signed_in, is_there = self.has_user_logged_this_year(user_name)
        this_year = datetime.datetime.now().strftime("%Y")

        if signed_in and signed_in[0][1].strftime("%Y") == this_year:
            signed_in = signed_in[0][1]
            #print('already signed in', signed_in.strftime("%A %H:%M %p %Y-%b-%d"))
            return signed_in
        else:
            #print('init sign in', signed_in)
            insert_statement = f"INSERT INTO AVC.OPENING_DAY_LOGIN(USER_NAME, LOGIN_TIME) VALUES('{user_name.lower()}', SYSDATE)"
            sign_in = self.execute_sql_insert(insert_statement)
            signed_in, _ = self.has_user_logged_this_year(user_name)
            signed_in = signed_in[0][1]
            return signed_in



@app.route('/')
@app.route('/index')
def index():
    if not_logged_in:
        return redirect(url_for('saml_login'))
    else:

        user_name = 'rwalden'
        return render_template('index.html', title='Welcome', username=user_name)



flask_session = OpeningDayReg()

@app.route('/registered', methods=['GET'])
def is_user_registered():
    user_name = 'rwalden'
    is_reg = flask_session.has_user_logged_this_year(user_name)
    print(is_reg)
    return str(is_reg)

@app.route('/register', methods=['POST'])
@cross_origin()
def register_user():
    #print(type(post_companies), dir(post_companies))
    #print(request.form)
    #print(request.json)
    #print(request.json['user_name'])
    user_check_in_status = flask_session.check_user_in(request.json['user_name'])
    print('user_check_in_status', user_check_in_status)

    return {'sing_in_dt': user_check_in_status.strftime("%A %H:%M %p %Y-%b-%d")}   # json.dumps({"success": True}), 201







#if __name__ == '__main__':
#api.run()

In [3]:
app.run(debug = False)

 * Serving Flask app '__main__' (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
[2022-10-31 07:36:25,246] ERROR in app: Exception on / [GET]
Traceback (most recent call last):
  File "C:\Users\rwalden\Anaconda3\envs\cx_oracle\lib\site-packages\flask\app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\rwalden\Anaconda3\envs\cx_oracle\lib\site-packages\flask\app.py", line 1519, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\rwalden\Anaconda3\envs\cx_oracle\lib\site-packages\flask_cors\extension.py", line 165, in wrapped_function
    return cors_after_request(app.make_response(f(*args, **kwargs)))
  File "C:\Users\rwalden\Anaconda3\envs\cx_oracle\lib\site-packages\flask\app.py", line 1517, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\rwalden\Anaconda3\envs\cx_oracle\lib\site-packages\flask\app.py", line 1503, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_ar

select USER_NAME,LOGIN_TIME from AVC.OPENING_DAY_LOGIN where TO_CHAR(SYSDATE, 'YYYY') = TO_CHAR(LOGIN_TIME, 'YYYY') and USER_NAME = 'rwalden'
([], False)


127.0.0.1 - - [31/Oct/2022 07:37:25] "GET /register HTTP/1.1" 405 -
