In [1]:
import psycopg2
from psycopg2.extras import execute_values
import os
from dotenv import load_dotenv
from datetime import datetime
import pandas as pd
# Load variables from a local .env file (python-dotenv) into os.environ so the
# DB_* and VOLUUM_* settings read later via os.getenv are available.
load_dotenv()

True

In [2]:
# PostgreSQL settings come from the environment (populated by load_dotenv);
# an unset variable comes back as None.
db_host, db_name, db_user, db_pass = (
    os.getenv(env_var)
    for env_var in ("DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD")
)

connection = psycopg2.connect(
    host=db_host,
    database=db_name,
    user=db_user,
    password=db_pass,
)
# Shared cursor used by the insert / export cells further down.
cursor = connection.cursor()

## Voluum tests: load and clean the blacklist CSV

In [3]:
# Keep only the id / reason / source columns (in this order: later cells index
# the rows positionally) and normalise the one lower-case reason label.
data_df = (
    pd.read_csv("../data/COMPL_GLOBAL_BL_CLK36M_TSLOG_MMD_04122025-V2_bot_HLRAJ_v3.4.csv")
    [['customVariable1', 'reason', 'source']]
    .replace({'reason': {'Invalid_phone': 'INVALID_PHONE'}})
)
data_df.head()

Unnamed: 0,customVariable1,reason,source
0,448000000000.0,INVALID_PHONE,AI_AGENT_OSversion
1,448000000000.0,INVALID_PHONE,AI_AGENT_OSversion
2,448000000000.0,INVALID_PHONE,AI_AGENT_OSversion
3,448000000000.0,INVALID_PHONE,AI_AGENT_OSversion
4,448000000000.0,INVALID_PHONE,AI_AGENT_OSversion


In [4]:
# Ids are read as floats (e.g. 448000000000.0). Coerce anything non-numeric to
# NaN and drop those rows. Afterwards the column is numeric with no nulls, so
# the former `!= ''` / `notnull()` filter was a no-op and has been removed.
data_df['customVariable1'] = pd.to_numeric(data_df['customVariable1'], errors='coerce')
data_df = data_df.dropna(subset=['customVariable1'])

In [5]:
# Save the cleaned ids as integers (no trailing ".0") to a one-column CSV with a header row.
data_df[['customVariable1']].astype(int).to_csv('../data/cleaned_blacklist_ids.csv', index=False)

In [6]:
# Preview the ids as int64; display only, data_df itself is not modified.
data_df['customVariable1'].astype(int)

0         447999998980
1         447999998262
2         447999998172
3         447999997175
4         447999996691
              ...     
349458      4530414419
349459      4524910837
349460      4523429009
349461      4524476611
349462      4521324046
Name: customVariable1, Length: 349463, dtype: int64

In [7]:
# Positional rows for the DB inserts below, in data_df column order:
# [customVariable1, reason, source]. This is an object array and the id is
# still a float here; the insert cells cast it with int().
data = data_df.to_numpy()
data[:5]

array([[447999998980.0, 'INVALID_PHONE', 'AI_AGENT_OSversion'],
       [447999998262.0, 'INVALID_PHONE', 'AI_AGENT_OSversion'],
       [447999998172.0, 'INVALID_PHONE', 'AI_AGENT_OSversion'],
       [447999997175.0, 'INVALID_PHONE', 'AI_AGENT_OSversion'],
       [447999996691.0, 'INVALID_PHONE', 'AI_AGENT_OSversion']],
      dtype=object)

## Insert into `api_voluum_ts_sources`

In [8]:
# Record every cleaned id in api_voluum_ts_sources under the fixed category
# 'BLACKLIST'; `data` rows are [customVariable1, reason, source].
insert_query = """
INSERT INTO public."api_voluum_ts_sources" ( custom_variable_1, timestamp_created, category, data_source
) VALUES %s
"""
now = datetime.now()
rows = [(int(entry[0]), now, 'BLACKLIST', entry[2]) for entry in data]
if rows:
    # execute_values sends at most 50,000 rows per INSERT statement.
    execute_values(cursor, insert_query, rows, page_size=50000)
    connection.commit()
    print(f"Inserted final batch of {len(rows)} records into api_voluum_ts_sources.")

