# jira_defects_dashboard_talent - DEV

In [3]:
import base64
import json
import requests
import mysql.connector

# def fetch_jira_issues_with_project_id(api_url, auth_header, project_id, status_filter):
def fetch_jira_issues_with_project_id(api_url, auth_header, project_id):
    """Fetch issues from a Jira search URL and keep those of one project.

    Only the issues contained in the single HTTP response are processed; no
    further result pages are requested.

    Args:
        api_url: Full Jira search URL, including the JQL query string.
        auth_header: Value sent as the ``Authorization`` request header.
        project_id: Project key; issues whose ``fields.project.key`` differs
            are dropped.

    Returns:
        A list with one dict per matching issue. Every dict has the keys
        ``project_id``, ``Reporter``, ``sprint_name``, ``sprint_start_date``,
        ``sprint_completed_date``, ``sprint_state``, ``issue_key``,
        ``issue_summary``, ``issue_status``, ``story_points``,
        ``priority_type``, ``issue_type`` and ``time_spent``. The sprint
        fields are empty strings when the issue has no sprint, and
        ``Reporter`` / ``priority_type`` are empty strings when the issue has
        no assignee / priority.

    Raises:
        requests.exceptions.RequestException: On a network failure, a timeout
            or a non-2xx response (``HTTPError`` is a subclass).
    """
    # Without a timeout a stalled Jira connection would block indefinitely.
    response = requests.get(
        api_url,
        headers={'Authorization': auth_header, 'Content-Type': 'application/json'},
        timeout=30,
    )
    response.raise_for_status()

    records = []
    for item in response.json().get('issues', []):
        fields = item['fields']
        current_project_id = fields['project']['key']
        if current_project_id != project_id:
            continue

        # customfield_10020 may be absent, null or an empty list; an empty
        # dict then makes every sprint attribute fall back to ''.
        sprint_info = (fields.get('customfield_10020') or [{}])[0]
        # These fields can be null in the payload (e.g. unassigned issues).
        assignee = fields.get('assignee') or {}
        priority = fields.get('priority') or {}

        records.append({
            # Same key for issues with and without a sprint.
            'project_id': current_project_id,
            # NOTE: the 'Reporter' key is populated from the issue's assignee.
            'Reporter': assignee.get('displayName', ''),
            'sprint_name': sprint_info.get('name', ''),
            'sprint_start_date': sprint_info.get('startDate', ''),
            'sprint_completed_date': sprint_info.get('completeDate', ''),
            'sprint_state': sprint_info.get('state', ''),
            'issue_key': item['key'],
            'issue_summary': fields.get('summary', ''),
            'issue_status': fields['status']['name'],
            'story_points': fields.get('customfield_10016', None),
            'priority_type': priority.get('name', ''),
            'issue_type': fields['issuetype']['name'],
            'time_spent': fields.get('timespent', None),
        })
    return records

