In [24]:
from azure.devops.connection import Connection
from azure.devops.v7_1.git.models import GitPullRequest
from azure.devops.v7_1.git.models import CommentThread, Comment
from msrest.authentication import BasicAuthentication

import sys
from datetime import timedelta, datetime
import json
import re
import requests
import getopt
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [25]:
# Databricks job-run URL patterns, e.g.
#   https://adb-<digits>.<digits>.azuredatabricks.net/?o=<digits>#job/<job_id>/run/<run_id>
# Capture groups: 1-2 = numeric host components after "adb-", 3 = value of the "o"
# query parameter (used as the workspace id), 4 = job id, 5 = run id.
# Literal dots are escaped so "." only matches a real dot in the host name; unescaped
# dots let malformed hosts (e.g. "adb-12345Xazuredatabricks.net") match as well.
pattern = r"^https://adb-([0-9]+)\.([0-9]+)\.azuredatabricks\.net/\?o=([0-9]+)#job/([0-9]+)/run/([0-9]+)$"
# Same pattern without anchors, for finding URLs embedded in free-form text.
pattern_as_text = r"https://adb-([0-9]+)\.([0-9]+)\.azuredatabricks\.net/\?o=([0-9]+)#job/([0-9]+)/run/([0-9]+)"
# Non-greedy match of HTML-like tags, used to strip markup from PR descriptions.
cleanRe = re.compile(r'<.*?>')

# NOTE(review): these module-level containers are not read by the visible code
# (fetch_app_summary builds its own local dict); kept in case other code relies on them.
app_summary_map = {}
app_summary_map_list = []

In [26]:
def get_api(api_url, api_token):
    """Send an authenticated GET request and return the decoded JSON body.

    ``api_token`` is sent verbatim as the ``Authorization`` header. TLS certificate
    verification is disabled (``verify=False``), and no timeout is set.
    """
    auth_headers = {'Authorization': api_token}
    reply = requests.get(api_url, verify=False, headers=auth_headers)
    return json.loads(reply.content)

def check_response_on_get(json_val):
    """Raise if a decoded GET response reports an invalid API token.

    ``json_val`` may be a dict or a list (``in`` is used as a membership test).
    Only a ``'message'`` entry equal to ``'INVALID_TOKEN'`` triggers the error;
    every other value passes silently.

    Raises:
        ValueError: with message ``'INVALID_TOKEN'``.
    """
    token_rejected = 'message' in json_val and json_val['message'] == 'INVALID_TOKEN'
    if token_rejected:
        raise ValueError('INVALID_TOKEN')

def post_api(api_url, api_token, kv_dict):
    """POST ``kv_dict`` as a JSON body and return the decoded JSON reply.

    ``api_token`` is sent verbatim as the ``Authorization`` header. TLS certificate
    verification is disabled (``verify=False``), and no timeout is set.
    """
    payload = json.dumps(kv_dict)
    request_headers = {
        'Authorization': api_token,
        'accept': 'application/json',
        'Content-Type': 'application/json',
    }
    reply = requests.post(api_url, data=payload, verify=False, headers=request_headers)
    return json.loads(reply.content)

def check_response_on_post(json_val):
    """Validate a decoded JSON reply from a POST search endpoint.

    Raises:
        ValueError('INVALID_TOKEN'): the reply's 'message' is 'INVALID_TOKEN'.
        ValueError('Response is empty'): the reply is empty.
        ValueError('KEY results NOT FOUND'): the reply has no 'results' key. This now
            includes replies carrying some other 'message'. Previously such replies
            skipped every remaining check because the later tests were `elif`s of the
            'message' test.
    """
    if 'message' in json_val and json_val['message'] == 'INVALID_TOKEN':
        raise ValueError('INVALID_TOKEN')
    if len(json_val) == 0:
        print(json_val)
        raise ValueError('Response is empty')
    if 'results' not in json_val:
        print(json_val)
        raise ValueError('KEY results NOT FOUND')

