In [None]:
pip install sqlalchemy==2.0 psycopg psycopg[binary,pool]

In [2]:
import time
import asyncio
import asyncssh
import aiohttp
from http.client import HTTPConnection
from urllib.parse import urlparse
import ipywidgets
import threading
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Default ports for the schemes that are probed below the HTTP level.
# NOTE(review): "raw" keeps the original default of 0; such links presumably
# always carry an explicit port — confirm against the link catalogue.
_DEFAULT_PORTS = {"ssh": 22, "postgre": 5432, "kafka": 9092, "raw": 0}


async def site_is_online_async(link, timeout,username,password):
    """Return True if the service behind ``link.url`` is reachable.

    The probe depends on the URL scheme:

    * ``http`` / ``https`` -- a HEAD request; any HTTP response (whatever its
      status code) counts as online.
    * ``ssh`` -- a password login followed by running ``tty`` on the host.
    * ``postgre`` / ``kafka`` / ``raw`` -- a plain TCP connect to the port.

    Args:
        link: object exposing the target as ``link.url``.
        timeout: seconds allowed for the HEAD request or connection attempt.
        username: login name used for the SSH probe only.
        password: password used for the SSH probe only.

    Returns:
        True when the probe succeeds.

    Raises:
        Exception: ``"unknown scheme"`` for unsupported schemes,
            ``"timed out"`` when the probe exceeds ``timeout``; any other
            failure propagates as the underlying library exception.
    """
    url = link.url
    parser = urlparse(url)
    scheme = parser.scheme

    # Exact membership test: the previous `scheme in ("ssh")` was a substring
    # check on a plain string, so "" or "s" matched several branches at once.
    if scheme not in ("http", "https") and scheme not in _DEFAULT_PORTS:
        raise Exception("unknown scheme")

    try:
        if scheme in ("http", "https"):
            async with aiohttp.ClientSession() as session:
                # `async with` releases the response/connection explicitly.
                async with session.head(
                    url,
                    timeout=aiohttp.ClientTimeout(total=timeout),
                    ssl=False,  # certificate verification is deliberately off
                ):
                    return True

        # `hostname` strips userinfo, port and IPv6 brackets; fall back to the
        # first path segment for URLs written without "//".
        host = parser.hostname or parser.path.split("/")[0]
        port = parser.port if parser.port is not None else _DEFAULT_PORTS[scheme]

        if scheme == "ssh":
            # NOTE(review): known_hosts=None disables host-key verification.
            connecting = asyncssh.connect(
                host=host,
                port=port,
                username=username,
                password=password,
                known_hosts=None,
            )
            async with await asyncio.wait_for(connecting, timeout=timeout) as conn:
                print((await conn.run('tty', term_type='ansi')).stdout, end='')
                return True

        # postgre / kafka / raw: being able to open the socket is enough.
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
        writer.close()
        return True
    except asyncio.TimeoutError as exc:
        raise Exception("timed out") from exc
    
