In [3]:
import logging
import requests
import csv
from dotenv import load_dotenv
import os
import sys
from pprint import pformat,pprint
import json
from datetime import datetime

import warnings
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

# https://towardsdatascience.com/progress-bars-in-python-4b44e8a4c482?gi=6a0158a5a16e
from tqdm.auto import tqdm

logging_level_dict = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'CRITICAL': logging.CRITICAL
}

headers = {'Content-type': 'application/json'}

def get_jira_users(jira_logger, jira_base_url, auth, group_name, include_inactive_users, start_at):

    users = []
    user_count = 0
    pbar = tqdm(total=100)

    resp = requests.get(f'{jira_base_url}rest/api/2/group/member?groupname={group_name}&includeInactiveUsers={include_inactive_users}&startAt={start_at}', auth=auth, verify=False)    
    if resp.status_code == 200:
        total = resp.json()['total']
        start_at = resp.json()['startAt']
        current_result_count = len(resp.json()['values'])

        while (current_result_count > 0):
            jira_logger.debug(f'Total - {total}, Starts At - {start_at}, Current Result Count - {current_result_count}')
            current_user_set = resp.json()['values']
            for i in range(current_result_count):
                jira_logger.debug(f"{current_user_set[i]['name']},{current_user_set[i]['emailAddress']},{current_user_set[i]['displayName']},{current_user_set[i]['active']}")
                users.append(f"{current_user_set[i]['name']},{current_user_set[i]['emailAddress']},{current_user_set[i]['displayName']},{current_user_set[i]['active']}")
                user_count += 1
            
            percent = (current_result_count/total)*100
            pbar.update(percent)

            # Get next set of results if available.
            start_at = start_at + current_result_count            
            resp = requests.get(f'{jira_base_url}rest/api/2/group/member?groupname={group_name}&includeInactiveUsers={include_inactive_users}&startAt={start_at}', auth=auth, verify=False)    
            current_result_count = len(resp.json()['values'])
    else:
        jira_logger.error(f"Response Code: {resp.status_code}, Response Message: {resp.text}")

    jira_logger.info(f"Total {user_count} users found in group - '{group_name}'")    
    pbar.close()

    return users

def update_jira_username(jira_logger, jira_base_url, auth, username, new_username_value):

    is_update_successful = True
    update_status_code = 200
    update_status_message = ""
        
    # name is username attribute in Jira Internal Directory
    json_body = {
        'name' : new_username_value
    }
    
    resp = requests.put(f'{jira_base_url}rest/api/2/user?username={username}', data=json.dumps(json_body), auth=auth, headers=headers, verify=False)
    if resp.status_code != 200:
        is_update_successful = False
        update_status_code = resp.status_code
        update_status_message = resp.json()["errors"]["active"]
        jira_logger.error(f"Response Code: {update_status_code}, Response Message: {update_status_message}")
    
    jira_logger.info(f"{username} - update status: {is_update_successful}")
    return is_update_successful, update_status_code, update_status_message

# update user name with value from emailAddress
def update_jira_usernames(jira_logger, jira_base_url, auth, jira_env, group_name, user_dict):
    update_operation_status_list = []
    update_operation_status_list.append("username,Is Update Successful?, Error Details")

    with open(f"{group_name}_{jira_env}.group_users_update_execution.csv", "a") as output_csvfile:

        output_csvfile.writelines("\n\n")
        output_csvfile.writelines(datetime.now().strftime("%d/%b/%Y %H:%M:%S") + "\n\n")
        
        for username in user_dict:
            #if username is already updated to email, skip this user!
            if username != user_dict[username]:
                jira_logger.info(f"Update username from {username} to {user_dict[username]}")
                update_status_info_tuple = update_jira_username(jira_logger, jira_base_url, auth, username, user_dict[username])
                jira_logger.debug(f"{username} - {update_status_info_tuple[0]}")
                update_operation_status_list.append(username + "," + str(update_status_info_tuple[0]) + "," + update_status_info_tuple[2])
                output_csvfile.writelines(username + "," + str(update_status_info_tuple[0]) + "," + update_status_info_tuple[2]+ "\n")