def load_credentials():
    """Read ``credentials.json`` from the current working directory.

    Returns:
        The parsed JSON document (the callers in this file treat it as a
        dict and read keys such as ``DB_HOST`` and ``DB_USER`` from it).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's default locale encoding.
    with open('credentials.json', encoding='utf-8') as f:
        return json.load(f)

def mysql_connection():
    """Open a MySQL connection from the settings given by load_credentials().

    The ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_DATABASE`` entries
    default to an empty string when missing; ``DB_PORT`` defaults to 3306 and
    is converted to ``int``.

    Returns:
        Whatever ``mysql.connector.connect`` returns for those settings.
    """
    creds = load_credentials()
    return mysql.connector.connect(
        host=creds.get('DB_HOST', ''),
        user=creds.get('DB_USER', ''),
        password=creds.get('DB_PASSWORD', ''),
        database=creds.get('DB_DATABASE', ''),
        port=int(creds.get('DB_PORT', 3306)),
    )

def get_jira_credentials(host, database, user, password, pod_id, c_id):
    """Look up the Jira settings for one candidate of one pod.

    Args:
        host: MySQL host name.
        database: MySQL database name.
        user: MySQL user name.
        password: MySQL password.
        pod_id: Value matched against ``pods.id``.
        c_id: Value matched against ``PodCandidates.candidateID``.

    Returns:
        A 5-tuple ``(jira_username, jira_url, admin_email, admin_api_token,
        project_name)`` taken from the first matching row. Five ``None``
        values are returned instead when the connection cannot be
        established, the query fails, or no row matches.
    """
    import logging

    not_found = (None, None, None, None, None)
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() itself raises.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        if not connection.is_connected():
            return not_found

        cursor = connection.cursor()
        query = '''SELECT
                        p.id as pod_id,
                        pc.candidateID as candidate_id,
                        pc.jira_username AS podcandidate_jira_username,
                        at.Jira_URL AS auth_tokens_Jira_URL,
                        at.Jira_admin_login_email_address AS auth_tokens_Jira_admin_login_email_address,
                        at.Jira_admin_API_Token AS auth_tokens_Jira_admin_API_Token,
                        at.Jira_Kanben_Project_name AS auth_tokens_Jira_Kanben_Project_name
                    FROM
                        PodCandidates AS pc
                    INNER JOIN
                        pods AS p ON pc.podID = p.id
                    INNER JOIN
                        Authorization_Tokens AS at ON p.ART_id = at.ART_id WHERE p.id = %s and pc.candidateID = %s'''
        cursor.execute(query, (pod_id, c_id))
        # Fetch everything so no unread rows remain on the cursor, then use
        # only the first row.
        rows = cursor.fetchall()
        if not rows:
            # Keep the 5-tuple shape so callers can always unpack the result.
            return not_found
        # Columns 0 and 1 echo the lookup keys; columns 2..6 are the settings.
        return tuple(rows[0][2:7])
    except Exception:
        # Best-effort lookup: record the failure and report "nothing found".
        logging.getLogger(__name__).exception(
            "Jira credential lookup failed for pod_id=%s, c_id=%s", pod_id, c_id)
        return not_found
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()

def run_code(event=None, context=None):
    """Count the open and resolved Jira bugs assigned to one pod candidate.

    Args:
        event: Mapping that must provide ``pod_id`` and ``c_id``.
        context: Not used; accepted so the signature matches a handler call.

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:

        * 200 - body is ``{"reporters": [{"Reporter", "open_defects",
          "resolved_defects"}]}``. Bugs with status ``To Do`` or
          ``In Progress`` count as open, bugs with status ``Done`` as
          resolved.
        * 400 - ``pod_id`` or ``c_id`` is missing from ``event``.
        * 500 - the Jira settings could not be found, or any other error
          occurred; body is a message string.
    """
    from urllib.parse import quote

    event = event or {}
    pod_id = event.get('pod_id')
    c_id = event.get('c_id')
    if pod_id is None or c_id is None:
        return {
            'statusCode': 400,
            'body': json.dumps("Both 'pod_id' and 'c_id' are required")
        }

    try:
        credentials = load_credentials()
        jira_settings = get_jira_credentials(
            credentials.get('DB_HOST', ''),
            credentials.get('DB_DATABASE', ''),
            credentials.get('DB_USER', ''),
            credentials.get('DB_PASSWORD', ''),
            pod_id,
            c_id)

        # A failed lookup yields either nothing or a tuple of Nones; stop
        # here rather than building a request from "None" placeholders.
        if not jira_settings or not jira_settings[0] or not jira_settings[1]:
            return {
                'statusCode': 500,
                'body': json.dumps(
                    "An unexpected error occurred: no Jira configuration found "
                    "for the given pod_id and c_id")
            }
        (jira_username, jira_url, jira_admin_login_email_address,
         jira_admin_api_Token, jira_kanben_project_name) = jira_settings

        # Escape characters that would end the quoted JQL string early, then
        # percent-encode the whole clause so it survives as one query value.
        escaped_username = str(jira_username).replace('\\', '\\\\').replace('"', '\\"')
        assignee_jql = f'assignee = "{escaped_username}"'
        api_url = f"{str(jira_url).rstrip('/')}/rest/api/3/search?jql={quote(assignee_jql, safe='')}"
        auth_header = f"Basic {base64.b64encode(f'{jira_admin_login_email_address}:{jira_admin_api_Token}'.encode()).decode()}"

        jira_issues_data = fetch_jira_issues_with_project_id(api_url, auth_header, jira_kanben_project_name)

        bug_statuses = [issue['issue_status'] for issue in jira_issues_data if issue['issue_type'] == 'Bug']
        open_user_defects = sum(1 for status in bug_statuses if status in ('To Do', 'In Progress'))
        resolved_user_defects = sum(1 for status in bug_statuses if status == 'Done')

        return {
            'statusCode': 200,
            'body': json.dumps({'reporters': [{
                'Reporter': jira_username,
                'open_defects': open_user_defects,
                'resolved_defects': resolved_user_defects
            }]})
        }

    except Exception as e:
        # Top-level boundary: always answer with a well-formed response.
        return {
            'statusCode': 500,
            'body': json.dumps(f"An unexpected error occurred: {e}")
        }

def lambda_handler(event, context):
    """Entry point that forwards both arguments to run_code unchanged.

    Returns:
        The response dict produced by ``run_code``.
    """
    response = run_code(event=event, context=context)
    return response

# jira_defects_dashboard_talent - PROD

In [None]:
import base64
import json
import requests
import mysql.connector

# def fetch_jira_issues_with_project_id(api_url, auth_header, project_id, status_filter):
def fetch_jira_issues_with_project_id(api_url, auth_header, project_id):
    """Fetch issues from a Jira search URL and keep those of one project.

    Only the issues contained in the single HTTP response are processed; no
    further result pages are requested.

    Args:
        api_url: Full Jira search URL, including the JQL query string.
        auth_header: Value sent as the ``Authorization`` request header.
        project_id: Project key; issues whose ``fields.project.key`` differs
            are dropped.

    Returns:
        A list with one dict per matching issue. Every dict has the keys
        ``project_id``, ``Reporter``, ``sprint_name``, ``sprint_start_date``,
        ``sprint_completed_date``, ``sprint_state``, ``issue_key``,
        ``issue_summary``, ``issue_status``, ``story_points``,
        ``priority_type``, ``issue_type`` and ``time_spent``. The sprint
        fields are empty strings when the issue has no sprint, and
        ``Reporter`` / ``priority_type`` are empty strings when the issue has
        no assignee / priority.

    Raises:
        requests.exceptions.RequestException: On a network failure, a timeout
            or a non-2xx response (``HTTPError`` is a subclass).
    """
    # Without a timeout a stalled Jira connection would block indefinitely.
    response = requests.get(
        api_url,
        headers={'Authorization': auth_header, 'Content-Type': 'application/json'},
        timeout=30,
    )
    response.raise_for_status()

    records = []
    for item in response.json().get('issues', []):
        fields = item['fields']
        current_project_id = fields['project']['key']
        if current_project_id != project_id:
            continue

        # customfield_10020 may be absent, null or an empty list; an empty
        # dict then makes every sprint attribute fall back to ''.
        sprint_info = (fields.get('customfield_10020') or [{}])[0]
        # These fields can be null in the payload (e.g. unassigned issues).
        assignee = fields.get('assignee') or {}
        priority = fields.get('priority') or {}

        records.append({
            # Same key for issues with and without a sprint.
            'project_id': current_project_id,
            # NOTE: the 'Reporter' key is populated from the issue's assignee.
            'Reporter': assignee.get('displayName', ''),
            'sprint_name': sprint_info.get('name', ''),
            'sprint_start_date': sprint_info.get('startDate', ''),
            'sprint_completed_date': sprint_info.get('completeDate', ''),
            'sprint_state': sprint_info.get('state', ''),
            'issue_key': item['key'],
            'issue_summary': fields.get('summary', ''),
            'issue_status': fields['status']['name'],
            'story_points': fields.get('customfield_10016', None),
            'priority_type': priority.get('name', ''),
            'issue_type': fields['issuetype']['name'],
            'time_spent': fields.get('timespent', None),
        })
    return records

def load_credentials():
    """Read ``credentials.json`` from the current working directory.

    Returns:
        The parsed JSON document (the callers in this file treat it as a
        dict and read keys such as ``DB_HOST`` and ``DB_USER`` from it).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's default locale encoding.
    with open('credentials.json', encoding='utf-8') as f:
        return json.load(f)

