# Introduction

Welcome to our code examples for the API! These examples show how to use different API endpoints to retrieve data, create calls and reserve time slots.

You can find the latest API documentation at: [ak API Documentation - Doctor Consultation ](https://arztkonsultation.de/ak-api-dokumentation/).

We have multiple APIs. You probably mostly use the APIv2 (aka User API) and the Client API.

If you use the APIv2 you need to authenticate with a users email-adress and password to receive a Baerer token. Usually we recommend to use an general admin account / system account to communcate with our api.  

If you use the Client API you need a Client-ID and a password for authentication. If you don't have these data please concat us.


**Please note that you may need to customise the API URL.

## How do I use the Jupyter Notebook?

You can execute this code directly in Google Colab. For that, you need to adapt the code (e.g. API URL, E-mail-adresses and password, user ids)


Please be aware that you should change the hostname to the URL we've provided, so that you can use the code examples properly.

# Authentication and token retrieval



## APIv2

First, a user must authenticate themselves via email address and password (e.g. the admin user) in order to receive a token for subsequent API calls.

The following example sends a POST request for authentication and returns the access token.



In [None]:
#Authentication via email and password


email="Email address of the user who is to make the API call (e.g. doctor or admin)"
password="Password of the account"

# @title
import requests

#Change hostname!
hostname='app-sandbox.arztkonsultation.de'

def fetch(method, path, data=None, token=None):
    url = f'https://{hostname}{path}'
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.request(method, url, json=data, headers=headers)

    if response.status_code >= 400:
        raise Exception(f'Request failed with status code {response.status_code}: {response.text}')

    return response.json()

def login():
    c = {'email': email, 'password': password}
    result = fetch('POST', '/api/auth', c)
    return result['access_token']

token = login()

## Client API



In [None]:
# Authentication via client ID and password

client_id = 'your_client_id'  # Replace this with your client ID
pw = 'your_password'           # Replace this with your password

# @title
import requests

# Change Hostname!
hostname = 'app-sandbox.arztkonsultation.de'

def fetch(method, path, data=None, token=None):
    url = f'https://{hostname}{path}'
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.request(method, url, json=data, headers=headers)

    if response.status_code >= 400:
        raise Exception(f'Request failed with status code {response.status_code}: {response.text}')

    return response.json()

def login():
    c = {'client_id': client_id, 'pw': pw}
    result = fetch('POST', '/api/v2/client', c)
    return result['access_token']

token = login()
print("Access Token:", token)


# Users


## Create user with the role ‘Doctor’

The script creates a user account with the role ‘Doctor’.


You can find and overview if the user roles in our API documentation.

### Without notifications

A user with the role ‘Doctor’ is created and all e-mail & SMS notifications are deactivated.

In [None]:
def create_user(token, name, email, pw):
    companies = fetch('GET', '/api/v2/companies', token=token)
    company_id = companies['data'][0]['id']
    user_data = {
        "name": name,
        "email": email,
        "password": pw,
        "company_id": company_id,
        "roles": [2],
        "notification_options": [
            {
              "notification_id": 1001, #Notificattion via E-Mail
              "enabled": False,
            },
            {
              "notification_id": 1002, #Notification via SMS
              "enabled": False,
            },
                ],

            }
    user = fetch('POST', '/api/v2/users', user_data, token=token)
    return user['data']


In [None]:
user=create_user(token, "Full name of the user", "users email adress", "users password")
user

### With notifications

A user with the role ‘Doctor’ is created. The user receives notifications by e-mail when an appointment is created, changed or deleted. All other notifications have been deactivated in this example.

In [None]:
def create_user(token, name, email, pw):
    companies = fetch('GET', '/api/v2/companies', token=token)
    company_id = companies['data'][0]['id']
    user_data = {
        "name": name,
        "email": email,
        "password": pw,
        "company_id": company_id,
        "roles": [2],
        "notification_options": [
            {
                "notification_id": 1001, #Notification by e-mail. Additional e-mail addresses can be stored to which a notification is to be sent, e.g. ‘value’: [‘test@em.ail’, ‘test2@andere.email’,],
                "enabled": True,
            },
            {
               "notification_id": 1002, #Notification by SMS. Additional telephone numbers can be stored to which a notification is to be sent, e.g. ‘value’: [‘+4912345679’,],
                "enabled": False,
            },
            {
                "notification_id": 2001, #Appointment created
                "enabled": True,
             },
             {
                "notification_id": 2002, #Appointment changed
                "enabled": True,
             },
             {
                "notification_id": 2003, ##Appointment deleted
                "enabled": True,
             },
             {
                "notification_id": 3001, #Document received
                "enabled": False,
             },
             {
                "notification_id": 3002, #Document sent
                "enabled": False,
             },
             {
                "notification_id": 3003, #Document deleted
                "enabled": False,
             },
             {
               "notification_id": 3004, #Document expires
                "enabled": False,
             },
             {
               "notification_id": 4001, #Invalid email of the patient
                "enabled": False,
             },
                ],

            }
    user = fetch('POST', '/api/v2/users', user_data, token=token)
    return user['data']

In [None]:
user=create_user(token, "Full name of the user", "users email adress", "users password")
user

## List users

This example retrieves all users and displays the user ID, name and company ID.




In [None]:
def getAllUsers(token):
    response = fetch('GET', '/api/v2/users?per_page=100', token=token)

    if 'data' in response and isinstance(response['data'], list):
        users = response['data']
    else:
        print("Error: Invalid API response")
        return []

    user_data = []
    for user in users:
        role_name = user['roles'][0]['name'] if 'roles' in user and len(user['roles']) > 0 else ''
        user_data.append([
            user.get('id', ''),
            user.get('title', ''),
            user.get('name', ''),
            user.get('company_id', ''),
            user.get('company_name', ''),
            user.get('ousubsidiary_name', ''),
            user.get('oulocation_name', ''),
            user.get('department_name', ''),
            role_name
        ])

    return user_data

# Call the function to retrieve the user information
users_data = getAllUsers(token)

# Table formatting with tabulate
headers = ['ID', 'Title', 'Display Name', 'Company', 'Company Name', 'Ousubsidiary Name', 'Oulocation Name', 'Department Name', 'Role']
print(tabulate(users_data, headers=headers, tablefmt='grid'))

# Calls



## Create Calls as Admin

**Note**: The admin user will be added to the call as participant and will be visible in the participants list durig the call. If you don't want that, you need to get a specific user token (e.g. of the doctor) via the Client API (/api/ext/authenticateUser) and create a call with the users access token.

-------

In the following example, the admin account creates a video call with different scenarios and participants.

The ak users can join the video call either via the CallID and need then to authenticate via email/password, SSO or One-Time-Password (OTP).

If you integrate our SAV via iFrame we highly recommend using OTP (see "Auth via One-Time-Password (OTP)").

**Default Mode is email/password**

*SSO needs to be activated and configured by ak. Please contact us, if you want to use it*

The code outputs the URLs for the stand-alone-video (SAV) for all participants. The SAV can be integrated via iFrame.




### Between one ak user & one TAN user (e.g. doctor to patient)

This code shows how to create a videocall between an ak user (e.g. doctor) and a TAN user (e.g. patient).

Depending on the role, the account that created the call can also be a participant (e.g. if the account has the role ‘doctor’)

In [None]:
import requests
import json
from datetime import datetime


def create_call(token):
    call_data = {
        "duration": 15,
        "tan_username": "TAN-User12",
        "tan_email": "TAN-User2@arztkonsultation.de", # only neccesary if the ak system should send an email notification
        "call_type": "tan", # mandatory, even there is only this call_type
        "additional_callees": [
            {
                "user_id": 59288 #can be omitted if the account for the API call is the ak user who is also to join the call.
            }
        ]
    }

    call = fetch('POST', '/api/v2/calls', call_data, token)

    call_id = call['data']['id']
    tan = next(user['tan'] for user in call['data']['users'] if user['role_name'] == 'tanuser')

    print('Call')
    print(json.dumps(call['data'], indent=2))

    created_at_timestamp = call['data']['created_at']
    created_at = datetime.utcfromtimestamp(created_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')

    participants = call['data']['users']
    print(f"Der Anruf wurde erstellt am: {created_at} UTC")
    print("Teilnehmer:")
    for user in participants:
        print(f"  - Name: {user['user_name']}, Rolle: {user['role_name']}, E-Mail: {user['email']}")

        if user['role_name'] == 'tanuser':
            tan = user['tan']
            patient_url = f"https://{hostname}/video/{call_id}?tan={tan}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {patient_url}")
        else:
            if user['tan']:
                arzt_url = f"https://{hostname}/video/{call_id}?tan={user['tan']}"
                print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")
            else:
                arzt_url = f"https://{hostname}/video/{call_id}"
                print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")

    return {'callId': call_id, 'tan': tan}

# Call the create_call function with the received token
result = create_call(token)
print(result)


### Between *one* ak user & *two* TAN users (e.g. doctor to patient)

This code shows how to create a videocall between an ak user (e.g. doctor) and two TAN users (e.g. patient and relative).

Depending on the role, the account that created the call can also be the participant (e.g. if the account has the role ‘doctor’)

In [None]:
import requests
import json
from datetime import datetime


def create_call(token):
    call_data = {
        "duration": 15,
        "begin": 1727884800,
        "tan_username": "TAN-User1",
        "tan_email": "TAN-User1@arztkonsultation.de", # only neccesary if the ak system should send an email notification
        "call_type": "tan", # mandatory, even there is only this call_type
        "additional_callees": [
            {
                "name": "TAN2",
                "user_id": 59288 #can be omitted if the account for the API call is the ak user who is also to join the call.

            }
        ]
    }

    call = fetch('POST', '/api/v2/calls', call_data, token)

    call_id = call['data']['id']
    tan = next(user['tan'] for user in call['data']['users'] if user['role_name'] == 'tanuser')

    print('Call')
    print(json.dumps(call['data'], indent=2))

    created_at_timestamp = call['data']['created_at']
    created_at = datetime.utcfromtimestamp(created_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')

    participants = call['data']['users']
    print(f"Der Anruf wurde erstellt am: {created_at} UTC")
    print("Teilnehmer:")
    for user in participants:
        print(f"  - Name: {user['user_name']}, Rolle: {user['role_name']}, E-Mail: {user['email']}")

        if user['role_name'] == 'tanuser':
            tan = user['tan']
            patient_url = f"https://{hostname}.arztkonsultation.de/video/{call_id}?tan={tan}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {patient_url}")
        else:
            if user['tan']:
                arzt_url = f"https://{hostname}.arztkonsultation.de/video/{call_id}?tan={user['tan']}"
                print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")
            else:
                arzt_url = f"https://{hostname}.arztkonsultation.de/video/{call_id}"
                print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")

    return {'callId': call_id, 'tan': tan}

# Call the create_call function with the received token
result = create_call(token)
print(result)


### Between *two* ak users and *one* TAN user
In the following example, the admin account creates a video call with one TAN user (‘TAN user1’) and two ak users.

In [None]:
import json
from datetime import datetime

def create_call(token):
    call_data = {
        "duration": 15,
        "begin": 1727884800,
        "tan_username": "TAN-User1",
        "call_type": "tan", # mandatory, even there is only this call_type
        "additional_callees": [
            {
                "user_id": 59288 #If necessary, only a user ID needs to be added if the account for the API call is the ak user who is also to join the call.
            },
            {
                "user_id": 59351
            }
        ]
    }

    call = fetch('POST', '/api/v2/calls', call_data, token)

    call_id = call['data']['id']

    print('Call')
    print(json.dumps(call['data'], indent=2))

    created_at_timestamp = call['data']['created_at']
    created_at = datetime.utcfromtimestamp(created_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')

    participants = call['data']['users']
    print(f"Der Anruf wurde erstellt am: {created_at} UTC")
    print("Teilnehmer:")
    for user in participants:
        print(f"  - Name: {user['user_name']}, Rolle: {user['role_name']}, E-Mail: {user['email']}")
        if user['role_name'] == 'tanuser':
            tan = user['tan']
            patient_url = f"https://{hostname}/video/{call['data']['id']}?tan={tan}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {patient_url}")
            display(IFrame(patient_url, width=800, height=600))
        elif user['role_name'] == 'callee':
            arzt_url = f"https://{hostname}/video/{call['data']['id']}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")