In [27]:
def search_by_job_id(base_url, api_token, start_time, end_time, job_id):
    """Search Unravel for Databricks runs of one job within a time window.

    NOTE(review): an identical definition appears in the following cell and
    replaces this one when the notebook is run top to bottom.

    Posts the search query (size "1000", newest first) to the runs endpoint and
    prints the URL and query. Validates the reply with ``check_response_on_post``
    and returns the decoded JSON.
    """
    runs_endpoint = base_url + '/api/v1/ds/api/v1/databricks/runs'
    search_body = {
        "from": 0,
        "appTypes": ["db"],
        "appStatus": ["K", "F", "R", "S", "P", "U", "W"],
        "size": "1000",
        "start_time": start_time,
        "end_time": end_time,
        "jobIds": [job_id],
        "sort": [{"startTime": {"order": "desc"}}],
        "queryOnFinishedTime": False,
    }
    print("URL: " + runs_endpoint)
    print(search_body)
    reply = post_api(runs_endpoint, api_token, search_body)
    check_response_on_post(reply)
    return reply

In [28]:
def search_by_job_id(base_url, api_token, start_time, end_time, job_id):
    """Return Unravel's Databricks run search results for ``job_id``.

    The query covers ``start_time``..``end_time`` and all listed app statuses,
    capped at size "1000" and sorted by start time descending. The endpoint and
    query are printed before sending, and the reply is checked with
    ``check_response_on_post`` before being returned.
    """
    url = ''.join([base_url, '/api/v1/ds/api/v1/databricks/runs'])
    query = {}
    query["from"] = 0
    query["appTypes"] = ["db"]
    query["appStatus"] = ["K", "F", "R", "S", "P", "U", "W"]
    query["size"] = "1000"
    query["start_time"] = start_time
    query["end_time"] = end_time
    query["jobIds"] = [job_id]
    query["sort"] = [{"startTime": {"order": "desc"}}]
    query["queryOnFinishedTime"] = False

    print("URL: " + url)
    print(query)
    result = post_api(url, api_token, query)
    check_response_on_post(result)
    return result

In [29]:
def search_by_globalsearchpattern(base_url, api_token, start_time, end_time, gsp):
    """Return the newest Unravel Databricks run matching a global search pattern.

    Sends a search limited to size "1", filtered by ``globalsearchpattern=gsp``
    within ``start_time``..``end_time``. Prints the endpoint and query, validates
    the reply with ``check_response_on_post`` and returns it.
    """
    search_url = base_url + '/api/v1/ds/api/v1/databricks/runs'
    criteria = dict([
        ("from", 0),
        ("appTypes", ["db"]),
        ("appStatus", ["K", "F", "R", "S", "P", "U", "W"]),
        ("size", "1"),
        ("start_time", start_time),
        ("end_time", end_time),
        ("globalsearchpattern", gsp),
        ("sort", [{"startTime": {"order": "desc"}}]),
        ("queryOnFinishedTime", False),
    ])
    print("URL: " + search_url)
    print(criteria)
    found = post_api(search_url, api_token, criteria)
    check_response_on_post(found)
    return found

In [30]:
def search_summary_by_globalsearchpattern(base_url, api_token, start_time, end_time, gsp):
    """GET the task summary for the Databricks run identified by ``gsp``.

    ``start_time`` and ``end_time`` are accepted but not used by this function.
    The URL is printed, and the reply is checked with ``check_response_on_get``
    before being returned.
    """
    summary_url = ''.join([base_url, '/api/v1/ds/api/v1/databricks/runs/', gsp, '/tasks/summary'])
    print("URL: " + summary_url)
    summary = get_api(summary_url, api_token)
    check_response_on_get(summary)
    return summary

In [31]:
def search_analysis(base_url, api_token, clusterUId, id):
    """GET Unravel's analysis JSON for one Spark app on a cluster.

    ``id`` is the Spark application id; the name shadows the builtin but is kept
    for caller compatibility. The URL is printed, and the reply is checked with
    ``check_response_on_get``.
    """
    analysis_url = '/'.join([base_url + '/api/v1/spark', clusterUId, id, 'analysis'])
    print("URL: " + analysis_url)
    analysis = get_api(analysis_url, api_token)
    check_response_on_get(analysis)
    return analysis