# Shared log pane: every handler and callback reports into this Output widget.
# It spans the full width and scrolls vertically once it reaches 300px.
out_box = ipywidgets.Output(
    layout=ipywidgets.Layout(
        border='2px solid gray',
        overflow='hidden scroll',
        max_height='300px',
        height='auto',
        width='100%',
    )
)

    
def display_check_result(link, latency_ns, error, sink, agent_token):
    """Log one probe outcome to ``out_box`` and record it in ``dbo.probes``.

    Args:
        link: row describing the probed link; ``service_name``, ``url``,
            ``role_name``, ``link_id`` and ``role_id`` are read from it.
        latency_ns: round-trip time in nanoseconds, or None when the probe
            failed.
        error: failure description stored as ``error_message`` (only used
            when ``latency_ns`` is None).
        sink: SQLAlchemy session used to insert and commit the probe row.
        agent_token: identifier of the reporting agent, stored with the row.
    """
    out_box.append_stdout(f'\nService "{link.service_name}" [link: "{link.url}"] for role "{link.role_name}" is:')

    # Values are passed as bound parameters: error texts routinely contain
    # quotes and colons, which would corrupt a string-formatted statement.
    if latency_ns is not None:
        sink.execute(
            text('''
                 insert into dbo.probes
                 (link_id,role_id,latency_ns,agent_token)
                 values
                 (:link_id,:role_id,:latency_ns,:agent_token);
                 '''),
            {
                "link_id": link.link_id,
                "role_id": link.role_id,
                "latency_ns": latency_ns,
                "agent_token": agent_token,
            },
        )
        sink.commit()
        # 1 ns == 1e-9 s (the former factor 10e-9 overstated latency tenfold).
        out_box.append_stdout(f'"Online!" 👍, latency: {latency_ns*1e-9}s')
    else:
        sink.execute(
            text('''
                 insert into dbo.probes
                 (link_id,role_id,latency_ns,error_message,agent_token)
                 values
                 (:link_id,:role_id,null,:error_message,:agent_token);
                 '''),
            {
                "link_id": link.link_id,
                "role_id": link.role_id,
                "error_message": str(error),
                "agent_token": agent_token,
            },
        )
        sink.commit()
        out_box.append_stdout(f'"Offline?" 👎 \n  Error: "{error}"')

# Progress indicator on a 0.0..1.0 scale; the start handler resets it to 0.0
# and the completion callbacks set it to 1.0.
progress = ipywidgets.FloatProgress(
    value=0.0,
    min=0.0,
    max=1.0,
)
        

async def _asynchronous_check(links,timeout,username,password,sink,agent_token):
    async def _check(link,timeout,username,password,sink,agent_token):
        error = ""
        startTime = time.time_ns()
        try:
            await site_is_online_async(link,timeout,username,password)
            latency = time.time_ns() - startTime
        except Exception as e:
            latency = None
            #template = 'An exception of type "{0}" occurred. Arguments:\n{1!r}'
            error = str(e) #template.format(type(e).__name__, e.args)
        display_check_result(link, latency,error,sink,agent_token)
        return (error == "")

    values_list = await asyncio.gather(*(_check(link,timeout,username,password,sink,agent_token) for link in links))
    sink.commit()
    sink.close()
    return values_list
   
# Strong references to in-flight check tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_pending_check_tasks = set()


def check(timeout,links,username,password,sink,agent_token,callback_proc):
    """Run the connectivity checks and hand the outcome to ``callback_proc``.

    When an event loop is already running (the usual notebook case) the
    checks are scheduled on it as a task and this function returns at once;
    otherwise a new loop is started and the call blocks until completion.

    Args:
        timeout: per-probe timeout in seconds.
        links: link rows to probe.
        username: SSH login name forwarded to the probes.
        password: SSH password forwarded to the probes.
        sink: SQLAlchemy session that receives the probe rows.
        agent_token: identifier of the reporting agent.
        callback_proc: callable invoked with a future-like object whose
            ``result()`` returns the list of per-link booleans.
    """
    coro = _asynchronous_check(links,timeout,username,password,sink,agent_token)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:  # no event loop is running in this thread
        loop = None

    if loop is not None:
        tsk = loop.create_task(coro)
        # ^-- https://docs.python.org/3/library/asyncio-task.html#task-object
        _pending_check_tasks.add(tsk)
        tsk.add_done_callback(_pending_check_tasks.discard)
        tsk.add_done_callback(callback_proc)
    else:
        from concurrent.futures import Future

        print('Starting new event loop')
        # Deliver the result through an already-completed future so the
        # callback sees the same `.result()` interface in both branches.
        finished = Future()
        finished.set_result(asyncio.run(coro))
        callback_proc(finished)
    
# --- Credentials forwarded to the SSH probes --------------------------------
username = ipywidgets.Text(
    description='User:',
    placeholder='Введите имя пользователя',
    value='',
)
password = ipywidgets.Password(
    description='Password:',
    placeholder='Введите пароль',
)

