# Ingest Beeswax & Ipon Reports (API)

## Imports (and Load Environment Variables)

In [None]:
import requests
import time
import tempfile
from os import environ as ENV
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from io import StringIO

# Load key/value pairs from a local .env file into the process environment so the
# ENV[...] lookups below (Beeswax, Ipon and database credentials) can resolve.
load_dotenv()

## Initialise Spark Session

In [None]:
# Build (or reuse) the notebook's Spark session. The PostgreSQL JDBC driver is
# requested through spark.jars.packages so DataFrames can be written to Postgres
# with "org.postgresql.Driver".
# NOTE: spark.jars.packages only takes effect if no SparkContext is already
# running in this kernel.
_spark_builder = SparkSession.builder.appName("Reporting_Test")
_spark_builder = _spark_builder.config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
spark = _spark_builder.getOrCreate()
del _spark_builder

# Echo the packages setting to confirm the driver dependency was registered.
_spark_conf = spark.sparkContext.getConf()
print(_spark_conf.get("spark.jars.packages"))
del _spark_conf

## Beeswax API Authentication

In [None]:
auth_url = ENV['BEESWAX_BASE_URL'] + '/authenticate'
auth_payload = {'email': ENV['BEESWAX_EMAIL'], 'password': ENV['BEESWAX_PASSWORD']}
auth_headers = {'Content-Type': 'application/json'}

# Authenticate exactly once, through a requests.Session, so the auth cookie is
# stored on `session`; every Beeswax helper below sends its requests through it.
# A login made outside the session would have its cookie discarded.
session = requests.Session()

# timeout stops a stalled auth endpoint from hanging the notebook indefinitely.
auth_response = session.post(auth_url, json=auth_payload, headers=auth_headers, timeout=30)
auth_response.raise_for_status()

print(auth_response.json())

## Beeswax API Call (Get Report Fields)

In [None]:
def bw_api_get_report_fields(report_type):
    """Return the name of every field exposed by a Beeswax report.

    Queries ``/v2/reporting/reports/<report_type>`` with the authenticated
    module-level ``session`` and collects the ``name`` of each entry in the
    response's ``fields`` list, in API order.

    Raises:
        requests.HTTPError: if the endpoint returns a 4xx/5xx status.
    """
    endpoint = ENV['BEESWAX_BASE_URL'] + '/v2/reporting/reports/' + report_type

    resp = session.get(endpoint)
    resp.raise_for_status()
    report_spec = resp.json()

    field_names = [field['name'] for field in report_spec['fields']]
    return field_names

## Beeswax API Call (Get Report)

In [None]:
def bw_api_get_report(report_type, field_list, filter_dict, *, max_wait_seconds=60,
                      poll_interval_seconds=5, request_timeout_seconds=60):
    """Run a Beeswax reporting query and load the CSV result into a Spark DataFrame.

    Submits the query to ``/v2/reporting/run-query``, then polls
    ``/v2/reporting/async-results/<task_id>`` until the result is ready. Uses the
    module-level authenticated ``session`` and the module-level ``spark`` session.

    Args:
        report_type: Beeswax report view name, e.g. ``'performance_agg'``.
        field_list: Field names to include in the report.
        filter_dict: Query filters, e.g. ``{'bid_day': '2025-09-26'}``.
        max_wait_seconds: Stop polling once this many seconds have elapsed.
        poll_interval_seconds: Delay between polls while the result is pending (HTTP 202).
        request_timeout_seconds: Per-request network timeout passed to requests.

    Returns:
        A Spark DataFrame read from the CSV result (header row, inferred schema).

    Raises:
        requests.HTTPError: if the query or a poll returns a 4xx/5xx status.
        TimeoutError: if the result is still pending after ``max_wait_seconds``.
        RuntimeError: if a poll returns any other unexpected status.
    """
    base_url = ENV['BEESWAX_BASE_URL']
    report_payload = {
        'view': report_type,
        'fields': field_list,
        'filters': filter_dict,
        'result_format': 'csv',
    }
    print('generate report')
    report_response = session.post(
        base_url + '/v2/reporting/run-query',
        json=report_payload,
        timeout=request_timeout_seconds,
    )
    report_response.raise_for_status()
    data = report_response.json()

    # An f-string copes with task_id being returned as either a number or a string.
    task_id = data['task_id']
    async_url = f'{base_url}/v2/reporting/async-results/{task_id}'

    start_time = time.time()
    while True:
        result_response = session.get(async_url, timeout=request_timeout_seconds)
        status = result_response.status_code

        if status == 200:
            print('report received')
            print('spark initialise')
            # Write the CSV verbatim as UTF-8 (newline='' disables newline
            # translation) so Spark, which decodes CSV as UTF-8 by default, sees
            # exactly what the API returned on any OS. The file is deliberately
            # kept: Spark reads lazily and re-opens it when the DataFrame is used.
            with tempfile.NamedTemporaryFile(mode="w+", suffix=".csv", delete=False,
                                             encoding="utf-8", newline="") as temp:
                temp.write(result_response.text)
                temp_path = temp.name
            return spark.read.option('header', True).option('inferSchema', True).csv(temp_path)

        if status == 202:
            # Result still being generated: wait and poll again, within the budget.
            if time.time() - start_time > max_wait_seconds:
                raise TimeoutError(f'Report not ready after {max_wait_seconds} seconds')
            time.sleep(poll_interval_seconds)
            continue

        # 4xx/5xx raise HTTPError here. Any other status (e.g. 204) would otherwise
        # spin this loop forever with no sleep and no timeout check.
        result_response.raise_for_status()
        raise RuntimeError(
            f'Unexpected status {status} while polling Beeswax report task {task_id}'
        )