# Update usernames for given group
# Take backup of current jira users.
# Then update usernames to emails for each jira user.
def process_jira_username_updates(jira_logger, jira_base_url, auth, jira_env, group_name, include_inactive_users, company_domain):
    start_at = 0

    jira_logger.info(f"Working in {jira_env} environment")
    jira_logger.info("Reteriving current usernames to store in backup file!")
    users = get_jira_users(jira_logger, jira_base_url, auth, group_name, include_inactive_users, start_at)
    
    if users:
        jira_logger.info(f"Taking Backup of users for group {group_name}!")
        # First write current usernames into backup file in case we need them.
        with open(f"{group_name}.{jira_env}.csv", 'w') as filehandle:
            filehandle.writelines("username,email,display_name,is_user_active\n")
            filehandle.writelines("%s\n" % user for user in users)
        jira_logger.info(f"Backup of users for group {group_name} is complete and stored in {group_name}.{jira_env}.csv!")

        jira_logger.info("Getting dict {username, email} ready for usernames conversion.")
        username_email_dict = {}
        non_company_accounts_dict = {}
        for user in users:
            #Also we will skip username as which we will be running this script, so that subsequent calls will not fail!
            username = user.split(',')[0].strip().lower()
            email = user.split(',')[1].strip().lower()
            # We don't want to update logged in User's username otherwise script will fail.
            # we will need to update username to email to make it working agian!
            if username not in [os.getenv('USERID')]:
                username_email_dict[username] = email
                
            if email and email.split('@')[1] != company_domain:
                non_company_accounts_dict[username] = email
                
            if not email:
                non_company_accounts_dict[username] = None

        update_jira_usernames(jira_logger, jira_base_url, auth, jira_env, group_name, username_email_dict)
        pprint(f"Non {company_domain} accounts are:")
        pprint(non_company_accounts_dict)
        print("----")

# Add users to given jira group
def add_users_to_group(jira_logger, jira_base_url, auth, group_name, user_list):
    added_user_list = []
    errored_user_list = []
    
    for user in user_list:
        print(f'Adding user {user} to group {group_name}')
        json_body = {
            "name": user.lower().strip()
        }
        
        resp = requests.post(f'{jira_base_url}rest/api/2/group/user?groupname={group_name}', data=json.dumps(json_body), auth=auth, headers=headers)
        if resp.status_code != 201:
            update_status_code = resp.status_code
            update_status_message = resp.json()["errorMessages"][0]
            jira_logger.error(f"Response Code: {update_status_code}, Response Message: {update_status_message}")
            errored_user_list.append(user)
        else:
            added_user_list.append(user)
    return added_user_list, errored_user_list
        
# Remove users from groups
def remove_users_from_group(jira_logger, jira_base_url, auth, jira_env, group_name, user_list):
    remove_users_status_list = []
    remove_users_status_list.append("username, Is Removal Successful?, Error Details")

    with open(f"{group_name}_{jira_env}_remove_users_execution.csv", "a") as output_csvfile:

        output_csvfile.writelines("\n\n")
        output_csvfile.writelines(datetime.now().strftime("%d/%b/%Y %H:%M:%S") + "\n\n")
        
        for username in user_list:
            jira_logger.info(f"Remove username {username} from {group_name}")
            resp = requests.delete(f'{jira_base_url}rest/api/2/user?username={username}&groupname={group_name}', auth=auth, headers=headers, verify=False)
            if resp.status_code != 200:
                is_update_successful = False
                update_status_code = resp.status_code
                update_status_message = resp.json()["errors"]["active"]
                jira_logger.error(f"Response Code: {update_status_code}, Response Message: {update_status_message}")
            update_operation_status_list.append(username + "," + str(update_status_info_tuple[0]) + "," + update_status_info_tuple[2])
            output_csvfile.writelines(username + "," + str(update_status_info_tuple[0]) + "," + update_status_info_tuple[2]+ "\n")
    
