In [10]:
!pip3 install pandas
!pip3 install openpyxl



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Collecting openpyxl
  Using cached openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Using cached et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Using cached openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Using cached et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [27]:
import json
import requests
import pandas as pd
from datetime import datetime
from typing import Optional, Dict, Any, List
from urllib.parse import quote

In [4]:
with open("credentials.cfg") as data_file:
    data = json.load(data_file)

authUrl = data["urls"]["authUrl"]
baseUrl = data["urls"]["baseUrl"]
client_id = data["credentials"]["client_id"]
client_secret = data["credentials"]["client_secret"]
username = data["credentials"]["username"]
password = data["credentials"]["password"]


In [5]:
def getAccessToken():
    """Returns access_token"""
    headers = {}
    headers['content-type'] = "application/x-www-form-urlencoded"
    headers['cache-control'] = "no-cache"
    
    url = authUrl + "oauth/token"
    payload = "client_id=" + client_id + '&'
    payload += "client_secret=" + client_secret + '&'
    payload += "grant_type=" + 'password&'
    payload += "username=" + username.replace('@','%40') + '&'
    payload += "password=" + password + '&'
    
    response = requests.request("POST", url, data=payload, headers=headers)
    
    if not response.ok:
        print("Access token: ")
        print(response.text)
        exit()
        
    access_token = response.json()['access_token']
    return access_token

In [6]:
def get_bookings(date_time: str = "2025-06-01T09:00:00") -> Dict[Any, Any]:
    """
    Retrieves bookings from Plutora API for a specific date/time.
    
    Args:
        date_time (str): The date and time to filter bookings (ISO format)
                        Default: "2025-06-01T09:00:00"
    
    Returns:
        Dict: JSON response from the API containing booking data
    """
    # Get access token
    token = getAccessToken()
    
    # Construct the filter query
    filter_query = f"`startDate` <= `{date_time}` and `endDate` >= `{date_time}`"
    
    # URL encode the filter parameter
    encoded_filter = quote(filter_query)
    
    # Construct the full URL
    url = f"https://ukapi.plutora.com/Bookings?filter={encoded_filter}"
    
    # Set up headers
    headers = {
        'accept': 'application/json',
        'Plutora-Info': 'script.name=swagger',
        'Authorization': f'bearer {token}'
    }
    
    # Make the GET request
    response = requests.get(url, headers=headers)
    
    # Check if request was successful
    if response.ok:
        return response.json()
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        response.raise_for_status()

In [17]:
def get_release_name(release_id: str, token: str) -> str:
    """
    Retrieves the release name for a given release ID.
    
    Args:
        release_id (str): The release ID to look up
        token (str): Authorization token
    
    Returns:
        str: The release name, or the original release_id if lookup fails
    """
    if not release_id or release_id == "00000000-0000-0000-0000-000000000000":
        return release_id
    
    # Construct the filter query for the specific release ID
    filter_query = f"`id` = `{release_id}`"
    
    # URL encode the filter parameter
    encoded_filter = quote(filter_query)
    
    # Construct the full URL
    url = f"https://ukapi.plutora.com/releases?filter={encoded_filter}"
    
    # Set up headers
    headers = {
        'accept': 'application/json',
        'Plutora-Info': 'script.name=swagger',
        'Authorization': f'bearer {token}'
    }
    
    try:
        # Make the GET request
        response = requests.get(url, headers=headers)
        
        if response.ok:
            releases = response.json()
            if releases and len(releases) > 0:
                return releases[0].get('name', release_id)
            else:
                print(f"No release found for ID: {release_id}")
                return release_id
        else:
            print(f"Error fetching release {release_id}: {response.status_code}")
            return release_id
            
    except Exception as e:
        print(f"Exception fetching release name for {release_id}: {e}")
        return release_id


In [18]:
def get_environment_name(environment_id: str, token: str) -> str:
    """
    Retrieves the environment name for a given environment ID.
    
    Args:
        environment_id (str): The environment ID to look up
        token (str): Authorization token
    
    Returns:
        str: The environment name, or the original environment_id if lookup fails
    """
    if not environment_id or environment_id == "00000000-0000-0000-0000-000000000000":
        return environment_id
    
    # Construct the filter query for the specific environment ID
    filter_query = f"`id` = `{environment_id}`"
    
    # URL encode the filter parameter
    encoded_filter = quote(filter_query)
    
    # Construct the full URL
    url = f"https://ukapi.plutora.com/environments?filter={encoded_filter}"
    
    # Set up headers
    headers = {
        'accept': 'application/json',
        'Plutora-Info': 'script.name=swagger',
        'Authorization': f'bearer {token}'
    }
    
    try:
        # Make the GET request
        response = requests.get(url, headers=headers)
        
        if response.ok:
            environments = response.json()
            if environments and len(environments) > 0:
                return environments[0].get('name', environment_id)
            else:
                print(f"No environment found for ID: {environment_id}")
                return environment_id
        else:
            print(f"Error fetching environment {environment_id}: {response.status_code}")
            return environment_id
            
    except Exception as e:
        print(f"Exception fetching environment name for {environment_id}: {e}")
        return environment_id