## Get Performance, Bid Performance, Geography and Platform Report Data

In [None]:

# All four Beeswax reports are pulled for the same day. Define it once so the
# reports cannot drift apart.
BEESWAX_BID_DAY = '2025-09-26'

performance_fields = bw_api_get_report_fields('performance_agg')
performance_report_df = bw_api_get_report('performance_agg', performance_fields, {'bid_day': BEESWAX_BID_DAY})

bid_performance_fields = bw_api_get_report_fields('bid_performance_agg')
bid_performance_report_df = bw_api_get_report('bid_performance_agg', bid_performance_fields, {'bid_day': BEESWAX_BID_DAY})

geography_fields = bw_api_get_report_fields('geo_agg')
geography_report_df = bw_api_get_report('geo_agg', geography_fields, {'bid_day': BEESWAX_BID_DAY})

platform_fields = bw_api_get_report_fields('platform_agg')
platform_report_df = bw_api_get_report('platform_agg', platform_fields, {'bid_day': BEESWAX_BID_DAY})

# To pull a narrower report, pass an explicit field list instead of all fields, e.g.:
# bid_performance_report_df = bw_api_get_report(
#     'bid_performance_agg',
#     ["bid_day", "advertiser_id", "campaign_id", "line_item_id",
#      "creative_id", "bid", "win_rate"],
#     {'bid_day': BEESWAX_BID_DAY},
# )


## Write Reports to DB Staging Tables

In [None]:
def write_staging_table(report_df, table_name):
    """Replace the ``<table_name>_rep_staging`` Postgres table with ``report_df``.

    Connection details come from the DB_HOST, DB_PORT, DB_NAME, DB_USER and
    DB_PASSWORD environment variables. The write goes through Spark's JDBC writer
    with ``mode='overwrite'`` and the ``org.postgresql.Driver`` driver.
    """
    jdbc_url = "jdbc:postgresql://{}:{}/{}".format(ENV['DB_HOST'], ENV['DB_PORT'], ENV['DB_NAME'])
    staging_table = '{}_rep_staging'.format(table_name)
    connection_props = {
        "user": ENV['DB_USER'],
        "password": ENV['DB_PASSWORD'],
        "driver": "org.postgresql.Driver",
    }

    report_df.write.jdbc(
        url=jdbc_url,
        table=staging_table,
        mode='overwrite',
        properties=connection_props,
    )

In [None]:
# Overwrite each Beeswax report's <name>_rep_staging table, in this order.
for _report_df, _table_name in (
    (performance_report_df, 'performance'),
    (bid_performance_report_df, 'bid_performance'),
    (geography_report_df, 'geography'),
    (platform_report_df, 'platform'),
):
    write_staging_table(_report_df, _table_name)
del _report_df, _table_name

## Ipon API Get Report Fields

In [None]:
def ipon_api_get_report_fields():
    """Fetch the uSlicer field catalogue for the configured slicer/project.

    Returns:
        dict mapping each key field (dimension) and data field (metric) name to
        its description. The name itself is used when the description is missing
        or empty. Data fields are applied after key fields, so a data field with
        the same name overrides the key field's entry.

    Raises:
        requests.HTTPError: if the info endpoint returns a 4xx/5xx status.
    """
    request_body = {
        'slicer_name': ENV['IPON_SLICER'],
        'project_name': ENV['IPON_PROJECT'],
        'token': ENV['IPON_TOKEN'],
    }
    info_response = requests.post(
        'https://uslicer.iponweb.com/API/v2/info',
        json=request_body,
        headers={'Content-Type': 'application/json'},
    )
    info_response.raise_for_status()
    catalogue_source = info_response.json()

    catalogue = {}
    # Dimensions first, then metrics; later entries win on name clashes.
    for section in ('key_fields', 'data_fields'):
        catalogue.update(
            (entry['name'], entry.get('description') or entry['name'])
            for entry in catalogue_source.get(section, [])
        )
    return catalogue


