# Bulk Create Users in Auth0

This notebook uses the `Create import users job` endpoint of the Auth0 Management API to create multiple users in bulk for testing.

## References:

- [Bulk User Imports](https://auth0.com/docs/manage-users/user-migration/bulk-user-imports)
- [Create User Import Job](https://auth0.com/docs/api/management/v2/jobs/post-users-imports)
- [Bulk User Import JSON Schema](https://auth0.com/docs/manage-users/user-migration/bulk-user-import-database-schema-and-examples)


In [None]:
# Import necessary libraries for user creation and API interaction.
import os
import json
import time  # used by check_job_status to pause between polls
import bcrypt  # Auth0's default password hashing algorithm
import requests

## Password hashing function

We will hash the password using Auth0's recommended hashing algorithm. [Read More](https://auth0.com/docs/manage-users/user-migration/bulk-user-import-database-schema-and-examples#custom-password-hash)

You can submit one of the following (from the docs):

> `password_hash` string: **Default _bcrypt_ hashed password** Hashed password for the user's connection.
> When users are created, Auth0 uses bcrypt to secure the password.
> Importing hashed passwords lets users keep their passwords for a smoother experience.
> Compatible passwords should be hashed using bcrypt `$2a$` or `$2b$` and have 10 saltRounds.
> This property can only be provided when the user is first imported and cannot be updated later.
>
> `custom_password_hash` object: A more generic way to provide the user's password hash.
> This can be used instead of the password_hash field when the user's password hash was created with an alternate algorithm.
> During the bulk import process, you can update the custom_password_hash if the user did not login using the initially imported custom_password_hash.
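
The format requirements quoted above can be checked before upload. The following is a minimal sketch, not part of Auth0's tooling: `looks_like_importable_bcrypt` is a hypothetical helper name, and the pattern assumes the standard 60-character bcrypt string layout (prefix, two-digit cost, 22-character salt, 31-character digest).

```python
import re

# Standard bcrypt layout: $2a$ or $2b$, cost "10", then 53 chars of salt + digest.
# This helper is illustrative and only checks the format, not the password.
_BCRYPT_10 = re.compile(r"^\$2[ab]\$10\$[./A-Za-z0-9]{53}$")


def looks_like_importable_bcrypt(password_hash: str) -> bool:
    """Return True if the string looks like a $2a$/$2b$ bcrypt hash with cost 10."""
    return bool(_BCRYPT_10.match(password_hash))


# Synthetic strings with the right shape (not real password hashes).
print(looks_like_importable_bcrypt("$2b$10$" + "a" * 53))  # True
print(looks_like_importable_bcrypt("$2y$10$" + "a" * 53))  # False: other prefix
print(looks_like_importable_bcrypt("$2b$12$" + "a" * 53))  # False: 12 rounds
```

A check like this only catches formatting mistakes; it says nothing about whether the hash corresponds to the intended password.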

The cell below implements the bcrypt approach with `password_hash`. A SHA-1 variant using `custom_password_hash` is included as commented-out reference code.


In [None]:
# Hash a password using bcrypt with 10 salt rounds.
def hash_password(password):
    # Generate a salt
    salt = bcrypt.gensalt(rounds=10)
    # Hash the password with the salt
    hashed_password = bcrypt.hashpw(password.encode("utf-8"), salt)
    return hashed_password.decode("utf-8")


# Create a user dictionary with hashed password and other details.
def create_user(prefix="user", domain="example.com", password=None, i=1):
    username = f"{prefix}_{i:02d}"
    if password is None:
        password = username
    return {
        "email": f"{username}@{domain}",
        "email_verified": True,
        "password_hash": hash_password(password),
        "name": f"{username}@{domain}",
        "nickname": username,  # optional
        "username": username,
    }


# The functions above use bcrypt, which is the approach used in this notebook.

# Simple SHA-1 alternative using custom_password_hash, shown for reference only.
# Uncommenting it also requires `import hashlib`.
# def sha1_password(password):
#     # Hash the password using SHA-1
#     sha1_hash = hashlib.sha1(password.encode('utf-8')).hexdigest()
#     return sha1_hash

# def create_user_with_sha1(prefix="user", domain="example.com", password=None, i=1):
#     username = f"{prefix}_{i:02d}"
#     if password is None:
#         password = username
#     return {
#         "email": f"{username}@{domain}",
#         "email_verified": True,
#         # "password_hash": hash_password(password),
#         "custom_password_hash": {
#             "algorithm": "sha1",
#             "hash": {
#                 "value": sha1_password(password),
#                 "encoding": "hex",
#             }
#         },
#         "name": f"{username}@{domain}",
#         "nickname": username,  # optional
#         "username": username,
#     }

## User account generation

Generate a predefined list of users and write it out to a JSON file. The JSON file will be used to import the users into Auth0. _This could likely be improved to use a file stream rather than writing a real file._
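
One way to avoid the temporary file, sketched here under assumptions: serialize the list to bytes and wrap it in `io.BytesIO`, which `requests` accepts as a file object in a `files` mapping. `users_to_buffer` is a hypothetical helper, and the sample users are placeholders.

```python
import io
import json


def users_to_buffer(users):
    """Serialize a list of user dicts into an in-memory, file-like JSON payload."""
    payload = json.dumps(users).encode("utf-8")
    return io.BytesIO(payload)


# Placeholder records; real entries would also carry a password hash.
sample_users = [
    {"email": "user_01@example.com", "email_verified": True},
    {"email": "user_02@example.com", "email_verified": True},
]

buffer = users_to_buffer(sample_users)
# The buffer could take the place of the open file handle in a multipart upload:
# files = {"users": ("users.json", buffer, "application/json"), ...}
print(len(json.loads(buffer.getvalue())))  # 2
```

The rest of this notebook keeps the file-based approach, which makes it easy to inspect the generated JSON before importing it.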


In [None]:
# Create a list of users with the same prefix and domain.
def create_users(prefix, domain="example.com", N=5, password=None):
    return [create_user(prefix, domain, password, i) for i in range(1, N + 1)]


# Create a JSON file with user data for bulk import.
def create_user_file(prefix, domain="example.com", N=5, password=None, id=None):
    """
    Generate a JSON file containing user data.
    Pass a value to `id` to create a single user. Useful for testing.
    Set the password, otherwise it will default to the username (i.e., prefix_01, ...).
    """
    if id is None:
        users = create_users(prefix, domain, N, password)
    else:
        users = [create_user(prefix, domain, password, id)]

    filename = f"{prefix}_users.json"
    with open(filename, "w") as f:
        json.dump(users, f, indent=4)
    print(f"Created {filename} with {len(users)} users.")
    return filename

In [None]:
# Retrieve the connection ID for a specific Auth0 connection.
# API Reference: https://auth0.com/docs/api/management/v2/connections/get-connections
def get_connection_id(
    domain, access_token, connection_name="Username-Password-Authentication"
):
    url = f"https://{domain}/api/v2/connections"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {"strategy": "auth0"}

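    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()  # fail early on an invalid or expired token
    connections = response.json()
    # Find the ID of the connection whose name matches; fail clearly if absent.
    connection_id = next(
        (conn["id"] for conn in connections if conn["name"] == connection_name), None
    )
    if connection_id is None:
        raise ValueError(f"Connection not found: {connection_name}")
    return connection_id, connections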

In [None]:
# Start a bulk user import job in Auth0.
# API Reference: https://auth0.com/docs/api/management/v2/jobs/post-users-imports
def start_import_job(filename, connection_id, domain, access_token, external_id=None):
    url = f"https://{domain}/api/v2/jobs/users-imports"
    headers = {"Authorization": f"Bearer {access_token}"}

    with open(filename, "rb") as f:
        files = {
            "users": (filename, f, "application/json"),
            "connection_id": (None, connection_id),
            "send_completion_email": (None, "false"),
        }

        if external_id:
            files["external_id"] = (None, external_id)

        response = requests.post(url, headers=headers, files=files)

    if response.status_code in (200, 202):
        job = response.json()
        print("Import job started successfully.")
        return job

    # On failure, report the error and return the raw response for inspection.
    print("❌ Error during users-imports:")
    print(response.status_code, response.text)
    return response

In [None]:
# Check the status of a bulk user import job in Auth0.
# API Reference: https://auth0.com/docs/api/management/v2/jobs/get-jobs-by-id
def check_job_status(job, connection_id, domain, access_token, sleep=5):
    url = f"https://{domain}/api/v2/jobs/{job['id']}"
    headers = {"Authorization": f"Bearer {access_token}"}

    pending = True
    while pending:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            job_status = response.json()
            pending = job_status["status"] == "pending"
            if pending:
                print(f"Waiting for job to complete... ({sleep} seconds)")
                time.sleep(sleep)
        else:
            print("❌ Error during job status check:")
            print(response.status_code, response.text)
            return response

    print(f"Job id: {job_status['id']}")
    print(f"Job status: {job_status['status']}")
    print(f"Job summary: {job_status['summary']}")
    return job_status
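
The loop above polls until the job leaves the `pending` state, with no upper bound on how long it waits. A bounded variant could look like the sketch below. `poll_until_done` and its `fetch_status` callback are hypothetical names; the callback stands in for the `requests.get` call so the example runs without network access.

```python
import time


def poll_until_done(fetch_status, interval=5, timeout=300, sleep=time.sleep):
    """Call fetch_status() until the job is no longer pending or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status["status"] != "pending":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("Import job still pending after timeout")
        sleep(interval)


# Simulated responses: pending twice, then completed.
responses = iter(
    [{"status": "pending"}, {"status": "pending"}, {"status": "completed"}]
)
result = poll_until_done(lambda: next(responses), interval=0)
print(result["status"])  # completed
```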

## Setup Auth0 Connection to the Management API

[Management API Access Tokens](https://auth0.com/docs/secure/tokens/access-tokens/management-api-access-tokens)

Follow the instructions for **Testing** at the link above to get an access token.

In short, for development purposes, you can use the token generated for the `API Explorer`.

To find this, go to `Dashboard` > `Applications` > `APIs` > `API Explorer`. There you will see a very long token. Copy it and paste it into the `MGMT_API_ACCESS_TOKEN` variable below. **NOTE**: This token expires in 24 hours and has **FULL** access. DO NOT MAKE THIS TOKEN PUBLIC.
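
Because the token must stay private, it may be safer to read it from the environment than to paste it into a notebook that could be shared. A minimal sketch: the variable names `AUTH0_DOMAIN` and `AUTH0_MGMT_API_TOKEN` and the helper `load_auth0_settings` are assumptions chosen for this example, not names that Auth0 defines.

```python
import os


def load_auth0_settings(environ=os.environ):
    """Read the tenant domain and Management API token from the environment."""
    domain = environ.get("AUTH0_DOMAIN", "")
    token = environ.get("AUTH0_MGMT_API_TOKEN", "")
    missing = [
        name
        for name, value in (("AUTH0_DOMAIN", domain), ("AUTH0_MGMT_API_TOKEN", token))
        if not value
    ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return domain, token


# Demonstration with a stand-in mapping instead of the real environment.
demo_env = {
    "AUTH0_DOMAIN": "dev-example.us.auth0.com",
    "AUTH0_MGMT_API_TOKEN": "placeholder",
}
domain, token = load_auth0_settings(demo_env)
print(domain)  # dev-example.us.auth0.com
```

The returned pair could then be assigned to `DOMAIN` and `MGMT_API_ACCESS_TOKEN` in place of the hard-coded strings.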


In [None]:
DOMAIN = ""  # usually something like dev-{{something long}}.us.auth0.com
MGMT_API_ACCESS_TOKEN = ""  # usually something very long

# Grab the connection to the database
CONNECTION_ID, response = get_connection_id(
    DOMAIN, MGMT_API_ACCESS_TOKEN, connection_name="Username-Password-Authentication"
)

## Create Fake User List


In [None]:
FILENAME = create_user_file(prefix="fakeU", domain="test.edu", N=1, id=1)

# with open(FILENAME, 'r') as f:
#     print(f.read())

In [None]:
job = start_import_job(
    filename=FILENAME,
    connection_id=CONNECTION_ID,
    domain=DOMAIN,
    access_token=MGMT_API_ACCESS_TOKEN,
)

job_status = check_job_status(job, CONNECTION_ID, DOMAIN, MGMT_API_ACCESS_TOKEN)