# Aconex Mails Extraction

### Summary
This notebook uses the Aconex API for two purposes:
1. Extract the list of mails exchanged since the last refresh date. This uses an API call that returns a paged list of mails, with basic details about each mail.
2. Extract detailed information for each mail, such as custom fields and fields like `DocumentID`, which is used to build a mapping table that connects mails to the Aconex Document data. This uses one API call per mail, which returns the mail's metadata with all of its fields.

### Input
> _No external input required._

### API
- API Vendor: Oracle Aconex
- Authentication: Basic (API key and credentials)

### The Execution Flow
1. Get last refresh date from lastRefreshDate delta table and use it to find out the number of pages using a basic GET API call with date filter parameter.
2. Loop through the range of total pages.
    - Perform GET call using the current page number.
3. Loop through the list of mails in current page, within the main loop.
    - Perform GET call using the `MailID` for each mail to get metadata and additional fields.
4. Store the required fields of each mail in a dataframe of mails and record the required fields for mapping in a dataframe of mapping table.
5. After completing the first loop, convert the 2 dataframes into spark dataframes.
6. Append the spark dataframes to respective delta tables. Replace the last refresh date with current date in the last refresh date delta table.

### Output
3 Delta tables:
- `dev.dept.aconex_mails`
- `dev.dept.aconex_map`
- `dev.dept.aconex_mails_lastrefreshdate`

### Code Setup

In [None]:
# Install the third-party packages imported below:
# xmltodict (parses the XML API responses) and tqdm (progress bars).
!pip install xmltodict
!pip install tqdm

In [None]:
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from cryptography.fernet import Fernet
import xmltodict
import xml.etree.ElementTree as ET
from tqdm.notebook import trange, tqdm
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

### Global Variables

In [None]:
# Credentials
# KEY is the Fernet key and TOKEN the encrypted password that the API helper
# functions decrypt for HTTP basic auth; API_KEY is sent as "X-Application-Key".
# NOTE(review): these values look like placeholders; real secrets should
# presumably come from a secret store rather than the notebook source — confirm.
KEY = "KEY"
TOKEN = "TOKEN"
API_KEY = "API KEY"

# Delta tables
mails_lastRefreshDate_catalog = "dev.dept.aconex_mails_lastrefreshdate"
mails_table_catalog = "dev.dept.aconex_mails"
map_table_catalog = "dev.dept.aconex_map"

# Mails already stored in the delta table, loaded into pandas; the main loop
# uses it to skip mails that have been extracted before.
mails_delta_df = spark.read.table(mails_table_catalog).toPandas()

# Required dates
# lastRefreshDate is the first cell (row 0, column 0) of the refresh-date table;
# todayDate is today's date formatted as YYYYMMDD. Both are interpolated into
# the `sentdate` search query of the mail search request.
lastRefreshDate_df = spark.read.table(mails_lastRefreshDate_catalog).toPandas()
lastRefreshDate = lastRefreshDate_df.iloc[0,0]
todayDate = datetime.today().strftime("%Y%m%d")

# Delta table schemas
def _nullable_string_schema(column_names):
    """Return a StructType with one nullable StringType field per name, in order."""
    return StructType([StructField(name, StringType(), True) for name in column_names])


# Schema of the mails table: every column is a nullable string.
MAIL_SCHEMA = _nullable_string_schema([
    "Column1", "Column2", "Column3", "Column4", "Column5",
    "Column6", "Column7", "Column8", "Column9", "Column10",
    "Column11", "Column12", "Column13", "Column14", "Column15",
    "Column16", "Column17", "Column18", "Column19", "Column20",
])

# Schema of the mail-to-document mapping table: every column is a nullable string.
MAP_SCHEMA = _nullable_string_schema([
    "Column1", "Column2", "Column3", "Column4",
])

### Function Definitions
____