In [22]:
def enrich_bookings_with_names(bookings_data: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Enriches booking data by replacing releaseId and environmentId with their respective names.
    
    Args:
        bookings_data (Dict): The original bookings data
    
    Returns:
        Dict: Enhanced bookings data with release and environment names
    """
    result_set = bookings_data.get('resultSet', [])
    
    if not result_set:
        return bookings_data
    
    # Get access token for lookups
    token = getAccessToken()
    
    # Create caches to avoid duplicate API calls
    release_name_cache = {}
    environment_name_cache = {}
    
    print(f"Fetching release and environment names for {len(result_set)} bookings...")
    
    # Process each booking
    for i, booking in enumerate(result_set):
        # Handle release ID
        release_id = booking.get('releaseId')
        if release_id and release_id != "00000000-0000-0000-0000-000000000000":
            # Check cache first
            if release_id in release_name_cache:
                release_name = release_name_cache[release_id]
            else:
                # Fetch release name and cache it
                release_name = get_release_name(release_id, token)
                release_name_cache[release_id] = release_name
                
            # Replace releaseId with releaseName
            booking['releaseName'] = release_name
            # Optionally keep the original ID in a different field
            booking['originalReleaseId'] = release_id
            # Remove the original releaseId field
            del booking['releaseId']
        
        # Handle environment ID
        environment_id = booking.get('environmentId')
        if environment_id and environment_id != "00000000-0000-0000-0000-000000000000":
            # Check cache first
            if environment_id in environment_name_cache:
                environment_name = environment_name_cache[environment_id]
            else:
                # Fetch environment name and cache it
                environment_name = get_environment_name(environment_id, token)
                environment_name_cache[environment_id] = environment_name
                
            # Replace environmentId with environmentName
            booking['environmentName'] = environment_name
            # Optionally keep the original ID in a different field
            booking['originalEnvironmentId'] = environment_id
            # Remove the original environmentId field
            del booking['environmentId']
            
        print(f"Processed {i + 1}/{len(result_set)} bookings", end='\r')
    
    print(f"\nCompleted processing all {len(result_set)} bookings")
    print(f"Found {len(release_name_cache)} unique releases")
    print(f"Found {len(environment_name_cache)} unique environments")
    
    return bookings_data

In [19]:
def export_bookings_to_excel(bookings_data: Dict[Any, Any], filename: str = None, include_names: bool = True) -> str:
    """
    Exports the resultSet from bookings data to an Excel file.
    
    Args:
        bookings_data (Dict): The JSON response from get_bookings()
        filename (str): Optional custom filename. If None, generates timestamp-based name.
        include_names (bool): If True, fetches release and environment names instead of IDs
    
    Returns:
        str: The filename of the created Excel file
    """
    # Enrich with release and environment names if requested
    if include_names:
        bookings_data = enrich_bookings_with_names(bookings_data)
    
    # Extract the resultSet array
    result_set = bookings_data.get('resultSet', [])
    
    if not result_set:
        print("No booking data found in resultSet")
        return None
    
    # Create DataFrame from the resultSet
    df = pd.DataFrame(result_set)
    
    # Generate filename if not provided
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"bookings_{timestamp}.xlsx"
    
    # Ensure filename has .xlsx extension
    if not filename.endswith('.xlsx'):
        filename += '.xlsx'
    
    # Export to Excel
    df.to_excel(filename, index=False, sheet_name='Bookings')
    
    print(f"Successfully exported {len(result_set)} booking records to {filename}")
    return filename


In [24]:
def get_and_export_bookings(date_time: str = "2025-09-01T09:00:00", filename: str = None, include_names: bool = True) -> str:
    """
    Convenience function that gets bookings and exports them to Excel in one call.
    
    Args:
        date_time (str): The date and time to filter bookings
        filename (str): Optional custom filename for the Excel file
        include_names (bool): If True, fetches release and environment names instead of IDs
    
    Returns:
        str: The filename of the created Excel file
    """
    try:
        # Get bookings data
        print("Fetching bookings data...")
        bookings = get_bookings(date_time)
        
        # Export to Excel
        excel_filename = export_bookings_to_excel(bookings, filename, include_names)
        
        return excel_filename
        
    except Exception as e:
        print(f"Error retrieving and exporting bookings: {e}")
        raise

In [26]:
excel_file = get_and_export_bookings("2025-07-15T09:00:00")

Fetching bookings data...
Fetching release and environment names for 10 bookings...
Processed 10/10 bookings
Completed processing all 10 bookings
Found 5 unique releases
Found 5 unique environments
Successfully exported 10 booking records to bookings_20250903_134239.xlsx