In [32]:
def search_summary(base_url, api_token, clusterUId, id):
    """GET Unravel's app summary JSON for one Spark app on a cluster.

    ``id`` is the Spark application id. The URL is printed, and the reply is
    checked with ``check_response_on_get`` before being returned.
    """
    summary_url = "".join((base_url, "/api/v1/spark/", clusterUId, "/", id, "/appsummary"))
    print("URL: " + summary_url)
    app_summary = get_api(summary_url, api_token)
    check_response_on_get(app_summary)
    return app_summary

In [33]:
def getCurrentDateTime():
    """Return the current local time (naive datetime) truncated to whole seconds.

    The previous implementation rebuilt the datetime from year/month/day/hour/second
    and omitted ``minute``, so the minute was always reset to 0.
    """
    return datetime.today().replace(microsecond=0)

In [34]:

# Fill in with your personal access token and org URL
#personal_access_token = '<your-personal-access-token>'
#organization_url = 'https://dev.azure.com/unraveldevops'
#project_name = 'DBRK-CICD'
#build_id = 88

def search_dbrks_job_runs(connection, project_name, build_id):
    """Collect Databricks job-run references from the PR that triggered a build.

    Steps:
    - Load build ``build_id`` and print its reason, PR number and repository id.
    - Load the pull request named in ``trigger_info['pr.number']`` and parse its
      description as JSON.
    - Match each entry of the description's ``runs`` list against the
      module-level anchored ``pattern``; non-matching URLs are skipped.

    Returns:
        list of dicts with keys pr_id, pdbrks_url, workspace_id, job_id, run_id.
    """
    build_info = connection.clients.get_build_client().get_build(project_name, build_id).as_dict()

    print("reason:" + build_info['reason'])
    pr_number = build_info['trigger_info']['pr.number']
    print("pr:" + pr_number)
    print("repo_id:" + build_info['repository']['id'])

    pr_info = connection.clients.get_git_client().get_pull_request_by_id(pr_number).as_dict()
    pr_id = pr_info['pull_request_id']
    description = json.loads(pr_info['description'])

    runs = []
    for url in description['runs']:
        found = re.search(pattern, url)
        if found is None:
            continue
        runs.append({
            'pr_id': pr_id,
            'pdbrks_url': url,
            'workspace_id': found.group(3),
            'job_id': found.group(4),
            'run_id': found.group(5),
        })
    return runs

In [35]:
def get_devops_build(connection, project_name, build_id):
    """Return build ``build_id`` of ``project_name`` from the connection's build client."""
    return connection.clients.get_build_client().get_build(project_name, build_id)

In [36]:
def get_git_pr(connection, pr_number):
    """Return the pull request with id ``pr_number`` via the connection's Git client."""
    return connection.clients.get_git_client().get_pull_request_by_id(pr_number)

In [37]:
def update_git_pr(connection, pr, repository_id, pr_number, project_name):
    """Update a pull request through the connection's Git client.

    Args:
        connection: Azure DevOps connection.
        pr: pull-request object carrying the fields to change.
        repository_id: repository id or name.
        pr_number: id of the pull request to update.
        project_name: project containing the repository.

    Returns:
        Whatever ``update_pull_request`` returns. The original stored this value
        in a dead local and discarded it.
    """
    git_client = connection.clients.get_git_client()
    return git_client.update_pull_request(pr, repository_id, pr_number, project_name)

In [38]:
def create_pr_comments(connection, repository_id, pr_number, comment_thread, project=None):
    """Post ``comment_thread`` as a new thread on a pull request.

    Args:
        connection: Azure DevOps connection.
        repository_id: repository id (or name, in which case ``project`` is usually needed).
        pr_number: id of the pull request.
        comment_thread: the ``CommentThread`` to create.
        project: optional project name or id passed through to ``create_thread``.
            Defaults to None, matching the previous call.

    Returns:
        The thread returned by ``create_thread``. The original discarded it.
    """
    git_client = connection.clients.get_git_client()
    return git_client.create_thread(comment_thread, repository_id, pr_number, project=project)