In [None]:
def get_mail_response(mail_box, page_number = 1, max_page_size = 500, last_refresh_date=lastRefreshDate, today_date=todayDate, max_retries=5, timeout=120):
    """Fetch one page of the paged mail search for a mail box and parse it.

    Args:
        mail_box: Value sent as the `mail_box` query parameter.
        page_number: Page of the paged search to request.
        max_page_size: Value sent as `page_size`.
        last_refresh_date: Lower bound of the `sentdate` range (date part only;
            "000000" is appended as the time part).
        today_date: Upper bound of the `sentdate` range (same format).
        max_retries: How many times a non-200 response is retried before
            giving up.
        timeout: Per-request timeout in seconds passed to `requests.get`.

    Returns:
        Tuple `(parsed_response, status_code)` where `parsed_response` is the
        XML body converted to a dict by xmltodict.

    Raises:
        requests.exceptions.HTTPError: if the API still does not answer with
            status 200 after `max_retries` retries.
    """
    import time  # local import: only needed for the retry back-off

    # URL
    mails_url = "https://ca1.aconex.com/api/projects/<project id>/mail"

    # Headers
    headers = {
        "X-Application-Key": API_KEY
    }

    # Authorization
    auth = HTTPBasicAuth("username", Fernet(KEY).decrypt(TOKEN).decode())

    # Parameters
    params = {
        "mail_box": mail_box,
        "return_fields": "identifier1,identifier2,identifier3,identifier4,identifier5,identifier6,identifier7,identifier8,identifier9,identifier10,identifier11",
        "search_type": "PAGED",
        "page_size": max_page_size,
        "page_number": page_number,
        "sort_field": "sentdate",
        "search_query": f"sentdate:[{last_refresh_date}000000 TO {today_date}000000]"
    }

    # Get the api response. Retries are bounded and spaced out so that a
    # persistent failure (bad credentials, outage) cannot loop forever.
    for attempt in range(max_retries + 1):
        response_raw = requests.get(mails_url, headers=headers, auth=auth, params=params, timeout=timeout)
        if response_raw.status_code == 200:
            break
        if attempt < max_retries:
            print("\t- Retrying | Status Code:", response_raw.status_code)
            time.sleep(min(2 ** attempt, 30))
    else:
        raise requests.exceptions.HTTPError(
            f"Mail search for '{mail_box}' page {page_number} failed after "
            f"{max_retries} retries (last status code {response_raw.status_code})",
            response=response_raw,
        )

    # Replace the \x02 control character with a space before handing the
    # body to the XML parser.
    response_parsed = xmltodict.parse(response_raw.text.replace("\x02"," "), encoding='utf-8')

    return response_parsed, response_raw.status_code


def get_mail_details(mail_id:str, max_retries=5, timeout=120):
    """Fetch the full metadata of a single mail and parse it to a dict.

    Args:
        mail_id: Identifier of the mail, appended to the mail endpoint URL.
        max_retries: How many times a non-200 response is retried before
            giving up.
        timeout: Per-request timeout in seconds passed to `requests.get`.

    Returns:
        The XML response body converted to a dict by xmltodict.

    Raises:
        requests.exceptions.HTTPError: if the API still does not answer with
            status 200 after `max_retries` retries (e.g. the mail is not
            accessible), instead of retrying forever.
    """
    import time  # local import: only needed for the retry back-off

    # NOTE(review): project id and username are hard-coded here and differ from
    # the values used in get_mail_response — confirm which ones are intended.
    mail_url = f"https://ca1.aconex.com/api/projects/1879053393/mail/{mail_id}"
    # Use the shared API_KEY constant, consistent with get_mail_response.
    headers = {"X-Application-Key": API_KEY}
    auth = HTTPBasicAuth("mpatel", Fernet(KEY).decrypt(TOKEN).decode())

    for attempt in range(max_retries + 1):
        mail_response = requests.get(mail_url, headers=headers, auth=auth, timeout=timeout)
        if mail_response.status_code == 200:
            break
        if attempt < max_retries:
            time.sleep(min(2 ** attempt, 30))
    else:
        raise requests.exceptions.HTTPError(
            f"Mail details for {mail_id} failed after {max_retries} retries "
            f"(last status code {mail_response.status_code})",
            response=mail_response,
        )

    # Replace the \x02 control character with a space before handing the
    # body to the XML parser.
    return xmltodict.parse(mail_response.text.replace("\x02"," "), encoding='utf-8')