# Call the create_call function with the received token
create_call(token)


### Between *two* ak users (e.g. doctor consultation)
In the following example, the admin account creates a video call between two ak users.

***However, a TAN user will always be created, due to historical reasons. In this case, you can just ignore it***

The code outputs the URLs for all participants.


In [None]:
import json
from datetime import datetime

def create_call(token):
    call_data = {
        "duration": 15,
        "begin": 1727884800,
        "call_type": "tan", # mandatory, even there is only this call_type
        "additional_callees": [
            {
                "user_id": 59288
            },
            {
                "user_id": 59351
            }
        ]
    }

    call = fetch('POST', '/api/v2/calls', call_data, token)

    call_id = call['data']['id']

    print('Call')
    print(json.dumps(call['data'], indent=2))

    created_at_timestamp = call['data']['created_at']
    created_at = datetime.utcfromtimestamp(created_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')

    participants = call['data']['users']
    print(f"Der Anruf wurde erstellt am: {created_at} UTC")
    print("Teilnehmer:")
    for user in participants:
        print(f"  - Name: {user['user_name']}, Rolle: {user['role_name']}, E-Mail: {user['email']}")
        if user['role_name'] == 'tanuser':
            tan = user['tan']
            patient_url = f"https://{hostname}/video/{call['data']['id']}?tan={tan}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {patient_url}")
        elif user['role_name'] == 'callee':
            arzt_url = f"https://{hostname}/video/{call['data']['id']}"
            print(f"  - Link direkt ins Video für {user['user_name']}: {arzt_url}")