In [39]:
def get_job_runs_from_description(pr_id, description_json):
    """Extract job-run references from a JSON PR description's ``runs`` list.

    Each URL matching the module-level anchored ``pattern`` is printed and turned
    into a dict with keys pr_id, pdbrks_url, workspace_id, job_id, run_id.
    Non-matching entries are skipped.
    """
    records = []
    for run_url in description_json['runs']:
        hit = re.search(pattern, run_url)
        if hit is None:
            continue
        print(run_url)
        records.append({
            'pr_id': pr_id,
            'pdbrks_url': run_url,
            'workspace_id': hit.group(3),
            'job_id': hit.group(4),
            'run_id': hit.group(5),
        })
    return records

In [40]:
def get_job_runs_from_description_as_text(pr_id, description_text):
    """Extract Databricks job-run references from free-form PR description text.

    Every occurrence of the module-level ``pattern_as_text`` becomes a dict with keys:
      - ``pr_id``
      - ``pdbrks_url``: the matched URL, same key as in ``get_job_runs_from_description``
      - ``workspace_id``
      - ``job_id``
      - ``run_id``

    The description and pattern are printed first; "no match" is printed when
    nothing is found.

    Returns:
        list of dicts, empty when nothing matches.
    """
    print("Description:\n" + description_text)
    print("Pattern: " + pattern_as_text)
    job_run_list = []
    for match in re.finditer(pattern_as_text, description_text):
        job_run_list.append({
            'pr_id': pr_id,
            'pdbrks_url': match.group(0),
            'workspace_id': match.group(3),
            'job_id': match.group(4),
            'run_id': match.group(5),
        })
    if not job_run_list:
        print("no match")
    return job_run_list

In [41]:
def get_organization_connection(organization_url, personal_access_token):
    """Return an Azure DevOps ``Connection`` for ``organization_url``.

    The personal access token is supplied as the password of msrest
    ``BasicAuthentication`` with an empty user name.
    """
    return Connection(base_url=organization_url,
                      creds=BasicAuthentication('', personal_access_token))

In [42]:
def create_comments_with_markdown(job_run_result_list):
    comments = ""
    if job_run_result_list:
        for r in job_run_result_list:
            comments += "----\n"
            comments += "<details>\n"
            # comments += "<img src='https://www.unraveldata.com/wp-content/themes/unravel-child/src/images/unLogo.svg' alt='Logo'>\n\n"
            comments += "<summary> <img src='https://www.unraveldata.com/wp-content/themes/unravel-child/src/images/unLogo.svg' alt='Logo'> Job Id: {}, Run Id: {}</summary>\n\n".format(
                r["job_id"], r["run_id"]
            )
            comments += "#### Workspace Id:" + r["workspace_id"] + "\n"
            comments += "#### Job Id:" + r["job_id"] + "\n"
            comments += "#### Run Id:" + r["run_id"] + "\n"
            comments += "----\n"
            comments += "#### [{}]({})\n".format('Unravel url', r["unravel_url"])
            if r['app_summary']:
                # Get all unique keys from the dictionaries while preserving the order
                headers = []
                for key in r['app_summary'].keys():
                    if key not in headers:
                        headers.append(key)

                # Generate the header row
                header_row = "| " + " | ".join(headers) + " |"

                # Generate the separator row
                separator_row = "| " + " | ".join(["---"] * len(headers)) + " |"

                # Generate the data rows
                data_rows = "\n".join(
                    [
                        "| " + " | ".join(str(r['app_summary'].get(h, "")) for h in headers)
                    ]
                )

                # Combine the header, separator, and data rows
                comments += "----\n"
                comments += "# App Summary\n"
                comments += "----\n"
                comments += header_row + "\n" + separator_row + "\n" + data_rows + "\n"
            if r["unravel_insights"]:
                comments += "----\n"
                comments += "## Unravel Insights\n"
                for insight in r["unravel_insights"]:
                    categories = insight["categories"]
                    if categories:
                        for k in categories.keys():
                            instances = categories[k]["instances"]
                            if instances:
                                for i in instances:
                                    if i["key"].upper() != "SPARKAPPTIMEREPORT":
                                        comments += (
                                            "#### "
                                            + i["key"].upper()
                                            + ": "
                                            + i["title"]
                                            + "\n"
                                        )
                                        comments += "##### EVENT: " + i["events"] + "\n"
                                        comments += (
                                            "##### ACTIONS: " + i["actions"] + "\n"
                                        )
            comments += "</details>\n\n"

    return comments