def extract_default_fields(mail_dict):
    """Return the ten default field values of a mail as a flat list.

    Column1-5, Column9 and Column10 are read directly and must be present.
    Column6-8 are nested: when the outer key is present its inner
    'ColumnN.1' value is used, otherwise an empty string.
    """
    def nested_or_blank(outer_key, inner_key):
        # A missing outer key yields "", a present one must contain the inner key.
        if outer_key in mail_dict:
            return mail_dict[outer_key][inner_key]
        return ""

    leading = [mail_dict[key] for key in ('Column1', 'Column2', 'Column3', 'Column4', 'Column5')]
    nested = [
        nested_or_blank('Column6', 'Column6.1'),
        nested_or_blank('Column7', 'Column7.1'),
        nested_or_blank('Column8', 'Column8.1'),
    ]
    trailing = [mail_dict['Column9'], mail_dict['Column10']]
    return leading + nested + trailing


def extract_custom_fields(custom_fields_dict):
    """Return the values of the seven tracked custom fields, in fixed order.

    Args:
        custom_fields_dict: Either a single custom-field mapping or a list of
            them; each has a 'Label' and a 'Value' entry. `None` or an empty
            list is treated as "no custom fields".

    Returns:
        List of seven values for labels Column11..Column17; a label that is
        not present yields an empty string.
    """
    wanted_labels = ('Column11', 'Column12', 'Column13', 'Column14',
                     'Column15', 'Column16', 'Column17')

    if not custom_fields_dict:
        return [""] * len(wanted_labels)

    # A single custom field is parsed as one mapping rather than a list.
    # isinstance (not `type(...) == dict`) also recognises dict subclasses
    # such as OrderedDict, which some xmltodict versions return.
    if isinstance(custom_fields_dict, dict):
        custom_fields_dict = [custom_fields_dict]

    value_by_label = {item['Label']: item['Value'] for item in custom_fields_dict}
    return [value_by_label.get(label, "") for label in wanted_labels]


def get_attachment_details(attachment_count,attachment_details,cols,mail_id, mail_no):
    """Build the mail-to-document mapping rows for one mail.

    Args:
        attachment_count: Number of attached documents reported for the mail
            (string or int); zero or less yields an empty frame.
        attachment_details: A single attachment mapping, or a list of them.
        cols: Column names of the returned dataframe (four columns).
        mail_id: Mail identifier stored in every row.
        mail_no: Mail number stored in every row.

    Returns:
        pandas DataFrame with one row `[mail_id, mail_no, document id,
        DocumentNo]` per attachment that carries a 'DocumentId' or
        'RegisteredAs' key; attachments with neither are skipped.
    """
    rows = []

    if int(attachment_count) > 0 and attachment_details:
        # Branch on the actual shape of the payload instead of the reported
        # count: if the two disagree (count > 1 but a single mapping, or the
        # reverse) the attachments would otherwise be dropped silently.
        # NOTE(review): the key priority differs between the single and the
        # multiple attachment case; this is kept as it was — confirm intent.
        if isinstance(attachment_details, dict):        # One Attachment
            key_priority = ("DocumentId", "RegisteredAs")
            attachments = [attachment_details]
        else:                                           # Multiple Attachments
            key_priority = ("RegisteredAs", "DocumentId")
            attachments = attachment_details

        for attachment in attachments:
            id_key = next((key for key in key_priority if key in attachment), None)
            if id_key is not None:
                rows.append([mail_id, mail_no, attachment[id_key], attachment['DocumentNo']])

    return pd.DataFrame(rows,columns=cols)