Inserted final batch of 349463 records into api_voluum_ts_sources.


## Insert into `blacklist`

In [None]:
# Plain INSERT with no ON CONFLICT clause: nothing is ever updated. Re-running
# this cell inserts the same ids again, or fails if the table has a unique
# constraint on them (the schema is not visible here; TODO confirm).
query = """
INSERT INTO public."blacklist" ( custom_variable_1, timestamp_created, source, reason)
VALUES %s
;
"""

now = datetime.now()
# Tuple order matches the INSERT column list: (id, created_at, source, reason).
rows = [(int(record[0]), now, record[2], record[1]) for record in data]

if rows:
    execute_values(cursor, query, rows, page_size=50000)
    connection.commit()
    print(f"Inserted {len(rows)} records into Blacklist.")

Inserted/Updated final batch of 349463 records into Blacklist.


In [3]:
from pathlib import Path
from datetime import datetime

def export_table_to_csv_fast(table_name, output_dir="exports", cur=None):
    """Dump a whole table to a timestamped CSV file using PostgreSQL ``COPY``.

    ``COPY ... TO STDOUT WITH CSV HEADER`` streams the rows from the server
    straight into the local file instead of fetching them into Python.

    Args:
        table_name: Table name, emitted as a single quoted identifier (so it
            is matched case-sensitively and must not be schema-qualified).
            Embedded double quotes are escaped by doubling them.
        output_dir: Directory for the export; created if missing.
        cur: Cursor providing ``copy_expert``. Defaults to the notebook-level
            ``cursor``.

    Returns:
        ``pathlib.Path`` of the CSV written, named ``<table>_<UTC %Y%m%d_%H%M%S>.csv``.
    """
    from datetime import timezone  # the cell only imports the datetime class

    if cur is None:
        cur = cursor

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    csv_path = out_dir / f"{table_name}_{timestamp}.csv"

    # PostgreSQL quoted-identifier rule: a literal " inside is written as "".
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    query = f"COPY {quoted_name} TO STDOUT WITH CSV HEADER"

    # newline="" keeps text mode from rewriting the server's line endings.
    with open(csv_path, "w", encoding="utf-8", newline="") as f:
        cur.copy_expert(query, f)

    print(f"✅ Exported table using COPY to {csv_path}")
    return csv_path


In [4]:
# Dump the whole `whitelist` table to exports/whitelist_<UTC timestamp>.csv.
export_table_to_csv_fast("whitelist")

  timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")


✅ Exported table using COPY to exports/whitelist_20260115_124341.csv


In [14]:
# Conversions whose processed_at falls within the last 24 hours of the DB
# server's clock. The count is read with cursor.fetchone() in the next cell.
query = """
SELECT COUNT(*)
FROM api_voluum_conversions avc 
WHERE avc.processed_at   >= NOW() - INTERVAL '24 hours';
"""
cursor = connection.cursor()
# psycopg2's execute() returns None, so there is nothing to keep from it;
# the result is read from the cursor afterwards.
cursor.execute(query)

In [15]:
# Single row returned by the COUNT(*) query from the previous cell, e.g. (205,).
cursor.fetchone()

(205,)

## Conversions Test

In [3]:
from time import time
import json
from uuid import uuid4

# Lookup used by insert_conversions() to turn the country segment of a campaign
# name into a country code. Read inside `with` so the file handle is closed.
with open('../utils/country_codes.json', 'r', encoding='utf-8') as country_codes_file:
    country_codes = json.load(country_codes_file)

