In [None]:
import os
import json
import requests
from bs4 import BeautifulSoup
import pandas as pd

## List of Kafka streams
The stream list is read from the connector table on the Confluence page: <confluence link>

### Get Confluence content

In [None]:
# REST path of the Confluence content API.
contentApiUrl = '/rest/api/content'
# Instance-specific settings. Each value is taken from an environment variable
# when it is set, so the API key does not have to be saved inside the notebook.
# When a variable is not set the placeholder below is used and must be edited
# by hand before running the notebook.
confluenceBaseUrl = os.environ.get('CONFLUENCE_BASE_URL', '<confluence_base_url>')
pageId = os.environ.get('CONFLUENCE_PAGE_ID', '<page_id>')
user = os.environ.get('CONFLUENCE_USER', '<confluence_username>')
key = os.environ.get('CONFLUENCE_API_KEY', '<api_key>')

In [None]:
# URL of the Confluence page content to download (placeholder).
# NOTE(review): presumably this should be built from confluenceBaseUrl,
# contentApiUrl and pageId with the storage body expanded - confirm.
requestUrl = (f'<kafka_connector_request_url>')
print(requestUrl)

# A timeout keeps the cell from hanging forever on an unreachable server.
requestResponse = requests.get(requestUrl, auth=(user, key), timeout=30)
# Fail here with the HTTP error instead of later with an unrelated parsing error.
requestResponse.raise_for_status()
requestResponsejson = requestResponse.json()

# The page markup is read from body -> storage -> value; any missing level
# is reported explicitly rather than as an AttributeError on None.
content = ((requestResponsejson.get("body") or {}).get("storage") or {}).get("value")
if not content:
    raise ValueError("Confluence response contains no body.storage.value content")

# One nested list per table: table -> rows -> cell texts (header cells first).
# Cell text is stripped so padding around a value does not end up in column
# names or connector names.
tables_raw = [[[cell.text.strip() for cell in row("th") + row("td")]
               for row in table("tr")]
              for table in BeautifulSoup(content, features="lxml")("table")]

if not tables_raw or not tables_raw[0]:
    raise ValueError("No table rows found in the Confluence page content")
print("number of tables found:", len(tables_raw))

# Create the pandas DataFrame: first row of the first table is the header.
header_row, *data_rows = tables_raw[0]
streamdf = pd.DataFrame(data_rows, columns=header_row)
streamdf.head(100)

In [None]:
# Base URL used to build the per-connector status / restart URLs (placeholder).
base_url = "<>"


def _connector_names(column):
    """Return the non-blank connector names found in ``streamdf[column]``.

    Values are stripped of surrounding whitespace, and cells that are missing
    or contain only whitespace are dropped, so a visually empty table cell
    is not treated as a connector name.
    """
    return [value.strip() for value in streamdf[column].tolist()
            if isinstance(value, str) and value.strip()]


sourcelist = _connector_names('Source Connector')
print("list of source connectors:",sourcelist)
print("number of source connectors:",len(sourcelist))

sinklist = _connector_names('Sink Connector')
print("list of sink connectors:",sinklist)
print("number of sink connectors:",len(sinklist))

## Kafka status check function

In [None]:
def kafka_status(stream, timeout=30):
    """Print the status of one connector and count what is not running.

    Requests ``{base_url}/{stream}/status`` and inspects the ``connector``
    and ``tasks`` entries of the JSON answer.

    Args:
        stream: Connector name inserted into the status URL.
        timeout: Seconds passed to ``requests.get`` before giving up.

    Returns:
        The tuple ``("failure indicator", fail_ind)``; ``fail_ind`` is the
        number of entries (connector plus tasks) whose state is not
        ``"RUNNING"``.

    Raises:
        RuntimeError: If the status request is not answered with HTTP 200.
    """
    status_url = (f"{base_url}/{stream}/status")
    print(status_url)

    response = requests.get(status_url, timeout=timeout)
    if response.status_code == 200:
        print("Response Success")
    else:
        print("Response Failure")
        # Raise rather than exit(): this aborts only the running cell and
        # tells the reader which request failed.
        raise RuntimeError(
            f"status request for {stream} returned HTTP {response.status_code}")

    # Decode the JSON body directly. Passing it through an HTML parser first
    # can alter it when a value contains '<...>' text.
    status_json = response.json()
    print(status_json)

    fail_ind = 0

    connector = status_json["connector"]
    print("connector details:",connector)
    connector_status = connector.get('state')
    print("connector", connector_status)
    if connector_status != "RUNNING":
        fail_ind = fail_ind + 1

    tasks = status_json["tasks"]
    print("task details:",tasks)
    print("number of tasks",len(tasks))
    for position, task in enumerate(tasks):
        task_status = task.get('state')
        if task_status != "RUNNING":
            fail_ind = fail_ind + 1
        # Report the id carried by the task itself; the list position is
        # only a fallback when the entry has no id.
        print("task id",task.get('id', position),task_status)

    return ("failure indicator", fail_ind)