# --- Role whose eligible links are queried ----------------------------------
role = ipywidgets.Select(
    options=[
        'Developer',
        'QA Engineer',
        'Release Engineer',
        'Analyst',
        'Data Scientist',
        'DevOps Engineer',
        'Support Engineer'
    ],
    value='Developer',
    rows=3,
    description='Roles',
    disabled=False,
    tooltip='Check beacons visibility',
)

# --- Connection token: interpolated after 'postgresql+psycopg://' -----------
token = ipywidgets.Password(
    description='Token:',
    placeholder='Введите токен',
    value='',
)

# --- Action buttons ----------------------------------------------------------
bvButton = ipywidgets.Button(
    value=False,
    description='Beacons',
    disabled=False,
    button_style='',
    tooltip='Check beacons visibility',
    icon='check',
)

# "Start!" stays disabled until beacon discovery finds a reachable beacon.
sButton = ipywidgets.Button(
    description="Start!",
    disabled=True,
    tooltip='Start tracing',
)
cButton = ipywidgets.Button(
    description="Reset",
    tooltip='Reset agent',
)


def on_sButton_clicked(b):
    """Handle "Start!": probe every link eligible for the selected role.

    Opens a database session from the token, loads the role's links from
    ``dbo.eligible_links`` and submits them to ``check`` with a 15 second
    timeout. The outcome is reported through ``check_callback``.

    Args:
        b: the clicked button (unused).
    """
    progress.value = 0.0
    out_box.append_stdout(f'\n{time.time()} - Start tracing of services!\n')

    engine=create_engine(f'postgresql+psycopg://{token.value}',
                        echo=True, isolation_level="AUTOCOMMIT")
    out_box.append_stdout(f'\n{engine} - engine created!\n')

    session = Session(engine)
    out_box.append_stdout(f'\n{session} - session created!\n')

    try:
        # The role name is sent as a bound parameter, not spliced into the SQL.
        resultSet = session.execute(
            text('''
                 SELECT url, role_name, service_name, link_id, role_id, is_beacon
                 FROM dbo.eligible_links where role_name in (:role_name)
                 '''),
            {"role_name": role.value},
        )
        out_box.append_stdout(f'\n{resultSet} - resultSet done!\n')

        eligible_links = resultSet.fetchall()
        session.commit()
    except Exception as exc:
        # Nothing downstream will close the session on this path.
        session.close()
        engine.dispose()
        out_box.append_stdout(f'\nLoading eligible links failed: {exc}\n')
        raise

    out_box.append_stdout(f'\nEngine.url:{engine.url}\n')
    out_box.append_stdout(f'\nAgent_token:{engine.url.username}\n')

    def _on_done(finished):
        """Report the finished run, then release the engine's connections."""
        try:
            results = finished.result()
            out_box.append_stdout(f'\nTracing of services: done with result={results}  << return _asynchronous_check()\n')
            check_callback(check_value_list=results)
        finally:
            engine.dispose()

    check(timeout=15, links=eligible_links,
          username=username.value,password=password.value,
          sink=session, agent_token=engine.url.username,
          callback_proc=_on_done)

    out_box.append_stdout(f'\nAll tracing subtasks were submited!\n')

def on_cButton_clicked(b):
    """Handle "Reset": clear the log pane, lock "Start!" and zero the progress bar.

    Args:
        b: the clicked button (unused).
    """
    out_box.clear_output()
    sButton.disabled = True
    progress.value = 0

    
def check_callback(check_value_list):
    """Summarise a finished tracing run in the log pane.

    Marks the progress bar complete, then reports either that no services
    were eligible (empty list) or how many probes failed. Nothing is
    printed when every probe succeeded.

    Args:
        check_value_list: per-link outcomes, True for reachable.
    """
    progress.value = 1.0

    if len(check_value_list) == 0:
        out_box.append_stdout(
                    '''\n#######################################################################
                       \nServices are not available in ACL for current role (eligible_links view)
                       \n#######################################################################
                    '''
        )
        return

    if False not in check_value_list:
        return

    unreachable = sum(1 for outcome in check_value_list if not outcome)
    out_box.append_stdout(
                   f'''\n#######################################################################################
                       \n{unreachable} service(s) from {len(check_value_list)} are not reachable, incident reported to upstream 
                       \n#######################################################################################
                    '''
    )
    
    