def _campaign_country_code(campaign_name):
    """Map the 2nd ' - '-separated part of a campaign name to a country code.

    Falls back to "DEF" when the name is not a string, has no such part, or
    the part is not a key of the module-level ``country_codes`` mapping.
    """
    if not isinstance(campaign_name, str):
        return "DEF"
    parts = campaign_name.split(' - ')
    if len(parts) < 2:
        return "DEF"
    return country_codes.get(parts[1], "DEF")


def _generated_email(phone):
    """Return '+<phone>@yourmobile.com', or None for a blank / missing phone."""
    # Missing values can reach here as NaN / pd.NA from a DataFrame; only
    # non-blank strings other than the literal '<na>' produce an address.
    if isinstance(phone, str) and phone.strip() and phone.strip().lower() != '<na>':
        return '+' + phone + '@yourmobile.com'
    return None


def insert_conversions(data):
    """Bulk-insert Voluum conversion records into ``api_voluum_conversions``.

    ``data`` is an iterable of positional records (in this notebook,
    ``DataFrame.to_numpy()`` of the conversions report). Positions are
    hard-coded in the tuple below, e.g. 5 -> campaign_name, 7 -> click_id,
    9 -> conversion type, 14 -> custom_variable_1, 39 -> postback_timestamp.
    The frame's column order must therefore match the order it had when this
    was written (presumably alphabetical by report field; TODO confirm).

    Derived values:
        * ``country_code`` comes from the campaign name (see
          ``_campaign_country_code``), not from the report's own country column.
        * ``conversion_type`` is "FTD" when the record says "FTD", else "REG".
        * ``source`` is ``"<country_code>_<conversion_type>"``.

    Rows that collide on ``(click_id, transaction_id, conversion_type)`` are
    skipped (``ON CONFLICT ... DO NOTHING``). Commits when anything was sent.

    Returns:
        True.
    """
    insert_query = """
    INSERT INTO public."api_voluum_conversions" (uuid, click_id, postback_timestamp, processed, processed_at, error_message, retry_count, last_retry_at, generated_email, custom_variable_1, affiliate_network_id, affiliate_network_name, browser, browser_version,campaign_id, campaign_name, city, connection_type, conversion_type, conversion_type_id, cost, country_code, country_name,custom_variable_10, custom_variable_2, custom_variable_3,custom_variable_4, custom_variable_5, custom_variable_6,custom_variable_7, custom_variable_8, custom_variable_9,device, device_name, external_id, external_id_type, flow_id,ip, isp, lander_id, lander_name, language, offer_id, offer_name,os, os_version, path_id, profit, referrer, region, revenue, traffic_source_id, traffic_source_name, transaction_id, user_agent, visit_timestamp, source)
    VALUES %s
    ON CONFLICT (click_id, transaction_id, conversion_type)
    DO NOTHING
    """
    rows = []
    now = datetime.now()
    start_time = time()
    for record in data:
        uuid_new = str(uuid4())
        country_code = _campaign_country_code(record[5])
        conversion_type = "REG" if record[9] != "FTD" else "FTD"
        generated_email = _generated_email(record[14])
        # Leading constants: processed=False, processed_at=now, no error,
        # retry_count=0, no last retry.
        rows.append((
            uuid_new, record[7], record[39], False, now, None, 0, None, generated_email,
            record[14], record[0], record[1], record[2], record[3], record[4],
            record[5], record[6], record[8], conversion_type, record[10], record[11], country_code,
            record[13], record[15], record[16], record[17], record[18], record[19],
            record[20], record[21], record[22], record[23], record[24], record[25],
            record[26], record[27], record[28], record[29], record[30], record[31],
            record[32], record[33], record[34], record[35], record[36], record[37],
            record[38], record[40], record[41], record[42], record[43], record[44],
            record[45], record[46], record[47], record[48], country_code + "_" + conversion_type
        ))

    print(f"Data prepared for insertion in {time() - start_time} seconds.")
    if rows:
        execute_values(cursor, insert_query, rows, page_size=10000)
        connection.commit()
    return True

In [4]:
import requests