def mysql_connection():
    """Open a MySQL connection from the settings given by load_credentials().

    The ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_DATABASE`` entries
    default to an empty string when missing; ``DB_PORT`` defaults to 3306 and
    is converted to ``int``.

    Returns:
        Whatever ``mysql.connector.connect`` returns for those settings.
    """
    creds = load_credentials()
    return mysql.connector.connect(
        host=creds.get('DB_HOST', ''),
        user=creds.get('DB_USER', ''),
        password=creds.get('DB_PASSWORD', ''),
        database=creds.get('DB_DATABASE', ''),
        port=int(creds.get('DB_PORT', 3306)),
    )

def get_jira_credentials(host, database, user, password, pod_id, c_id):
    """Look up the Jira settings for one candidate of one pod.

    Args:
        host: MySQL host name.
        database: MySQL database name.
        user: MySQL user name.
        password: MySQL password.
        pod_id: Value matched against ``pods.id``.
        c_id: Value matched against ``PodCandidates.candidateID``.

    Returns:
        A 5-tuple ``(jira_username, jira_url, admin_email, admin_api_token,
        project_name)`` taken from the first matching row. Five ``None``
        values are returned instead when the connection cannot be
        established, the query fails, or no row matches.
    """
    import logging

    not_found = (None, None, None, None, None)
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() itself raises.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )
        if not connection.is_connected():
            return not_found

        cursor = connection.cursor()
        query = '''SELECT
                        p.id as pod_id,
                        pc.candidateID as candidate_id,
                        pc.jira_username AS podcandidate_jira_username,
                        at.Jira_URL AS auth_tokens_Jira_URL,
                        at.Jira_admin_login_email_address AS auth_tokens_Jira_admin_login_email_address,
                        at.Jira_admin_API_Token AS auth_tokens_Jira_admin_API_Token,
                        at.Jira_Kanben_Project_name AS auth_tokens_Jira_Kanben_Project_name
                    FROM
                        PodCandidates AS pc
                    INNER JOIN
                        pods AS p ON pc.podID = p.id
                    INNER JOIN
                        Authorization_Tokens AS at ON p.ART_id = at.ART_id WHERE p.id = %s and pc.candidateID = %s'''
        cursor.execute(query, (pod_id, c_id))
        # Fetch everything so no unread rows remain on the cursor, then use
        # only the first row.
        rows = cursor.fetchall()
        if not rows:
            # Keep the 5-tuple shape so callers can always unpack the result.
            return not_found
        # Columns 0 and 1 echo the lookup keys; columns 2..6 are the settings.
        return tuple(rows[0][2:7])
    except Exception:
        # Best-effort lookup: record the failure and report "nothing found".
        logging.getLogger(__name__).exception(
            "Jira credential lookup failed for pod_id=%s, c_id=%s", pod_id, c_id)
        return not_found
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()