In [None]:
# List every available Ipon field alongside its description.
fields_with_desc = ipon_api_get_report_fields()
for api_name, description in fields_with_desc.items():
    print(f"{api_name} -> {description}")

## Ipon Get Report Data

In [None]:
def ipon_api_get_report(start_date="-1d", end_date="-1d", *, timeout=None):
    """Export an Ipon (uSlicer) report as CSV text.

    Rows are daily (``granularity_day``), split by advertiser, agency, campaign,
    line item, creative, creative size, categories, browser, country, city,
    device maker, device type and OS. The columns are delivery, conversion,
    video, bid and cost metrics (see ``data_fields``).

    Args:
        start_date: uSlicer ``start_date`` value. Defaults to ``"-1d"``, which is
            presumably relative to today (TODO confirm against the uSlicer API docs).
        end_date: uSlicer ``end_date`` value, same format as ``start_date``.
        timeout: Optional requests timeout in seconds. ``None`` (the default)
            waits indefinitely, since large exports may be slow to start.

    Returns:
        The raw CSV response body as a string.

    Raises:
        requests.HTTPError: if the export endpoint returns a 4xx/5xx status.
    """
    get_report_url = 'https://uslicer.iponweb.com/API/v2/export'
    get_report_payload = {
        'slicer_name': ENV['IPON_SLICER'],
        'project_name': ENV['IPON_PROJECT'],
        'token': ENV['IPON_TOKEN'],
        "start_date": start_date,
        "end_date": end_date,
        "split_by": [
            "granularity_day",
            "advertiser_id",
            "agency_id",
            "campaign_id",
            "line_item_id",
            "creative_id",
            "creative_size",
            "categories",
            "browser_name",
            "country",
            "city",
            "device_maker",
            "device_type",
            "os_name"
        ],
        "timezone": 0,
        "order_by": [
            {
                "name": "granularity_day",
                "direction": "ASC"
            }
        ],
        "include_mappings": 1,
        "add_keys": [
            "granularity_day"
        ],
        "data_fields": [
            "imps",
            "clicks",
            "custom_column_6659",  # total spend
            "convs",
            "convs_pv",
            "convs_pc",
            "vast_start",
            "vast_firstQuartile",
            "vast_midpoint",
            "vast_thirdQuartile",
            "vast_complete",
            "bids",
            "custom_column_12211",  # Win-Rate
            "custom_column_34227",  # CPM
            "custom_column_6655",  # CPC
            "custom_column_6657",  # CPA
            "ctr",
        ],
        'export_format': 'csv'
    }
    get_report_headers = {'Content-Type': 'application/json'}

    response = requests.post(
        get_report_url,
        json=get_report_payload,
        headers=get_report_headers,
        timeout=timeout,
    )
    response.raise_for_status()

    return response.text

In [None]:
ipon_report = ipon_api_get_report()

# Show a bounded preview rather than the whole export. A day split by 14
# dimensions can run to a very large number of rows, and printing all of it
# bloats the notebook output.
_preview_lines = ipon_report.splitlines()
print("\n".join(_preview_lines[:20]))
print(f"... ({len(_preview_lines)} lines total)")
del _preview_lines

## Ipon Write to Staging Table

In [None]:
# Reuse the active Spark session (getOrCreate returns the existing one if present).
spark = SparkSession.builder.getOrCreate()

# Persist the CSV text to a temp file for Spark to read. It is written as UTF-8
# with newline='' (no newline translation), so non-ASCII mapping names and the
# API's own line endings survive unchanged on any OS. Spark's CSV reader decodes
# UTF-8 by default. The file is not deleted here because Spark reads it lazily
# when the DataFrame is written below.
with tempfile.NamedTemporaryFile(mode="w+", suffix=".csv", delete=False,
                                 encoding="utf-8", newline="") as temp:
    temp.write(ipon_report)
    temp_path = temp.name

ipon_report_df = spark.read.option("header", True).option("inferSchema", True).csv(temp_path)

# write to db staging table (ipon_rep_staging)
write_staging_table(ipon_report_df, "ipon")