def create_session_token(timeout=30):
    """Create a Voluum API session and return its token string.

    Credentials are read from the VOLUUM_ACCESS_ID / VOLUUM_ACCESS_KEY
    environment variables. Callers send the token in the ``cwauth-token``
    header.

    Args:
        timeout: Seconds to wait for the auth request. requests has no
            default timeout, so without one a stalled call hangs the kernel.

    Raises:
        requests.HTTPError: if the API answers with an error status (e.g.
            rejected credentials), rather than a bare KeyError on "token".
    """
    url = "https://api.voluum.com/auth/access/session"

    payload = {
        "accessId": os.getenv("VOLUUM_ACCESS_ID"),
        "accessKey": os.getenv("VOLUUM_ACCESS_KEY")
    }

    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json"
    }

    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    response.raise_for_status()

    return response.json()["token"]

def get_conversions_data(limit="500", from_date="2025-12-06T00:00:00.000Z", to_date="2025-12-13T00:00:00.000Z", timeout=30):
    """Fetch every row of the Voluum conversions report for one date range.

    Pages through ``/report/conversions`` ``limit`` rows at a time, printing
    progress, until the reported ``totalRows`` have been collected or the API
    returns an empty page.

    Args:
        limit: Page size (string or int; interpolated into the URL as-is).
        from_date, to_date: ISO-8601 bounds passed as the API's ``from``/``to``.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        List of the items of every page's ``rows`` field, in fetch order.

    Raises:
        requests.HTTPError: if any page request returns an error status.
    """
    url = f"https://api.voluum.com/report/conversions?column=clickId&column=transactionId&column=visitTimestamp&column=postbackTimestamp&column=revenue&column=cost&column=profit&column=campaignId&column=campaignName&column=offerId&column=offerName&column=landerId&column=landerName&column=flowId&column=pathId&column=trafficSourceId&column=trafficSourceName&column=conversionType&column=conversionTypeId&column=referrer&column=affiliateNetworkId&column=affiliateNetworkName&column=countryCode&column=countryName&column=region&column=city&column=ip&column=isp&column=connectionType&column=deviceName&column=os&column=osVersion&column=browser&column=browserVersion&column=userAgent&column=language&column=status&column=customVariable1&column=customVariable2&column=customVariable3&column=customVariable4&column=customVariable5&column=customVariable6&column=customVariable7&column=customVariable8&column=customVariable9&column=customVariable10&column=externalId&column=externalIdType&from={from_date}&to={to_date}&limit={limit}&offset=OFFSET_VALUE&currency=EUR"
    access_token = create_session_token()

    headers = {
        "cwauth-token": access_token
    }
    page_size = int(limit)
    total_data = []
    offset = 0

    while True:
        paginated_url = url.replace("OFFSET_VALUE", str(offset))
        response = requests.get(paginated_url, headers=headers, timeout=timeout)
        # Surface auth / quota errors instead of a KeyError on 'rows' below.
        response.raise_for_status()
        data = response.json()
        rows = data['rows']
        total_data.extend(rows)
        print(f"Fetched {len(rows)} rows at offset {offset} out of {data['totalRows']} total rows. Current total: {len(total_data)}")
        # Also stop on an empty page: with only an exact `==` check, a
        # totalRows that is never hit exactly would re-request forever.
        if not rows or len(total_data) >= int(data["totalRows"]):
            break
        offset += page_size
    return total_data

def get_conversions_data_as_dataframe(limit="500", from_date="2025-12-06T00:00:00.000Z", to_date="2025-12-13T00:00:00.000Z"):
    """Return the conversions report for a date range as a pandas DataFrame.

    Thin wrapper around ``get_conversions_data`` (same arguments); each
    fetched report row becomes one DataFrame row.
    """
    return pd.DataFrame(get_conversions_data(limit, from_date, to_date))