def run_code(event=None, context=None):
    """Count the open and resolved Jira bugs assigned to one pod candidate.

    Args:
        event: Mapping that must provide ``pod_id`` and ``c_id``.
        context: Not used; accepted so the signature matches a handler call.

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:

        * 200 - body is ``{"reporters": [{"Reporter", "open_defects",
          "resolved_defects"}]}``. Bugs with status ``To Do`` or
          ``In Progress`` count as open, bugs with status ``Done`` as
          resolved.
        * 400 - ``pod_id`` or ``c_id`` is missing from ``event``.
        * 500 - the Jira settings could not be found, or any other error
          occurred; body is a message string.
    """
    from urllib.parse import quote

    event = event or {}
    pod_id = event.get('pod_id')
    c_id = event.get('c_id')
    if pod_id is None or c_id is None:
        return {
            'statusCode': 400,
            'body': json.dumps("Both 'pod_id' and 'c_id' are required")
        }

    try:
        credentials = load_credentials()
        jira_settings = get_jira_credentials(
            credentials.get('DB_HOST', ''),
            credentials.get('DB_DATABASE', ''),
            credentials.get('DB_USER', ''),
            credentials.get('DB_PASSWORD', ''),
            pod_id,
            c_id)

        # A failed lookup yields either nothing or a tuple of Nones; stop
        # here rather than building a request from "None" placeholders.
        if not jira_settings or not jira_settings[0] or not jira_settings[1]:
            return {
                'statusCode': 500,
                'body': json.dumps(
                    "An unexpected error occurred: no Jira configuration found "
                    "for the given pod_id and c_id")
            }
        (jira_username, jira_url, jira_admin_login_email_address,
         jira_admin_api_Token, jira_kanben_project_name) = jira_settings

        # Escape characters that would end the quoted JQL string early, then
        # percent-encode the whole clause so it survives as one query value.
        escaped_username = str(jira_username).replace('\\', '\\\\').replace('"', '\\"')
        assignee_jql = f'assignee = "{escaped_username}"'
        api_url = f"{str(jira_url).rstrip('/')}/rest/api/3/search?jql={quote(assignee_jql, safe='')}"
        auth_header = f"Basic {base64.b64encode(f'{jira_admin_login_email_address}:{jira_admin_api_Token}'.encode()).decode()}"

        jira_issues_data = fetch_jira_issues_with_project_id(api_url, auth_header, jira_kanben_project_name)

        bug_statuses = [issue['issue_status'] for issue in jira_issues_data if issue['issue_type'] == 'Bug']
        open_user_defects = sum(1 for status in bug_statuses if status in ('To Do', 'In Progress'))
        resolved_user_defects = sum(1 for status in bug_statuses if status == 'Done')

        return {
            'statusCode': 200,
            'body': json.dumps({'reporters': [{
                'Reporter': jira_username,
                'open_defects': open_user_defects,
                'resolved_defects': resolved_user_defects
            }]})
        }

    except Exception as e:
        # Top-level boundary: always answer with a well-formed response.
        return {
            'statusCode': 500,
            'body': json.dumps(f"An unexpected error occurred: {e}")
        }

def lambda_handler(event, context):
    """Entry point that forwards both arguments to run_code unchanged.

    Returns:
        The response dict produced by ``run_code``.
    """
    response = run_code(event=event, context=context)
    return response