In [None]:
%cd ..

In [None]:
import json
import os
import shutil
import time
from datetime import date, datetime

import nest_asyncio
import pandas as pd
import pytz
from tqdm import tqdm

from src.api_utils import datetime_to_unix, fetch_all_pages, make_request
from src.conversation import Conversation
from src.pii_utility import authenticate_pii_client, redact_pii_with_batches
from src.settings import (
    AIRCALL_API_BASE64_CREDENTIALS,
    AIRCALL_API_RATELIMIT,
    AIRCALL_API_URL,
    AIRCALL_NUMBERS_CS,
    AIRCALL_NUMBERS_PS,
    PATH_AIRCALL_CALLS,
    PATH_AIRCALL_DATA,
    PATH_AIRCALL_PROCESSED,
    RANDOM_STATE,
)
from src.utils import day_month_list

# 1 - DATA RETRIEVAL - Aircall

This notebook serves as the first step in our data analysis pipeline by retrieving call data from Aircall’s API. The dataset includes information on calls, associated phone numbers, and Conversation Intelligence (CI) features such as transcripts, summaries, key topics, and sentiment analysis. By structuring and storing this data efficiently, we create a solid foundation for further exploration and modeling.

#### Objectives

- Authenticate and connect to the Aircall API
- Download phonenumbers, call metadata, and CI features
- Store the retrieved data in a structured format for further analysis

#### [Aircall API](https://developer.aircall.io/api-references)
- *GET /v1/numbers*: Fetch all Numbers associated to a company and their information
- *GET /v1/calls/search*: Search for specific Calls depending on several Query Params like `user_id`, `phone_number` or `tags`. Given a call transferred between `A` and `B` phone numbers, the call will not appear when filtering by `A` but it will for `B`.

## User Input
Please specify the business you are interested in here. Options are:
- CS: Customer Service
  - +32 2 586 3507 → CS SAE DE (M)
- PS: Pharma Service
  - +31 85 888 1504 → PS PB EAV IVR Kosmetik (M)
  - +31 85 888 1469 → PS PB EAV IVR NEM (M)
  - +31 85 888 1524 → PS PB EAV IVR OTC & Rest (M)
  - +31 85 888 1515 → PS PB EAV IVR RX (M)
  - +31 85 888 1579 → PS PB SAE DE IVR Kosmetik (M)
  - +31 85 888 1529 → PS PB SAE DE IVR NEM (M)
  - +31 85 888 1610 → PS PB SAE DE IVR OTC & Rest (M)
  - +31 85 888 1604 → PS PB SAE DE IVR RX (M)
- Other: In case you're interested in other phone numbers. *Note: You will have to specify a valid Number in the cell below when you select this option.*

In [None]:
### Specify details here ###
BUSINESS = 'OTHER'
CI_FEATURES = ["sentiments", "summary", "topics", "transcription"]
SAMPLE_SIZE = 2 # Number of calls to download Conversational Intelligence features for

START = date(2025, 3, 8)
END = date(2025, 3, 9)
#############################

### Set values based on business ###
if BUSINESS == 'CS':
    NUMBERS = AIRCALL_NUMBERS_CS
elif BUSINESS == 'PS':
    NUMBERS = AIRCALL_NUMBERS_PS
elif BUSINESS == 'OTHER':
    NUMBERS = ['+31 85 888 1579', '+31 85 888 1529']
else:
    raise ValueError('Invalid business')
####################################

### Create folder(s) ###
os.makedirs(PATH_AIRCALL_CALLS, exist_ok=True)
os.makedirs(PATH_AIRCALL_PROCESSED, exist_ok=True)
for CI_FEATURE in CI_FEATURES:
    os.makedirs(f'{PATH_AIRCALL_DATA}/{CI_FEATURE}', exist_ok=True)
########################

print(f'Fetching calls for {BUSINESS} from {START} to {END}.\nThis includes the following Numbers:\n{NUMBERS}')

## Retrieve Calls

In [None]:
# Generate list of days and months in the specified range
day_months = day_month_list(START, END)

# List to store all calls
all_calls = []

# Per number, fetch all calls for each day in the specified range
for NUMBER in NUMBERS:
    for day, month in day_months:
        calls = fetch_all_pages(
            url=f"{AIRCALL_API_URL}/calls/search",
            headers={"Authorization": f"Basic {AIRCALL_API_BASE64_CREDENTIALS}"},
            params={
                "from": datetime_to_unix(datetime(2025, month, day, 0, 0, 0, tzinfo=pytz.timezone("Europe/Berlin"))),
                "to": datetime_to_unix(datetime(2025, month, day, 23, 59, 59, tzinfo=pytz.timezone("Europe/Berlin"))),
                "direction": "inbound",
                "phone_number": NUMBER,
                },
            key="calls",
            page_param="page",
            rate_limit=AIRCALL_API_RATELIMIT,
        )

        all_calls.extend(calls)

    calls_df = pd.DataFrame(all_calls)

    print(f"Retrieved {calls_df.shape[0]} calls for {NUMBER}.")

    # Continue if no calls were found
    if calls_df.empty:
        continue

    # Unpack relevant nested data
    calls_df["number_id"] = calls_df["number"].apply(lambda x: x["id"])
    calls_df["number_digits"] = calls_df["number"].apply(lambda x: x["digits"])	
    calls_df["number_name"] = calls_df["number"].apply(lambda x: x["name"])
    calls_df["number_country"] = calls_df["number"].apply(lambda x: x["country"])

    # Filter relevant columns
    calls_df = calls_df[[
        "id",
        "sid",
        "direction",
        "status",
        "missed_call_reason",
        "started_at",
        "answered_at",
        "ended_at",
        "duration",
        "recording",
        "number_id",
        "number_digits",
        "number_name",
        "number_country",
        # "transferred_by", # Omitted, since this is not a Number but an Agent
        # "transferred_to", # Omitted, since this is not a Number but an Agent
        "country_code_a2",
    ]]

    # Save to CSV
    calls_df.to_csv(f"{PATH_AIRCALL_CALLS}/{START.strftime('%Y%m%d')}_{END.strftime('%Y%m%d')}_calls_{NUMBER.replace('+', '').replace(' ', '')}.csv", index=False)