In [6]:
# data = get_conversions_data_as_dataframe(from_date="2025-10-01T00:00:00.000Z", to_date="2025-10-02T00:00:00.000Z")
# Fetch the conversions report between these two UTC timestamps (one day) as a DataFrame.
data_big = get_conversions_data_as_dataframe(from_date="2025-10-31T00:00:00.000Z", to_date="2025-11-01T00:00:00.000Z")

Fetched 500 rows at offset 0 out of 1484 total rows. Current total: 500
Fetched 500 rows at offset 500 out of 1484 total rows. Current total: 1000
Fetched 484 rows at offset 1000 out of 1484 total rows. Current total: 1484


In [8]:
# df = pd.DataFrame(data)
# df = df.sort_values(by='postbackTimestamp')
df_big = pd.DataFrame(data_big)
# postbackTimestamp is a 12-hour clock string such as "2025-10-01 04:17:21 AM".
# Sorting the raw text puts "01:07:49 PM" before "04:17:21 AM", so sort on the
# parsed datetime instead. The format is taken from this notebook's outputs;
# a value in any other format makes to_datetime raise.
df_big = df_big.sort_values(
    by='postbackTimestamp',
    key=lambda ts: pd.to_datetime(ts, format="%Y-%m-%d %I:%M:%S %p"),
)
len(df_big)

1484

In [77]:
# NOTE: `df` is first assigned further down (df = pd.DataFrame(data_big) after
# the day-by-day fetch); on a fresh kernel, run that cell before this one.
# df[["postbackTimestamp", "revenue", "region"]][df.postbackTimestamp >= '2025-10-01 04:20:11 AM'][:10].sort_values(by='postbackTimestamp')
# df[["postbackTimestamp", "revenue", "region"]][df.revenue.round(2) == 300][15:25].sort_values(by='postbackTimestamp')
# Sort on the parsed time: the 12-hour "AM/PM" strings do not sort
# chronologically as text (e.g. "01:07:49 PM" < "04:17:21 AM").
df.loc[
    df.clickId == "wepghap9uhm2r44djmt0lv2q",
    ["postbackTimestamp", "revenue", "region"],
].sort_values(
    by='postbackTimestamp',
    key=lambda ts: pd.to_datetime(ts, format="%Y-%m-%d %I:%M:%S %p"),
)

Unnamed: 0,postbackTimestamp,revenue,region
458,2025-10-01 04:17:21 AM,0.0,Liverpool
459,2025-10-01 04:20:43 AM,300.0,Liverpool


In [89]:
# df_big[["postbackTimestamp", "revenue", "region"]][df_big.postbackTimestamp >= '2025-10-01 04:20:11 AM'][:10].sort_values(by='postbackTimestamp')
# Sort on the parsed time: the 12-hour "AM/PM" strings do not sort
# chronologically as text.
df_big.loc[
    df_big.clickId == "wepghap9uhm2r44djmt0lv2q",
    ["postbackTimestamp", "revenue", "region"],
].sort_values(
    by='postbackTimestamp',
    key=lambda ts: pd.to_datetime(ts, format="%Y-%m-%d %I:%M:%S %p"),
)

Unnamed: 0,postbackTimestamp,revenue,region
4680,2025-10-01 04:17:21 AM,0.0,Liverpool
16598,2025-10-01 04:17:21 AM,0.0,Liverpool
4681,2025-10-01 04:20:43 AM,300.0,Liverpool
16599,2025-10-01 04:20:43 AM,300.0,Liverpool


In [22]:
# Total revenue over every row of df. Duplicated conversion rows (investigated
# in the next cells) are counted once per copy here.
df['revenue'].sum()

np.float64(390070.57364147005)

In [31]:
# Keep every row whose (postbackTimestamp, clickId) pair occurs two or more
# times, i.e. all copies of a duplicated conversion.
pair_size = df.groupby(['postbackTimestamp', 'clickId'])['clickId'].transform('size')
df_duplicates = df.loc[pair_size >= 2]