In [43]:
def fetch_app_summary(unravel_url, unravel_token, clusterUId, appId):
    """Fetch an Unravel Spark app summary and flatten it for the PR comment table.

    NOTE(review): this function is redefined in the next cell; keep both in sync
    (the later definition wins on Run All).

    Args:
        unravel_url: Unravel base URL.
        unravel_token: value sent as the Authorization header.
        clusterUId: Unravel cluster UID of the Spark app.
        appId: Spark application id.

    Returns:
        dict with keys "Spark App" (Markdown link), "Cluster", "Total cost",
        "Executor Node Type", "Driver Node Type", "Tags", "Autoscale".
    """
    summary_dict = search_summary(unravel_url, unravel_token, clusterUId, appId)["annotation"]
    app_link = '{}/#/app/application/spark?execId={}&clusterUid={}'.format(unravel_url, appId, clusterUId)

    app_summary_map = {}
    app_summary_map["Spark App"] = '[{}]({})'.format(appId, app_link)
    app_summary_map["Cluster"] = clusterUId
    # NOTE(review): "cents" and "dbuCost" are summed and shown with a "$" prefix;
    # confirm both are in the same unit (the "cents" key name suggests cents).
    app_summary_map["Total cost"] = '${}'.format(summary_dict["cents"] + summary_dict["dbuCost"])

    # runInfo arrives as a JSON-encoded string inside the annotation.
    runinfo = json.loads(summary_dict["runInfo"])
    app_summary_map["Executor Node Type"] = runinfo["node_type_id"]
    app_summary_map["Driver Node Type"] = runinfo["driver_node_type_id"]

    # Custom tags extend (and on key clashes override) the default tags. The old code
    # merged default_tags with itself, so custom tags were silently dropped.
    tags = dict(runinfo["default_tags"])
    tags.update(runinfo.get("custom_tags") or {})
    app_summary_map["Tags"] = tags

    if "autoscale" in runinfo:
        autoscale = runinfo["autoscale"]
        app_summary_map['Autoscale'] = {
            "autoscale_min_workers": autoscale["min_workers"],
            "autoscale_max_workers": autoscale["max_workers"],
            "autoscale_target_workers": autoscale["target_workers"],
        }
    else:
        app_summary_map['Autoscale'] = 'Autoscale is not enabled.'
    return app_summary_map

In [44]:
def fetch_app_summary(unravel_url, unravel_token, clusterUId, appId):
    """Fetch an Unravel Spark app summary and flatten it for the PR comment table.

    Args:
        unravel_url: Unravel base URL.
        unravel_token: value sent as the Authorization header.
        clusterUId: Unravel cluster UID of the Spark app.
        appId: Spark application id.

    Returns:
        dict with keys "Spark App" (Markdown link), "Cluster", "Total cost",
        "Executor Node Type", "Driver Node Type", "Tags", "Autoscale".
    """
    summary_dict = search_summary(unravel_url, unravel_token, clusterUId, appId)["annotation"]
    app_link = '{}/#/app/application/spark?execId={}&clusterUid={}'.format(unravel_url, appId, clusterUId)

    app_summary_map = {}
    app_summary_map["Spark App"] = '[{}]({})'.format(appId, app_link)
    app_summary_map["Cluster"] = clusterUId
    # NOTE(review): "cents" and "dbuCost" are summed and shown with a "$" prefix;
    # confirm both are in the same unit (the "cents" key name suggests cents).
    app_summary_map["Total cost"] = '${}'.format(summary_dict["cents"] + summary_dict["dbuCost"])

    # runInfo arrives as a JSON-encoded string inside the annotation.
    runinfo = json.loads(summary_dict["runInfo"])
    app_summary_map["Executor Node Type"] = runinfo["node_type_id"]
    app_summary_map["Driver Node Type"] = runinfo["driver_node_type_id"]

    # Custom tags extend (and on key clashes override) the default tags. The old code
    # merged default_tags with itself, so custom tags were silently dropped.
    tags = dict(runinfo["default_tags"])
    tags.update(runinfo.get("custom_tags") or {})
    app_summary_map["Tags"] = tags

    if "autoscale" in runinfo:
        autoscale = runinfo["autoscale"]
        app_summary_map['Autoscale'] = {
            "autoscale_min_workers": autoscale["min_workers"],
            "autoscale_max_workers": autoscale["max_workers"],
            "autoscale_target_workers": autoscale["target_workers"],
        }
    else:
        app_summary_map['Autoscale'] = 'Autoscale is not enabled.'
    return app_summary_map