# Call the create_call function with the received token
create_call(token)


## Auth via One-Time-Password (OTP)

his code snippet handles the process of creating a video call and generating a one-time password (OTP) for secure access in the Arztkonsultation system.

	1.	Client Authentication: It first authenticates the client API to obtain a general access token.
	2.	User Authentication: Then, it authenticates a specific user to get a user token.
	3.	Call Creation: Using the user token, it creates a new video consultation call.
	4.	Scoped Token Generation: With the client token, it generates a scoped token (OTP) for accessing the call securely.
	5.	Token Encoding & URL Creation: The scoped token is Base64-encoded and embedded in a secure video call URL.

In [None]:
# Authentication via client ID and password

import requests
import base64

client_id = 'your_client_id'  # Replace this with your client ID
pw = 'your_client_password'           # Replace this with your password

# Change Hostname!
hostname = 'app-sandbox.arztkonsultation.de'  # Replace this with your hostname

def fetch(method, path, data=None, token=None):
    url = f'https://{hostname}{path}'
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.request(method, url, json=data, headers=headers)

    if response.status_code >= 400:
        raise Exception(f'Request failed with status code {response.status_code}: {response.text}')

    return response.json()

def login():
    c = {'client_id': client_id, 'pw': pw}
    result = fetch('POST', '/api/v2/client', c)
    return result['token']