In [33]:
# Show just the duplicated keys; each (postbackTimestamp, clickId) pair appears at least twice.
df_duplicates[['postbackTimestamp', 'clickId']]

Unnamed: 0,postbackTimestamp,clickId
897,2025-10-02 12:34:08 PM,wkblrq2fqf5i045d3q2hk7ci
898,2025-10-02 12:34:08 PM,wkblrq2fqf5i045d3q2hk7ci
1224,2025-10-04 01:40:19 PM,w7hopr2o68lctl6d3v12ta9c
1225,2025-10-04 01:40:19 PM,w7hopr2o68lctl6d3v12ta9c
1516,2025-10-01 11:07:49 PM,wgqkrpd8nkvuvn4d3n7crduq
1517,2025-10-01 11:07:49 PM,wgqkrpd8nkvuvn4d3n7crduq
1822,2025-10-04 04:14:14 PM,wc8irum83n4mun6djsl1nu6f
1823,2025-10-04 04:14:14 PM,wc8irum83n4mun6djsl1nu6f


## Fetch conversions day by day

In [10]:
from datetime import datetime, timedelta

def _voluum_day_report_url(day_start, day_end, limit):
    """Build one day's conversions-report URL, leaving ``{offset}`` as a format placeholder."""
    return (
        "https://api.voluum.com/report/conversions?"
        "column=clickId&column=transactionId&column=visitTimestamp&column=postbackTimestamp"
        "&column=revenue&column=cost&column=profit"
        "&column=campaignId&column=campaignName"
        "&column=offerId&column=offerName"
        "&column=landerId&column=landerName"
        "&column=flowId&column=pathId"
        "&column=trafficSourceId&column=trafficSourceName"
        "&column=conversionType&column=conversionTypeId"
        "&column=referrer"
        "&column=affiliateNetworkId&column=affiliateNetworkName"
        "&column=countryCode&column=countryName"
        "&column=region&column=city&column=ip"
        "&column=isp&column=connectionType"
        "&column=deviceName&column=os&column=osVersion"
        "&column=browser&column=browserVersion"
        "&column=userAgent&column=language"
        "&column=status"
        "&column=customVariable1&column=customVariable2&column=customVariable3"
        "&column=customVariable4&column=customVariable5"
        "&column=customVariable6&column=customVariable7"
        "&column=customVariable8&column=customVariable9&column=customVariable10"
        "&column=externalId&column=externalIdType"
        f"&from={day_start}&to={day_end}"
        f"&limit={limit}&offset={{offset}}&currency=EUR"
    )


def _fetch_report_pages(base_url, headers, limit, timeout):
    """Page through one report URL and return every row it yields, in order."""
    offset = 0
    collected = []

    while True:
        response = requests.get(base_url.format(offset=offset), headers=headers, timeout=timeout)
        response.raise_for_status()

        data = response.json()
        rows = data.get("rows", [])
        total_rows = data.get("totalRows", 0)

        collected.extend(rows)

        print(
            f"  Offset {offset}: fetched {len(rows)} rows "
            f"(day total {len(collected)}/{total_rows})"
        )

        # Also stop on an empty page; otherwise a totalRows that is never
        # reached would keep requesting the same empty offset range forever.
        if not rows or len(collected) >= total_rows:
            break

        offset += limit

    return collected