## Load Calls from File

In [None]:
calls_df = pd.DataFrame()

for file in os.listdir(PATH_AIRCALL_CALLS):
    if file.startswith(
        f"{START.strftime('%Y%m%d')}_{END.strftime('%Y%m%d')}"
    ) and file.endswith(
        tuple([f"{NUMBER.replace('+', '').replace(' ', '')}.csv" for NUMBER in NUMBERS])
    ):
        calls_df = pd.concat([calls_df, pd.read_csv(f"{PATH_AIRCALL_CALLS}/{file}")])

Quick check here how many Calls we retrieved and which Numbers they were registered at. We'll immediately check if they were recorded and whether they were all inbound as specified in the request.

In [None]:
calls_df.pivot_table(
    values=["id", "recording"],
    index=["number_name", "direction"],
    aggfunc={"id": "count", "recording": "count"},
    observed=False,
    margins=True,
    margins_name="Total",
)

Note here that sometimes different numbers show up or counts deviate from the counts in the individual csv files. This has got to do with the way Aircall stores transferred calls.

## Filter Calls

Depending on your selection of numbers and date range, we could end up with a large amount of calls here. We're likely not going to need the Conversation Intelligence (CI) data for all those calls at the same time. So let's make sure we only download it for a limited subset.

*Note*: The rate limit for the Aircall API is - at the time of writing - 60 requests/min. We'll need 4 API calls per Call, i.e. one for every CI feature per call.

We will get rid of all Calls without a recording, since they will not include the CI features we're interested in. We will then sample the remaining calls.

In [None]:
sample = calls_df.copy()[calls_df["recording"].notnull()]
sample = sample.sample(SAMPLE_SIZE, random_state=RANDOM_STATE)

Let's compare the distributions of Calls over the different Number between the full set and the sample.

In [None]:
calls_df.number_name.value_counts(normalize=True)

In [None]:
sample.number_name.value_counts(normalize=True)

## Retrieve Conversation Intelligence data

Here we will download the CI data for every Call. We will temporarily store the raw responses. These will be processed (incl. anonymization) immediately after. Raw responses will then be deleted and only the anonymized data is stored.

In [None]:
for id in tqdm(sample.id):
    # Get all CI features for each call
    for endpoint in CI_FEATURES:
        response = make_request(
            "GET",
            f"{AIRCALL_API_URL}/calls/{id}/{endpoint}",
            headers={"Authorization": f"Basic {AIRCALL_API_BASE64_CREDENTIALS}"},
            params=None,
            data=None,
            json=None,
        )

        # Temporarily storing the responses in JSON files
        with open(f"{PATH_AIRCALL_DATA}/{endpoint}/{id}.json", "w") as f:
            json.dump(response, f)

        # Added a sleep to avoid rate limiting
        time.sleep(0.8)

### Process CI data

The Conversation object takes a Call id, loads the CI data from JSONs and parses it. Here, we create more humanly readable text representations out of it.

In [None]:
sample["conversation"] = sample["id"].apply(lambda id: Conversation(id))
sample["transcription"] = sample["conversation"].apply(lambda cv: cv.transcription_str)
sample["summary"] = sample["conversation"].apply(lambda cv: cv.summary_str)
sample["sentiment"] = sample["conversation"].apply(lambda cv: cv.sentiments_str)
sample["topics"] = sample["conversation"].apply(lambda cv: cv.topics_str)

In [None]:
sample.head()

### Redact PII
Now we redact summaries, topics, and transcriptions. Sentiments will never include PII.

In [None]:
nest_asyncio.apply() # Required for running async functions in Jupyter notebooks

client = await authenticate_pii_client()
transcriptions_red = await redact_pii_with_batches(client, sample.transcription)

client = await authenticate_pii_client()
summaries_red = await redact_pii_with_batches(client, sample.summary)

client = await authenticate_pii_client()
topics_red = await redact_pii_with_batches(client, sample.topics)

Now we replace/remove the non-redacted data in the DataFrame.

In [None]:
sample["transcription"] = transcriptions_red
sample["summary"] = summaries_red
sample["topics"] = topics_red
sample.drop(columns=["conversation"], inplace=True)

Let's store the DataFrame in csv format.

In [None]:
sample.to_csv(
    f"{PATH_AIRCALL_PROCESSED}/{START.strftime('%Y%m%d')}_{END.strftime('%Y%m%d')}_{BUSINESS}.csv",
    index=False,
    encoding="latin-1",
)

Finally, we'll make sure the raw API responses - which are likely to include PII - are deleted.

In [None]:
for CI_FEATURE in CI_FEATURES:
    if os.path.exists(f"{PATH_AIRCALL_DATA}/{CI_FEATURE}"):
        shutil.rmtree(f"{PATH_AIRCALL_DATA}/{CI_FEATURE}")