def beacons_discovery_callback(discovery_value_list):
    """React to a finished beacon discovery run.

    Marks the progress bar complete. If at least one beacon answered, the
    "Start!" button is enabled; otherwise the log pane explains that the
    beacons are unreachable or that none are listed for the role.

    Args:
        discovery_value_list: per-beacon outcomes, True for reachable.
    """
    progress.value = 1.0

    if len(discovery_value_list) == 0:
        out_box.append_stdout(
                    '''\n#######################################################################
                       \nBeacons are not available in ACL for current role (eligible_links view)
                       \n#######################################################################
                    '''
        )
        return

    if True in discovery_value_list:
        sButton.disabled = False
        return

    unreachable = sum(1 for outcome in discovery_value_list if not outcome)
    out_box.append_stdout(
                   f'''\n#####################################################
                       \nBeacons({unreachable} from {len(discovery_value_list)}) are not reachable, check VPN connectivity
                       \n#####################################################
                    '''
    )

def on_bvButton_clicked(b):
    """Handle "Beacons": probe the beacon links of the selected role.

    Opens a database session from the token, loads the role's links flagged
    ``is_beacon`` from ``dbo.eligible_links`` and submits them to ``check``
    with a 3 second timeout. The outcome is reported through
    ``beacons_discovery_callback``.

    Args:
        b: the clicked button (unused).
    """
    out_box.append_stdout(f'\n{time.time()} - Beacons discovery started!\n')

    engine=create_engine(f'postgresql+psycopg://{token.value}',
                        echo=True, isolation_level="AUTOCOMMIT")
    out_box.append_stdout(f'\n{engine} - engine created!\n')

    session = Session(engine)
    out_box.append_stdout(f'\n{session} - session created!\n')

    try:
        # The role name is sent as a bound parameter, not spliced into the SQL.
        resultSet = session.execute(
            text('''
                 SELECT url, role_name, service_name, link_id, role_id, is_beacon
                 FROM dbo.eligible_links where role_name in (:role_name)
                 and is_beacon
                 '''),
            {"role_name": role.value},
        )
        out_box.append_stdout(f'\n{resultSet} - resultSet done!\n')

        eligible_links = resultSet.fetchall()
        session.commit()
    except Exception as exc:
        # Nothing downstream will close the session on this path.
        session.close()
        engine.dispose()
        out_box.append_stdout(f'\nLoading beacon links failed: {exc}\n')
        raise

    out_box.append_stdout(f'\nEngine.url:{engine.url}\n')
    out_box.append_stdout(f'\nAgent_token:{engine.url.username}\n')

    def _on_done(finished):
        """Report the finished discovery, then release the engine's connections."""
        try:
            results = finished.result()
            out_box.append_stdout(f'\nBeacons discovery: done with result={results}  << return _asynchronous_check()\n')
            beacons_discovery_callback(discovery_value_list=results)
        finally:
            engine.dispose()

    check(timeout=3, links=eligible_links,
          username=username.value,password=password.value,
          sink=session, agent_token=engine.url.username,
          callback_proc=_on_done)
                            
# Attach each button to its click handler.
cButton.on_click(on_cButton_clicked)
sButton.on_click(on_sButton_clicked)
bvButton.on_click(on_bvButton_clicked)

# Render the form: inputs stacked vertically, then the button row, then the
# progress bar, and finally the log pane.
display(
    ipywidgets.VBox([username, password, role, token]),
    ipywidgets.HBox([bvButton, sButton, cButton]),
    ipywidgets.HBox([progress]),
)
display(out_box)

VBox(children=(Text(value='', description='User:', placeholder='Введите имя пользователя'), Password(descripti…

HBox(children=(Button(description='Beacons', icon='check', style=ButtonStyle(), tooltip='Check beacons visibil…

HBox(children=(FloatProgress(value=0.0, max=1.0),))

Output(layout=Layout(border_bottom='2px solid gray', border_left='2px solid gray', border_right='2px solid gra…