def get_conversions_data_day_by_day(
    start_date="2025-12-06",
    end_date="2025-12-13",
    limit=5000,
    timeout=30,
):
    """Fetch the Voluum conversions report one UTC day at a time.

    Every calendar day from ``start_date`` to ``end_date`` (both inclusive)
    is requested as its own window, from that day's 00:00Z to the next day's
    00:00Z, paged ``limit`` rows at a time. Progress, per-day revenue and
    cumulative revenue are printed as it goes.

    Args:
        start_date, end_date: Strings accepted by ``datetime.fromisoformat``
            (a plain date or a full ISO timestamp). Only the calendar date goes
            into each request window. Both must be naive or both aware,
            otherwise the loop comparison raises TypeError.
        limit: Page size for each request.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        ``(all_data, calculations)``. ``all_data`` holds every fetched row in
        fetch order. ``calculations`` maps each ``date`` to
        ``{"revenue": day_total, "revenue_so_far": running_total}``.

    Raises:
        requests.HTTPError: if any page request returns an error status.
    """
    headers = {
        "cwauth-token": create_session_token()
    }

    all_data = []
    calculations = {}
    cumulative_revenue = 0

    current_day = datetime.fromisoformat(start_date)
    end_dt = datetime.fromisoformat(end_date)

    while current_day <= end_dt:
        day_start = current_day.strftime("%Y-%m-%dT00:00:00.000Z")
        day_end = (current_day + timedelta(days=1)).strftime("%Y-%m-%dT00:00:00.000Z")

        print(f"\nFetching data for {current_day.date()}")

        day_rows = _fetch_report_pages(
            _voluum_day_report_url(day_start, day_end, limit), headers, limit, timeout
        )

        all_data.extend(day_rows)
        print(f"Completed fetching for {current_day.date()}: {len(day_rows)} rows.")

        # Adding row by row in fetch order yields exactly the same floats as
        # re-summing all_data every day, without rescanning it each time.
        day_revenue = 0
        for row in day_rows:
            amount = float(row['revenue'])
            day_revenue += amount
            cumulative_revenue += amount
        print(f"Total revenue for {current_day.date()}: {day_revenue}")
        print(f"Cumulative revenue till {current_day.date()}: {cumulative_revenue}")

        calculations[current_day.date()] = {
            "revenue": day_revenue,
            "revenue_so_far": cumulative_revenue
        }
        current_day += timedelta(days=1)

    return all_data, calculations

In [24]:
# Fetch 2026-02-01 through 2026-02-13 (both inclusive), one UTC day per request
# window. Full ISO timestamps work because the dates go through
# datetime.fromisoformat (parsing the trailing "Z" needs Python 3.11+).
data_big, calculations = get_conversions_data_day_by_day(start_date="2026-02-01T00:00:00.000Z", end_date="2026-02-13T00:00:00.000Z")


Fetching data for 2026-02-01
  Offset 0: fetched 970 rows (day total 970/970)
Completed fetching for 2026-02-01: 970 rows.
Total revenue for 2026-02-01: 18584.62719933
Cumulative revenue till 2026-02-01: 18584.62719933

Fetching data for 2026-02-02
  Offset 0: fetched 3017 rows (day total 3017/3017)
Completed fetching for 2026-02-02: 3017 rows.
Total revenue for 2026-02-02: 43045.29636567
Cumulative revenue till 2026-02-02: 61629.923565

Fetching data for 2026-02-03
  Offset 0: fetched 2529 rows (day total 2529/2529)
Completed fetching for 2026-02-03: 2529 rows.
Total revenue for 2026-02-03: 37337.79590231
Cumulative revenue till 2026-02-03: 98967.71946731

Fetching data for 2026-02-04
  Offset 0: fetched 1631 rows (day total 1631/1631)
Completed fetching for 2026-02-04: 1631 rows.
Total revenue for 2026-02-04: 30068.99999204
Cumulative revenue till 2026-02-04: 129036.71945935

Fetching data for 2026-02-05
  Offset 0: fetched 1390 rows (day total 1390/1390)
Completed fetching for 2026

In [25]:
# Rebuild df from the day-by-day rows (data_big is now the list of rows returned
# by get_conversions_data_day_by_day) and sanity-check total revenue and row count.
df = pd.DataFrame(data_big)
df.revenue.sum(), len(df)

(np.float64(305982.99738942), 18007)

In [26]:
# insert_conversions reads each record by column position, so this relies on
# df's column order matching the positions hard-coded there. TODO confirm
# before inserting a frame built from a different report query.
insert_conversions(df.to_numpy())

Data prepared for insertion in 0.08286905288696289 seconds.


True