def extract_recipients(rcpt_value):
    """Format recipients as "Name - OrganizationName", comma-separated.

    Args:
        rcpt_value: A single recipient mapping or a list of them, each with
            'Name' and 'OrganizationName' entries.

    Returns:
        The formatted string, or None when `rcpt_value` is neither a list nor
        a mapping.
    """
    # isinstance (not `type(...) ==`) so dict subclasses such as OrderedDict,
    # which some xmltodict versions return, are handled as well.
    if isinstance(rcpt_value, dict):
        rcpt_value = [rcpt_value]
    if isinstance(rcpt_value, list):
        return ', '.join(f"{r['Name']} - {r['OrganizationName']}" for r in rcpt_value)
    return None


def extract_senttomx(rcpt_value, org_id='1879056043'):
    """Tell whether any recipient belongs to the given organization.

    Args:
        rcpt_value: A single recipient mapping or a list of them, each with
            an 'OrganizationId' entry.
        org_id: Organization id to look for; defaults to the id that was
            previously hard-coded.

    Returns:
        True/False, or None when `rcpt_value` is neither a list nor a mapping.
    """
    # isinstance (not `type(...) ==`) so dict subclasses such as OrderedDict,
    # which some xmltodict versions return, are handled as well.
    if isinstance(rcpt_value, dict):
        rcpt_value = [rcpt_value]
    if isinstance(rcpt_value, list):
        return org_id in [r['OrganizationId'] for r in rcpt_value]
    return None


def sparkify_df(df,schema=None):
    """Convert `df` to a Spark dataframe via the notebook's `spark` session.

    `schema` is forwarded to `spark.createDataFrame`; when left as None no
    explicit schema is passed.
    """
    return spark.createDataFrame(df,schema=schema)


def push_data(spark_df,mode,delta_table_name,mergeSchema="true"):
    """Write a Spark dataframe to a table and report whether it succeeded.

    Args:
        spark_df: Spark dataframe to write.
        mode: Save mode passed to the writer (the notebook uses "append" and
            "overwrite").
        delta_table_name: Name of the target table.
        mergeSchema: Value of the writer's "mergeSchema" option.

    Returns:
        True when the write completed, False when it raised. The error is
        printed rather than re-raised so the notebook keeps running; callers
        should check the result before depending on the written data.
    """
    try:
        spark_df.write.option("delta.columnMapping.mode", "name").option("mergeSchema", mergeSchema).mode(mode).saveAsTable(delta_table_name)
    except Exception as e:
        print(f"Data push failed:\n\t{e}")
        # Explicit False instead of an implicit None on failure.
        return False
    return True


def check_duplicate_mails(df):
    """Print whether the 'MailID' column of `df` contains duplicate values.

    Reports the record count when every id is unique, otherwise the total
    and the distinct number of ids. Returns nothing.
    """
    mail_ids = df['MailID']
    total_count = len(mail_ids)
    distinct_count = len(set(mail_ids))

    if total_count != distinct_count:
        print("Duplicates found.")
        print(f"Total: {total_count}")
        print(f"Distinct: {distinct_count}")
        return

    print(f"No duplicates. {total_count} records.")


def df_batcher(df, batch_size):
    """Split `df` into consecutive slices of at most `batch_size` rows.

    Returns a list of slices in original order; the last slice holds the
    remainder when the length is not a multiple of `batch_size`.
    """
    full_batches, remainder = divmod(len(df), batch_size)
    batch_count = full_batches + 1 if remainder != 0 else full_batches
    return [
        df[batch_no * batch_size:(batch_no + 1) * batch_size]
        for batch_no in range(batch_count)
    ]


### Main
____

In [None]:
# Column layouts of the two result frames
map_cols = [
    'Column1', 'Column2', 'Column3', 'Column4',
]
mail_cols = [
    'Column1', 'Column2', 'Column3', 'Column4', 'Column5',
    'Column6', 'Column7', 'Column8', 'Column9', 'Column10',
    'Column11', 'Column12', 'Column13', 'Column14', 'Column15',
    'Column16', 'Column17', 'Column18', 'Column19', 'Column20',
]