# Create new users in Jira
def create_users(jira_logger, jira_base_url, auth, jira_env, new_users_csv_file):

    created_user_list = []
    with open(new_users_csv_file) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')

        line_count = 0
        errored_users = ""

        for row in csv_reader:
            if line_count == 0:
                print(f'Column names are {", ".join(row)}')
                line_count += 1
            else:
                firstname = row[0]
                lastname = row[1]
                email = row[2]
                password = row[3]

                line_count += 1
                try:
                    email = email.strip().lower()
                    print(f'\t{firstname},{lastname},{email}')
                    new_user_json = {
                        "name" : email,
                        "password": password,
                        "emailAddress": email,
                        "displayName": f"{firstname} {lastname}",
                        "applicationKeys": ["jira-core"]
                    }
                    response = requests.post(f'{jira_base_url}rest/api/2/user', data=json.dumps(new_user_json), auth=auth, headers=headers, verify=False)
                    response.raise_for_status()
                    created_user_list.append(email)
                except requests.exceptions.HTTPError as jiraerror:
                    print(email + " ,", f"status code: {jiraerror.response.status_code}, ", jiraerror.response.text)
                    errored_users += f"{email},"
                
    print()
    print(f'Processed {line_count} lines.')
    print(f'Total Users created {len(created_user_list)}')
    print(f'Errored Users: {errored_users}')
    return created_user_list

def do_logger_setup():
    jira_logger = logging.getLogger(__name__)
    # Following check is necessary otherwise everytime we run this in Jupyter lab Cell, new handler is getting added resulting in duplicate logs printed!
    if not jira_logger.handlers:
        jira_logger.setLevel(logging_level_dict[os.getenv('LOG_LEVEL')])

        file_handler = logging.FileHandler(os.getenv('LOG_FILE'))
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - line %(lineno)d - %(message)s'))

        out_hdlr = logging.StreamHandler(sys.stdout)
        out_hdlr.setFormatter(logging.Formatter('line %(lineno)d - %(message)s'))

        jira_logger.addHandler(out_hdlr)
        jira_logger.addHandler(file_handler) 
    return jira_logger
    
def main():

    load_dotenv(override=True)
    jira_env = os.getenv('JIRA_ENV')
    company_domain = os.getenv('COMPANY_DOMAIN')

    # Setup Logger to display debugging information
    jira_logger = do_logger_setup()
    
    auth = (os.getenv(f'{jira_env}_USERID'), os.getenv(f'{jira_env}_PASSWORD'))
    jira_base_url = os.getenv(f'{jira_env}_JIRA_ENV_BASE_URL')
    
    #process_jira_username_updates(jira_logger, jira_base_url, auth, jira_env,'jira-software-users', 'true')
    new_user_list = create_users(jira_logger, jira_base_url, auth, jira_env, "newusers-test.csv")
    (users_added_to_group, errored_users) = add_users_to_group(jira_logger, jira_base_url, auth, "jira-software-users", new_user_list)
    pprint(users_added_to_group)
        
if __name__ == '__main__':
    main()

Column names are First Name [Required], Last Name [Required], Email Address [Required], Password [Required]
	Je-han,Yang,jyang@nuro.ai
	Drew,McPherson,dmcpherson@nuro.ai
dmcpherson@nuro.ai , status code: 400,  {"errorMessages":[],"errors":{"username":"A user with that username already exists."}}
	Emmanuel,Amponsah,eamponsah@nuro.ai
eamponsah@nuro.ai , status code: 400,  {"errorMessages":[],"errors":{"username":"A user with that username already exists."}}

Processed 4 lines.
Total Users created 1
Errored Users: dmcpherson@nuro.ai,eamponsah@nuro.ai,
Adding user jyang@nuro.ai to group jira-software-users
['jyang@nuro.ai']