token = login()
print("Access Token:", token)

# Authenticate User
def authenticate_user(token, email):
    auth_data = {"email": email}
    result = fetch('POST', '/api/ext/authenticateUser', auth_data, token)
    return result["access_token"]

email = "organizers e-mail adress"  # Replace with actual email
user_access_token = authenticate_user(token, email)
print("User Access Token:", user_access_token)

# Create Call
def create_call(token):
    call_data = {
        "duration": 15,
        "tan_username": "TAN-User12",
        "tan_email": email,
        "tan_data_agreed": True,
        "call_type": "tan",
        "additional_callees": [
            {
                "user_id": 61261
            }
        ]
    }
    result = fetch('POST', '/api/v2/calls', call_data, token)
    return result["data"]["id"]  # Assuming the response contains a call ID

call_id = create_call(user_access_token)
print("Created Call ID:", call_id)

# Generate Scoped Token
def create_scoped_token(token, email, call_id):
    scope_data = {
        "email": email,
        "valid_for": 60,
        "scope_type": "call",
        "scope_id": call_id
    }
    result = fetch('POST', '/api/ext/createScopedToken', scope_data, token)
    return result["token"]

scoped_token = create_scoped_token(token, email, call_id)
print("Scoped Token:", scoped_token)

# Encode Scoped Token in Base64
encoded_scoped_token = base64.b64encode(scoped_token.encode()).decode()
print("Base64 Encoded Scoped Token:", encoded_scoped_token)

# Generate Video Call URL
video_call_url = f'https://{hostname}/video/{call_id}?tid={encoded_scoped_token}'
print("Video Call URL:", video_call_url)


Access Token: eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJhdWQiOiIxMTMiLCJqdGkiOiJhNGNhZTJmOGY5YTgzMTU3N2U2NWJmOTBiMWE3YzY4MGM2MDQ5OWYwZjY2ZTMwYjQ4YzgzMGI3OGZiZDhkZGE1MDFlYjExNTYxODc1NGVlOCIsImlhdCI6MTc0MTA5MTI1My43NjcxNDksIm5iZiI6MTc0MTA5MTI1My43NjcxNTQsImV4cCI6MTc0MTA5NDg1My43NTMxOTksInN1YiI6IiIsInNjb3BlcyI6WyIqIl0sInBlcm1pc3Npb24iOlsid2ViaG9va3MiXX0.clbIEhKrvMOE5mhnsWGM9bHdDJfi-jT5cFVR0a6kAUQv_c8IcKvrgJdF-F5GfIthPk1yhZLSDYePFD9OztFlfUyBjOSkrP6f-c6or4FVGinYc7GcYBi7SJkJxMtjAaw1qkWKokOYE4HRiqJ0ETQ1-TLRH8fQcKH35FUv0pl95peMrc7zA5wriGLuUq3xzPw6iaEAqyoT3a2GxC5esWGgF1Cdau8OgtoqChacnkxdOuv51ba1GIbedXTktkwCEQOXtHaf8NG3PiRMhqBIO_CyeIW9RG_ibwVSYtieL07UsNntdsOBREadNbEkkd5lLwBJjd_xCeUuOgPYWT2uSm8IIkJC-tY-s70POhIkKQ0t0XlZWJxenwpX2Y_KB0GYtCqSKW682euFWrwnPQ94U8cgkcQcPlBve2NMwl3lZSFQIDu2JWjgJjlHOyfQwJ4fq5C2i89FiwRib88GtgIAHrBRXBFzasJpOOq4N75nlurpCu5_PQawYGO-0WIkEuefBCWKmz08TqwOOEdoV5xGbIl28x4e98BJgegxzdiMgyFeEzbR8xXypGArl-QcP8arCWJuQ3FRSbetn_XR4jHUoLOlG38GGttiRJ30-YGLNbJ_Rz1HHa2JCs9AloBoqb7Xyj0VJO6JPpd1W