In [46]:
def _mask_secret(value):
    """Return a placeholder for a secret so it never appears in build logs."""
    return '<redacted>' if value else '<empty>'


def _parse_args(argv):
    """Parse CLI options into a settings dict; print usage and exit on -h, errors or missing options."""
    usage = ('azdevopsclient.py -u <unravel_url> -t <unravel_api_token> -o <organization_url>'
             ' -a <pat> -p <project_name> -b <build_id>')
    try:
        opts, _ = getopt.getopt(argv, 'hu:t:o:a:p:b:',
                                ['unravel=', 'token=', 'organization=', 'pat=', 'project=', 'build='])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)

    option_names = {
        '-u': 'unravel_url', '--unravel': 'unravel_url',
        '-t': 'unravel_token', '--token': 'unravel_token',
        '-o': 'organization_url', '--organization': 'organization_url',
        '-a': 'pat', '--pat': 'pat',
        '-p': 'project_name', '--project': 'project_name',
        '-b': 'build_id', '--build': 'build_id',
    }
    settings = {'unravel_url': '', 'unravel_token': '', 'organization_url': '',
                'pat': '', 'project_name': '', 'build_id': ''}
    for opt, arg in opts:
        if opt == '-h':
            print(usage)
            sys.exit()
        # getopt only yields options declared above, so every other opt is mapped.
        settings[option_names[opt]] = arg

    missing = [name for name, value in settings.items() if not value]
    if missing:
        print('Missing required options: ' + ', '.join(missing))
        print(usage)
        sys.exit(2)
    return settings


def _search_window(days=14):
    """Return (start, end) ISO-8601 strings with the local UTC offset, spanning the last ``days`` days."""
    # Truncate to whole seconds. The old code rebuilt the datetime without the
    # minute field, which silently reset the minute to 0.
    end_dt = datetime.today().replace(microsecond=0)
    start_dt = end_dt - timedelta(days=days)
    return start_dt.astimezone().isoformat(), end_dt.astimezone().isoformat()


def _collect_unravel_results(unravel_url, unravel_token, start_time, end_time, job_run_list):
    """Enrich each parsed job run with Unravel insights and app summary; return those found in Unravel."""
    results = []
    for run in job_run_list:
        gsp = run['workspace_id'] + '_' + run['job_id'] + '_' + run['run_id']
        job_runs_json = search_summary_by_globalsearchpattern(unravel_url, unravel_token, start_time, end_time, gsp)
        # The code below indexes the reply as a list of task summaries. An empty or
        # non-list reply (e.g. an error object) is treated as "not found" instead of crashing.
        if not job_runs_json or not isinstance(job_runs_json, list):
            print("job_run not found: " + gsp)
            continue

        clusterUId = job_runs_json[0]['clusterUid']
        appId = job_runs_json[0]['sparkAppId']
        print("clusterUid: " + clusterUId)
        print("sparkAppId: " + appId)

        result_json = search_analysis(unravel_url, unravel_token, clusterUId, appId)
        if not result_json:
            continue
        insights = list(result_json['insightsV2'])
        recommendation = result_json['recommendation']

        run['unravel_url'] = unravel_url + '/#/jobs/runs'
        run['unravel_keyword'] = gsp
        run['unravel_insights'] = insights
        run['unravel_recommendation'] = recommendation
        run['app_summary'] = fetch_app_summary(unravel_url, unravel_token, clusterUId, appId)
        results.append(run)
    return results