## Kafka restart function

In [None]:
def kafka_restart(stream, timeout=30):
    """Restart the connector and/or tasks of one stream that are not running.

    Requests ``{base_url}/{stream}/status``. If the connector state is not
    ``"RUNNING"`` a POST is sent to the connector restart URL; afterwards a
    POST is sent to the task restart URL of every task whose state is not
    ``"RUNNING"``. Progress is printed; nothing is returned.

    Args:
        stream: Connector name inserted into the status and restart URLs.
        timeout: Seconds passed to each ``requests`` call before giving up.

    Raises:
        RuntimeError: If the status request is not answered with HTTP 200.
    """
    status_url = (f"{base_url}/{stream}/status")
    print(status_url)

    response = requests.get(status_url, timeout=timeout)
    if response.status_code == 200:
        print("Response Success")
    else:
        print("Response Failure")
        # Raise rather than exit(): this aborts only the running cell and
        # tells the reader which request failed.
        raise RuntimeError(
            f"status request for {stream} returned HTTP {response.status_code}")

    # Decode the JSON body directly. Passing it through an HTML parser first
    # can alter it when a value contains '<...>' text.
    status_json = response.json()

    connector = status_json["connector"]
    connector_status = connector.get('state')
    if (connector_status != "RUNNING"):
        connector_restart_url = (f"{base_url}/{stream}/restart?includeTasks=true&onlyFailed=true")
        connector_restart = requests.post(connector_restart_url, timeout=timeout)
        print(connector_restart)
    else:
        print("connector",connector_status)

    tasks = status_json["tasks"]
    for position, task in enumerate(tasks):
        task_status = task.get('state')
        # Address the task by the id it reports; the list position is only a
        # fallback when the entry has no id.
        task_id = task.get('id', position)
        if (task_status != "RUNNING"):
            print("task id",task_id,task_status, "and needs restart")
            task_restart_url = (f"{base_url}/{stream}/tasks/{task_id}/restart")
            print(task_restart_url)
            task_restart = requests.post(task_restart_url, timeout=timeout)
            print(task_restart.status_code)
        else:
            print("task id",task_id,task_status)


## Source stream validation

In [None]:
# Check every source connector; anything reported as not running is
# restarted and its status is printed once more.
for stream in sourcelist:
    print(f"############### {stream} validation started ###############")
    status_op = kafka_status(stream)
    print(status_op)
    # kafka_status returns (label, count); the count is the last element.
    fail_ind = status_op[-1]

    if fail_ind == 0:
        print(stream, "stream is healthy")
    else:
        kafka_restart(stream)
        kafka_status(stream)
        print("failed connectors / tasks restarted for the stream", stream)
    print(f"############### {stream} validation complete ###############")

## Sink stream validation

In [None]:
# Check every sink connector; anything reported as not running is
# restarted and its status is printed once more.
for stream in sinklist:
    print(f"############### {stream} validation started ###############")
    status_op = kafka_status(stream)
    print(status_op)
    # kafka_status returns (label, count); the count is the last element.
    fail_ind = status_op[-1]

    if fail_ind == 0:
        print(stream, "stream is healthy")
    else:
        kafka_restart(stream)
        kafka_status(stream)
        print("failed connectors / tasks restarted for the stream", stream)
    print(f"############### {stream} validation complete ###############")