## List Calls

In [None]:
import pandas as pd
from datetime import datetime, timezone

# Function for retrieving all past calls with dynamic pagination
def getPastCalls(token):
    page = 1
    all_calls = []
    per_page = 500

    current_time = datetime.now(timezone.utc).timestamp()  # Current time in UTC timestamp
    total_pages = None  # Initialize total_pages to None

    while True:
        # API call to fetch calls with pagination
        calls_response = fetch('GET', f'/api/v2/calls?per_page={per_page}&page={page}', token=token)

        if calls_response is None or 'data' not in calls_response:
            print("Error: No valid API response received")
            break

        calls = calls_response.get('data', [])

        # Process each call entry
        for call in calls:
            # Filter only calls that have a "begin" timestamp in the past
            if call.get('begin') and int(call['begin']) < current_time:
                created_at = datetime.utcfromtimestamp(int(call['created_at'])).strftime('%Y-%m-%d %H:%M:%S')
                updated_at = datetime.utcfromtimestamp(int(call['updated_at'])).strftime('%Y-%m-%d %H:%M:%S')
                begin = datetime.utcfromtimestamp(int(call['begin'])).strftime('%Y-%m-%d %H:%M:%S') if call.get('begin') else None

                # Handle None duration values by setting them to 0
                duration = int(call.get('duration', 0) or 0)

                # Process each user in the call (loop through all users)
                for user_info in call.get('users', []):
                      all_calls.append({
                          'call_id': call['id'],
                          'created_at': created_at,
                          'updated_at': updated_at,
                          'user_name': user_info.get('user_name', ''),
                          'begin': begin,
                          'duration': duration,
                          'status': call.get('status', ''),
                          'company_id': user_info.get('company_id', ''),
                          'tan': user_info.get('tan', ''),
                          'oulocation_id': user_info.get('oulocation_id', ''),
                          'department_name': user_info.get('department_name', ''),
                          'user_id': user_info.get('id', '')
                      })


        # Fetch the total number of pages from the API response metadata
        if total_pages is None and 'meta' in calls_response:
            total_pages = calls_response['meta'].get('last_page', 1)

        # Check if we've processed all pages
        if page >= total_pages:
            break

        # Go to the next page
        print(f"Fetched page {page}/{total_pages}")
        page += 1

    return all_calls

# Main function to retrieve past calls and display them in a table
def main():
    token = login()
    if not token:
        print("Error: No valid token received")
        return

    # Get all past calls for all users
    past_calls = getPastCalls(token)

    # Load calls into a DataFrame
    df_calls = pd.DataFrame(past_calls)

    # Display the DataFrame as a table
    from IPython.display import display
    display(df_calls)

# Execute the main function
main()


## Delete Call

A call can only be deleted if it has not yet been executed

In [None]:
def delete_call(call_id, token):
    path = f'/api/v2/calls/{call_id}'
    url = f'https://{hostname}{path}'
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    response = requests.delete(url, headers=headers)

    if response.status_code == 204:
        print(f"Der Anruf mit der ID {call_id} wurde erfolgreich gelöscht.")
    else:
        print(f"Fehler beim Löschen des Anrufs: {response.status_code}")
        print(response.text)
        raise Exception(f'Fehler beim Löschen des Anrufs: {response.status_code} {response.text}')

# Call the delete_call function with a call ID and token
call_id = '23666'  # Replace this with the desired call ID
delete_call(call_id, token)

# Calllogs

Link to API-Doc: https://arztkonsultation.de/wp-content/uploads/2024/10/2024-10-01-ak-APIv2-Dokumentation.html#tag/Calllogs

## Retrieve call logs

This example retrieves the call logs exclusively via the API endpoint ‘/api/v2/calllogs’. The users are retrieved in advance via ‘/api/v2/users’.


In [None]:
def get_call_logs(token):
    # Retrieve call logs
    return fetch('GET', '/api/v2/calllogs', token=token)

# Main execution
try:
    token = login()
    call_logs = get_call_logs(token)
    print(call_logs)
except Exception as e:
    print(f'An error occurred: {e}')

## Extended call logs