def main():
    """Post Unravel insights for the Databricks runs referenced by a build's pull request.

    Flow:
    1. Parse CLI options.
    2. Load the Azure DevOps build; exit 0 unless its reason is 'pullRequest'.
    3. Extract Databricks job-run URLs from the PR description.
    4. Look each run up in Unravel.
    5. Post a Markdown summary as a new PR comment thread; exit 0 if nothing was found.

    Exits with status 2 on bad or missing options.
    """
    settings = _parse_args(sys.argv[1:])
    unravel_url = settings['unravel_url']
    unravel_token = settings['unravel_token']
    organization_url = settings['organization_url']
    pat = settings['pat']
    project_name = settings['project_name']
    build_id = settings['build_id']

    # Never echo credentials: CI logs are retained and often shared.
    print('-u : ' + unravel_url)
    print('-t : ' + _mask_secret(unravel_token))
    print('-o : ' + organization_url)
    print('-a : ' + _mask_secret(pat))
    print('-p : ' + project_name)
    print('-b : ' + build_id)

    connection = get_organization_connection(organization_url, pat)

    build_json = get_devops_build(connection, project_name, build_id).as_dict()
    if build_json['reason'] != 'pullRequest':
        print("Nothing to do without PR")
        sys.exit(0)

    pr_number = build_json['trigger_info']['pr.number']
    repo_id = build_json['repository']['id']
    print()
    print("pr_number:" + pr_number)
    print("repo_id:" + repo_id)

    pr_json = get_git_pr(connection, pr_number).as_dict()
    pr_id = pr_json['pull_request_id']
    # A PR without a description may have no 'description' entry at all.
    raw_description = pr_json.get('description') or ''
    # Collapse to a single line and strip HTML-like tags before matching URLs.
    description = re.sub(cleanRe, '', ' '.join(raw_description.splitlines()))
    job_run_list = get_job_runs_from_description_as_text(pr_id, description)

    start_time, end_time = _search_window()
    print('start: ' + start_time)
    print('end: ' + end_time)

    job_run_result_list = _collect_unravel_results(unravel_url, unravel_token, start_time, end_time, job_run_list)
    if not job_run_result_list:
        print("Nothing to do without Unravel integration")
        sys.exit(0)

    comment_thread = CommentThread(
        comments=[Comment(content=create_comments_with_markdown(job_run_result_list))],
        status="Active",
    )
    create_pr_comments(connection, repo_id, pr_number, comment_thread)
     
    
    
     
                                   





In [47]:
if __name__ == "__main__":
    # Run the CLI entry point when executed as the main module. Note that inside a
    # notebook kernel __name__ is also "__main__", so this cell calls main() too.
    main()

-u : http://44.214.236.147:3000
-t : <redacted>
-o : https://dev.azure.com/unravelcicdtest
-a : <redacted>
-p : cicd
-b : 28

pr_number:8
repo_id:7dbb1ce2-f239-4e62-a40d-ca1ebc894e27
Description:
"https://adb-7575549084929882.2.azuredatabricks.net/?o=7575549084929882#job/867255411699781/run/26772 ", "https://adb-7575549084929882.2.azuredatabricks.net/?o=7575549084929882#job/824732723619648/run/27000 "
Patten: https://adb-([0-9]+).([0-9]+).azuredatabricks.net/\?o=([0-9]+)#job/([0-9]+)/run/([0-9]+)
start: 2023-05-24T11:00:01+05:30
end: 2023-06-07T11:00:01+05:30
URL: http://44.214.236.147:3000/api/v1/ds/api/v1/databricks/runs/7575549084929882_867255411699781_26772/tasks/summary
clusterUid: 0518-204437-vka2deom
sparkAppId: local-1684442747797
URL: http://