# Empty frames that the extraction loop fills row by row
df_map, df_mail = (pd.DataFrame(columns=cols) for cols in (map_cols, mail_cols))

# Mail boxes to pull mails from, in processing order
mail_boxes = ['inbox', 'sentbox']

In [None]:
# Mail ids that are already stored in the delta table or were collected earlier
# in this session. Built once so each mail is checked with an O(1) set lookup
# instead of a linear scan of both dataframes.
known_mail_ids = set(mails_delta_df['MailID']) | set(df_mail['MailID'])

# Iterate through the list of mail boxes
for mailbox in mail_boxes:
    # The first request (page 1 by default) supplies the paging totals and is
    # reused below as the content of page 1 instead of being fetched twice.
    first_page, _ = get_mail_response(mailbox)
    total_pages = int(first_page['MailSearch']['@TotalPages'])
    print(f"Fetching from {mailbox} ({total_pages} pages):")

    # Define progress bar
    total_results = int(first_page['MailSearch']['@TotalResults'])
    progress = tqdm(total=total_results, desc=f"Mails from {mailbox}")

    # Iterate through the range of total number of pages
    for page_num in range(1,total_pages+1):
        print(f"- Page {page_num}:",end="\n")
        if page_num == 1:
            response = first_page
        else:
            response, _ = get_mail_response(mailbox, page_number=page_num)

        # Store list of mail details from the page. A page holding a single
        # mail is parsed as one mapping rather than a list, and an empty page
        # has no 'Mail' entry; normalise both to a list.
        search_results = response['MailSearch'].get('SearchResults') or {}
        page_content = search_results.get('Mail') or []
        if isinstance(page_content, dict):
            page_content = [page_content]

        # Iterate through the list of mail details dictionaries per mail
        for mail in page_content:
            tmp_mail_id = mail['@MailId']

            # Process Mail only if not already collected or stored
            if tmp_mail_id in known_mail_ids:
                print('.', end=' ')

            else:

                # Best effort per mail: a failure is reported and the loop
                # moves on to the next mail.
                try:
                    temp_mail, temp_cf = ([],['']*7)
                    mail_details = get_mail_details(tmp_mail_id)['Mail']

                    # Extract mail data and populate df_mail
                    ### List default field values
                    temp_mail = extract_default_fields(mail)

                    ### List custom field values if present
                    if 'CustomFields' in mail_details:
                        temp_cf = extract_custom_fields(mail_details['CustomFields']['CustomField'])

                    ### Extract additional values related to recipients
                    senttomx = [str(extract_senttomx(mail_details['ToUsers']['Recipient']))]
                    recipients = [extract_recipients(mail_details['ToUsers']['Recipient'])]

                    # Append full mails data row to mails df
                    df_mail.loc[len(df_mail)] = temp_mail + temp_cf + senttomx + recipients + [mailbox]
                    known_mail_ids.add(tmp_mail_id)

                    # Extract mapping data and populate df_map
                    if mail['AttachedDocumentCount'] != "0":
                        attachment_data = mail_details['Attachments']['RegisteredDocumentAttachment']
                        temp_df_map = get_attachment_details(mail['AttachedDocumentCount'], attachment_data, map_cols, tmp_mail_id, mail['MailNo'])
                        df_map = pd.concat([df_map, temp_df_map], ignore_index=True)

                    print('|', end=' ')

                except Exception as e:
                    print(f"Exception occurred while processing - {tmp_mail_id}")
                    print(e, end="\n\n")

            progress.update(1)

        print(" X\n")

    # Release the progress bar of this mail box before starting the next one
    progress.close()

print("- - - EXECUTION COMPLETE - - -")