This example retrieves the call logs via the API endpoint ‘/api/v2/calllogs’. In addition, further information about the users and calls is retrieved via ‘/api/v2/users’ and ‘/api/v2/calls’ and collated in a table.

In [None]:
import pandas as pd
from datetime import datetime, timezone

# Function for retrieving all past calls with dynamic pagination
def getPastCalls(token):
    page = 1
    all_calls = []
    per_page = 500

    current_time = datetime.now(timezone.utc).timestamp()  # Current time in UTC timestamp
    total_pages = None  # Initialize total_pages to None

    while True:
        # API call to fetch calls with pagination
        calls_response = fetch('GET', f'/api/v2/calls?per_page={per_page}&page={page}', token=token)

        if calls_response is None or 'data' not in calls_response:
            print("Error: No valid API response received")
            break

        calls = calls_response.get('data', [])

        # Process each call entry
        for call in calls:
            # Filter only calls that have a "begin" timestamp in the past
            if call.get('begin') and int(call['begin']) < current_time:
                created_at = datetime.utcfromtimestamp(int(call['created_at'])).strftime('%Y-%m-%d %H:%M:%S')
                updated_at = datetime.utcfromtimestamp(int(call['updated_at'])).strftime('%Y-%m-%d %H:%M:%S')
                begin = datetime.utcfromtimestamp(int(call['begin'])).strftime('%Y-%m-%d %H:%M:%S') if call.get('begin') else None

                # Handle None duration values by setting them to 0
                duration = int(call.get('duration', 0) or 0)

                # Fetch the 'end' time for this call from the /calllogs endpoint
                calllog_response = fetch('GET', f'/api/v2/calllogs?call_id={call["id"]}', token=token)

                # Check if calllog_response is valid and has data
                if calllog_response and 'data' in calllog_response and calllog_response['data']:
                    calllog = calllog_response['data'][0]  # Get the first entry
                    end = datetime.utcfromtimestamp(int(calllog.get('end', 0))).strftime('%Y-%m-%d %H:%M:%S') if calllog.get('end') else None
                else:
                    end = None  # No call log entry found

                # Process each user in the call (loop through all users)
                for user_info in call.get('users', []):
                    all_calls.append({
                        'call_id': call['id'],
                        'created_at': created_at,
                        'updated_at': updated_at,
                        'user_name': user_info.get('user_name', ''),
                        'begin': begin,
                        'end': end,
                        'duration': duration,
                        'status': call.get('status', ''),
                        'company_id': user_info.get('company_id', ''),
                        'tan': user_info.get('tan', ''),
                        'oulocation_id': user_info.get('oulocation_id', ''),
                        'department_name': user_info.get('department_name', ''),
                        'user_id': user_info.get('id', '')
                    })

        # Fetch the total number of pages from the API response metadata
        if total_pages is None and 'meta' in calls_response:
            total_pages = calls_response['meta'].get('last_page', 1)

        # Check if we've processed all pages
        if page >= total_pages:
            break

        # Go to the next page
        print(f"Fetched page {page}/{total_pages}")
        page += 1

    return all_calls

# Main function to retrieve past calls and display them in a table
def main():
    token = login()
    if not token:
        print("Error: No valid token received")
        return

    # Get all past calls for all users
    past_calls = getPastCalls(token)

    # Load calls into a DataFrame
    df_calls = pd.DataFrame(past_calls)

    # Display the DataFrame as a table
    from IPython.display import display
    display(df_calls)

# Execute the main function
main()


# iFrame

This example shows how the video can be integrated via iFrame.

You can find integration instructions at the following link: https://arztkonsultation.de/wp-content/uploads/2024/05/2024-05-15-ak-iFrame-Integrationsanleitung.pdf.pdf

## Docs via CallID


In [None]:
from IPython.display import IFrame

url = "https://app-sandbox.arztkonsultation.de/video/24761
IFrame(url, width=800, height=600)

## TAN user without waiting area

In [None]:
from IPython.display import IFrame

url = "https://app-sandbox.arztkonsultation.de/video/24761?tan=FtUD-tu5e-ZsBA"
IFrame(url, width=800, height=600)


## TAN user with waiting area

In [None]:
from IPython.display import IFrame

url = "https://app-sandbox.arztkonsultation.de/video/login?tan=FtUD-tu5e-ZsBA"
IFrame(url, width=800, height=600)

# Documents

## List documents

This script retrieves and lists documents belonging to an authenticated user via the /api/v2/documents API. The user must authenticate with their own credentials, as only document owners have access. Admins or other users cannot view these documents. It displays key details such as document ID, filename, creation date, and size.

In [3]:
import requests

# API endpoint
API_ROOT = "https://app-sandbox.arztkonsultation.de/api/" #Change the hostname

# Authentication credentials
CREDENTIALS = {
  "email": "user@example.com", #add the users email adress
  "password": "password" # add the users password
}

def login():
    """
    Authenticates with the API using credentials and retrieves an access token.
    """
    response = requests.post(API_ROOT + "auth", json=CREDENTIALS)
    response.raise_for_status()
    return response.json()["access_token"]

def list_documents(token):
    """
    Fetches and lists all documents from the API.
    """
    response = requests.get(
        f"{API_ROOT}/v2/documents",
        headers={"Authorization": f"Bearer {token}"}
    )
    response.raise_for_status()
    api_response = response.json()

    # Extract the document data
    documents = api_response.get("data", [])
    if not documents:
        print("No documents found.")
        return

    # Print a readable list of documents
    print("Available Documents:")
    for doc in documents:
        doc_id = doc.get("id", "Unknown ID")
        filename = doc.get("original_filename", "No Filename")
        created_at = doc.get("created_at", "Unknown Date")
        encrypted_size = doc.get("encrypted_filesize", "Unknown Size")
        print(f"- ID: {doc_id}, Filename: {filename}, Created At: {created_at}, Encrypted Size: {encrypted_size} bytes")

def main():
    """
    Main function to authenticate and list documents.
    """
    # Step 1: Login and retrieve access token
    token = login()
    print("Authentication successful.")

    # Step 2: List documents
    list_documents(token)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print("FATAL:", e)

Authentication successful.
Available Documents:
- ID: 2428, Filename: Screenshot 2025-09-24 at 16.41.05.png, Created At: 1758787667, Encrypted Size: 300672 bytes
- ID: 2433, Filename: Screenshot 2025-09-17 at 10.48.10.png, Created At: 1758788051, Encrypted Size: 51500 bytes
- ID: 2438, Filename: Screenshot 2025-07-31 at 12.46.02.png, Created At: 1758788142, Encrypted Size: 187628 bytes


## Downlaod documents
This Python script is designed for securely downloading, decrypting, and saving encrypted documents retrieved via an API. The process involves authentication, fetching encrypted document metadata, decrypting symmetric encryption keys using a private RSA key, and finally decrypting the document in chunks using AES-CBC. The decrypted file is saved locally in Google Colab and automatically offered for download.

In [None]:
import requests
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding

# API endpoint
API_ROOT = "https://app-sandbox.arztkonsultation.de/api/v2/" #Change the hostname

# User and document identifiers
OWNER = {
    "for_id": 80100, #Please add the users id here
    "for_type": "user",
}

DOCUMENT = {
    "doc_id": 2088, # Please add the id of the document you want to download
}

# Authentication credentials
CREDENTIALS = {
    "email": "user@example.com", #add the users email adress
    "password": "password" # add the users password
}

# Password for decrypting the private RSA key
PASSWORD = "password" # add the users private key


def login():
    """
    Authenticates with the API using credentials and retrieves an access token.
    """
    response = requests.post(API_ROOT + "auth", json=CREDENTIALS)
    response.raise_for_status()
    return response.json()["access_token"]


def clean_base64(data):
    """
    Cleans Base64 data by removing invalid characters and ensuring proper padding.
    """
    try:
        # Ensure the data is a string
        if isinstance(data, bytes):
            data = data.decode("utf-8")
        elif not isinstance(data, str):
            raise ValueError(f"Expected string or bytes for Base64 data, got {type(data)}: {data}")

        # Remove invalid characters and fix padding
        data = ''.join(c for c in data if c.isalnum() or c in ['+', '/', '='])
        missing_padding = len(data) % 4
        if missing_padding:
            data += '=' * (4 - missing_padding)
        return data
    except Exception as e:
        raise ValueError(f"Error cleaning Base64 data: {e}")


def decrypt(encrypted, coded_private_key, passphrase, encoded64=False):
    """
    Decrypts an encrypted Base64 string using an RSA private key.
    """
    if encoded64:
        encrypted = base64.b64decode(clean_base64(encrypted))
    private_key = serialization.load_pem_private_key(
        base64.b64decode(clean_base64(coded_private_key)),
        password=passphrase.encode(),
        backend=default_backend()
    )
    return private_key.decrypt(
        encrypted,
        padding.PKCS1v15()
    )


def decrypt_bytes(base64_key, base64_iv, encrypted_hex):
    """
    Decrypts encrypted bytes using AES-CBC mode with the given key and IV.
    """
    key = base64.b64decode(clean_base64(base64_key))
    iv = base64.b64decode(clean_base64(base64_iv))
    encrypted_bytes = base64.b64decode(clean_base64(encrypted_hex))

    # Validate IV size for AES-CBC
    if len(iv) != 16:
        raise ValueError(f"Invalid IV size ({len(iv)}). Expected 16 bytes for CBC.")

    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()

    try:
        decrypted = decryptor.update(encrypted_bytes) + decryptor.finalize()
    except Exception as e:
        raise ValueError(f"Error during decryption: {e}")
    return decrypted