In [None]:
# Report whether the collected mails contain repeated MailID values
check_duplicate_mails(df_mail)

In [None]:
# Drop fully identical rows from both result frames, in place
for _frame in (df_mail, df_map):
    _frame.drop_duplicates(inplace=True)

# Put today's date into the first cell (row 0, column 0) of the
# refresh-date frame so it becomes the new last refresh date
lastRefreshDate_df.iloc[0, 0] = todayDate

In [None]:
# Update the delta tables with new data
WriteMode = "append"

mails_pushed = push_data(
    sparkify_df(df_mail, schema=MAIL_SCHEMA),
    WriteMode,
    mails_table_catalog,
    mergeSchema="true"
)

map_pushed = push_data(
    sparkify_df(df_map, schema=MAP_SCHEMA),
    WriteMode,
    map_table_catalog,
    mergeSchema="true"
)

# Only move the last refresh date forward when both data pushes succeeded.
# push_data reports failure with a falsy result instead of raising; advancing
# the date after a failed push would exclude the unsaved mails from the date
# window of the next run.
if mails_pushed and map_pushed:
    push_data(
        sparkify_df(lastRefreshDate_df),
        "overwrite",
        mails_lastRefreshDate_catalog,
        mergeSchema="true"
    )
else:
    print("Last refresh date NOT updated because a data push failed; fix the error above and re-run.")

### Testing

In [None]:

# The snippets in this cell are wrapped in triple-quoted strings, so they are
# plain string expressions and do NOT run. They are kept as manual recipes.

# Temporary overwrite
# NOTE(review): spark_map_df / spark_mail_df are not defined in the visible
# notebook code — they would have to be created before enabling this snippet.
'''
spark_map_df.write.option("delta.columnMapping.mode", "name").mode("overwrite").saveAsTable(map_table_catalog)
spark_mail_df.write.option("delta.columnMapping.mode", "name").option("mergeSchema", "true").mode("overwrite").saveAsTable(mails_table_catalog)
'''

# Temporary data save in batches
# Splits df_mail with df_batcher and writes batch by batch: the first batch
# overwrites the target table, the remaining batches are appended.
'''
df_batches = df_batcher(df_mail, 5000)
count = 0
sent_mails_table = 'dev.dept.aconex_sentmails'
for t in trange(len(df_batches)):
    spark_df = spark.createDataFrame(df_batches[t], schema=MAIL_SCHEMA)
    if count == 0:
        # Overwrite data for the first batch
        spark_df.write.format("delta").option("delta.columnMapping.mode", "name").option("mergeSchema", "true").mode("overwrite").saveAsTable(sent_mails_table)
    else:
        # Append data for the rest of the batches
        spark_df.write.format("delta").option("delta.columnMapping.mode", "name").option("mergeSchema", "true").mode("append").saveAsTable(sent_mails_table)
    print(count)
    count += 1
'''

# Temporary overwrite sent mails
'''
spark_df = spark.createDataFrame(df_mail,schema=MAIL_SCHEMA)
spark_df.write.option("delta.columnMapping.mode", "name").option("mergeSchema", 'true').mode('overwrite').saveAsTable('dev.dept.aconex_sentmails')
'''

In [None]:
%sql
--SELECT YEAR(DATE(SentDate)) as _Year, count(MailID) as Total_Count FROM dev.dept.aconex_mails GROUP BY _Year;
--SELECT * from dev.dept.aconex_mails where YEAR(SentDate) = 2025 and MONTH(SentDate) = 5 sort by SentDate desc;
--SELECT count(MailID) as Total_Count, count(DISTINCT(MailID)) as Distinct_Count FROM dev.dept.aconex_mails;
--SELECT MailBox, count(*) as totalCount FROM dev.dept.aconex_sentmails group by MailBox;

-- Enable column mapping
--ALTER TABLE dev.dept.aconex_mails SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');
-- Rename the column
--ALTER TABLE dev.dept.aconex_mails RENAME COLUMN Reponse TO Response;