def chunks_download(token, encrypted_filesize, encchunksize, decrypted_doc_key, decrypted_doc_iv):
    """
    Downloads the encrypted document in chunks and decrypts each chunk.
    """
    chunks_number = (encrypted_filesize + encchunksize - 1) // encchunksize
    file_content = b""

    for idx in range(chunks_number):
        range_from = idx * encchunksize
        range_to = range_from + encchunksize - 1

        headers = {
            "Authorization": f"Bearer {token}",
            "Range": f"bytes={range_from}-{range_to}"
        }
        response = requests.get(f"{API_ROOT}documents/{DOCUMENT['doc_id']}", headers=headers)
        response.raise_for_status()
        answer = response.json()

        if "filecontent" in answer:
            decrypted = decrypt_bytes(decrypted_doc_key, decrypted_doc_iv, answer["filecontent"])
            file_content += decrypted
        else:
            raise Exception("No content to decrypt")

    return file_content


def main():
    """
    Main function to execute the entire process of downloading and decrypting the document.
    """
    # Step 1: Login and retrieve an access token
    token = login()

    # Step 2: Fetch the private RSA key
    response = requests.post(
        f"{API_ROOT}keys/fetch",
        json=OWNER,
        headers={"Authorization": f"Bearer {token}"}
    )
    response.raise_for_status()
    key_pair = response.json()
    private_key = key_pair.get("privkey")

    # Step 3: Fetch document metadata
    response = requests.get(f"{API_ROOT}documents?per_page=10000", headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    all_docs = response.json()
    encrypted_filesize = next((d["encrypted_filesize"] for d in all_docs["data"] if d["id"] == DOCUMENT["doc_id"]), None)

    if not encrypted_filesize:
        raise ValueError(f"Document {DOCUMENT['doc_id']} not found.")

    response = requests.get(f"{API_ROOT}documents/{DOCUMENT['doc_id']}", headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    answer = response.json()

    encrypted_sym_key = answer["symkey"]
    encrypted_sym_iv = answer["symiv"]
    encchunksize = answer["encchunksize"]
    filename = answer["filename"]

    print(f"encchunksize: {encchunksize}, encrypted_filesize: {encrypted_filesize}")

    # Step 4: Decrypt symmetric keys
    decrypted_sym_key = decrypt(encrypted_sym_key, private_key, PASSWORD, True)
    decrypted_sym_iv = decrypt(encrypted_sym_iv, private_key, PASSWORD, True)

    # Step 5: Download and decrypt file content
    decrypted_file_content = chunks_download(token, encrypted_filesize, encchunksize, decrypted_sym_key, decrypted_sym_iv)

    # Step 6: Save the decrypted file
    file_path = f"/content/{filename}"
    with open(file_path, "wb") as f:
        f.write(decrypted_file_content)
    print(f"File saved as {file_path}")

    # Step 7: Offer the file for download
    from google.colab import files
    files.download(file_path)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print("FATAL:", e)

encchunksize: 226796, encrypted_filesize: 226796
Decoded IV Length: 32 (Expected: 16)
IV (raw): b'N7\x88\xc6\x9bDz%\x83\xe4F\xa7`\x0ch\xf9\xdcz\x1b\xb2a\t\xf9\xad\xabPO1\x91\x91{H'
File saved as /content/Screenshot 2024-12-10 at 14.56.44.png


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Import

## Import Users via CSV

This script reads user data from a CSV file stored on your Google Drive and creates users in your system via an API. It processes each row in the CSV and calls the API to create a user with the provided details.


In [None]:
# Mount Google Drive (for Google Colab)
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import requests


# Function to create a user via the API
def create_user(token, name, email, pw, company_id, role):
    user_data = {
        "name": name,
        "email": email,
        "password": pw,
        "company_id": company_id,
        "roles": [role],
    }
    user = fetch('POST', '/api/v2/users', user_data, token=token)
    return user['data']

# Define the path to your CSV file on Google Drive
csv_file_path = '/content/drive/My Drive/importuser.csv'

# Read the CSV file (it must be comma-separated with columns: name, email, password, company_id, role)
df = pd.read_csv(csv_file_path)

# Loop through each row in the DataFrame and create users via the API
for index, row in df.iterrows():
    name = row['name']
    email = row['email']
    pw = row['password']
    company_id = row['company_id']
    role = row['role']

    created_user = create_user(token, name, email, pw, company_id, role)
    print(f"User created: {created_user}")