In [5]:
import time
import asyncio
import aiohttp
import json
import pandas as pd
from datetime import datetime, timedelta
import nest_asyncio

# Apply nest_asyncio to allow nested event loops in Jupyter Notebook.
# The notebook kernel already owns a running asyncio loop; this patch lets the
# coroutines in this cell be driven from inside it (see nest_asyncio docs).
nest_asyncio.apply()

# --- Retry / back-off policy shared by every CleverTap call (seconds) -------
MAX_RETRIES = 10          # attempts allowed before giving up on a window
RETRY_DELAY = 60          # first back-off pause
MAX_RETRY_DELAY = 1000    # back-off doubles each time but never exceeds this

# --- Reporting window: the 30 days that end right now -----------------------
end_date = datetime.now()
start_date = end_date - timedelta(days=30)

# Workbook the summary sheet is written into.
output_file = r'C:\Users\prave\1M_Active_Users_Summary_Data.xlsx'

# Lower-case CleverTap channel_name -> reporting country. Several raw channels
# may roll up into one country (both 'india' and 'rexmonster' -> 'India').
mapping = dict(
    india='India',
    philippines='Philippines',
    malaysia='Malaysia',
    singapore='Singapore',
    gulf='Middle East',
    hongkong='Hongkong',
    thailand='Thailand',
    indonesia='Indonesia',
    vietnam='Vietnam',
    rexmonster='India',
)

# Every country that must appear in the report, in first-seen mapping order
# (duplicates such as the second 'India' are dropped).
required_columns = list(dict.fromkeys(mapping.values()))

def _retry_reason(response_json):
    """Return a retry message if *response_json* is a retryable reply, else None.

    ``None`` (body was not a JSON object) and CleverTap's status 'fail' /
    code 2 ("still in progress") are both worth retrying.
    """
    if response_json is None:
        return "Failed to decode JSON response. Retrying..."
    if response_json.get('status') == 'fail' and response_json.get('code') == 2:
        return "Request still in progress, please retry later. Retrying..."
    return None


async def _read_json(request_ctx):
    """Await an aiohttp request context and return its body parsed as a JSON object.

    The body is read as text and parsed with ``json.loads`` rather than via
    ``response.json()``, which raises ``aiohttp.ContentTypeError`` (not a
    JSONDecodeError) for non-JSON content types such as HTML error pages.
    Returns None when the body is not a JSON object.
    """
    async with request_ctx as response:
        body = await response.text()
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _tally_records(records, unique_users, count_country):
    """Count each not-yet-seen identity once under its channel_name.

    Records without an identity are skipped. List-valued channel names are
    joined with ', ' so they form a single key. Users without a channel_name
    are remembered in *unique_users* but not counted. Mutates both arguments.
    """
    for record in records:
        user_id = record.get('identity')
        if not user_id or user_id in unique_users:
            continue
        unique_users.add(user_id)
        channel_name = (record.get('profileData') or {}).get('channel_name')
        if channel_name is None:
            continue
        if isinstance(channel_name, list):
            channel_name = ', '.join(map(str, channel_name))
        count_country[channel_name] = count_country.get(channel_name, 0) + 1


async def fetch_event_data(session, start_date_integer, end_date_integer):
    """Count distinct users per channel_name for one date window via CleverTap.

    POSTs an advanced profile query for users with ``has_email`` reachability
    whose ``channel_name`` contains one of the listed channels and who did
    any of the listed events between the two dates. It then follows the
    returned cursor page by page with GET requests.

    Args:
        session: an open aiohttp ``ClientSession``.
        start_date_integer: first day of the window as a YYYYMMDD integer.
        end_date_integer: last day of the window as a YYYYMMDD integer.

    Returns:
        ``(count_country, unique_users)``. ``count_country`` maps a
        channel_name string to the number of distinct identities carrying it.
        ``unique_users`` is the set of identities seen. ``({}, set())`` is
        returned on a non-retryable API failure or once ``MAX_RETRIES``
        retryable failures have been used up.
    """
    url = "https://in1.api.clevertap.com/1/profiles.json?batch_size=5000&app=false&events=false"
    payload = json.dumps({
        "advanced_query": {
            "did_any": {
                "any_events": [
                    {
                        "event_name": event,
                        "common_profile_properties": {
                            "reachability": [{"name": "has_email", "value": "True"}],
                            "user_properties": [{
                                "name": "channel_name",
                                "operator": "contains",
                                "value": ["india", "philippines", "malaysia",
                                          "singapore", "gulf", "hongkong",
                                          "thailand", "indonesia", "vietnam",
                                          "rexmonster"]
                            }]
                        },
                        "from": start_date_integer,
                        "to": end_date_integer
                    } for event in [
                        "App Launched", "Notification Viewed",
                        "Email Opened", "MM Email Opened",
                        "PageViewed"
                    ]
                ]
            }
        }
    })

    headers = {
        'X-CleverTap-Account-Id': '**********',
        'X-CleverTap-Passcode': '*********',
        'Content-Type': 'application/json'
    }

    # One retry budget shared by the initial query and every page fetch, so
    # a page that keeps answering "in progress" can no longer loop forever.
    retry_count = 0
    retry_delay = RETRY_DELAY

    async def wait_before_retry(message):
        """Consume one retry; sleep with capped exponential back-off.

        Returns False (without sleeping) once the budget is exhausted.
        """
        nonlocal retry_count, retry_delay
        retry_count += 1
        if retry_count >= MAX_RETRIES:
            return False
        print(f"{message} Attempt {retry_count}")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)
        return True

    while True:
        response_json = await _read_json(session.post(url, headers=headers, data=payload))
        reason = _retry_reason(response_json)
        if reason is not None:
            if await wait_before_retry(reason):
                continue
            break
        if response_json.get('status') == 'fail':
            print(f"Failed request with error: {response_json.get('error')}")
            return {}, set()

        # Fresh tallies per attempt: a retry must not inherit users seen on an
        # earlier, discarded pass (they would otherwise be skipped as seen).
        count_country = {}
        unique_users = set()
        cursor = response_json.get('cursor')
        gave_up = False
        while cursor:
            next_url = f"https://in1.api.clevertap.com/1/profiles.json?cursor={cursor}"
            page = await _read_json(session.get(next_url, headers=headers))
            reason = _retry_reason(page)
            if reason is not None:
                # Re-request the same cursor; pages already tallied are kept.
                if await wait_before_retry(reason):
                    continue
                gave_up = True
                break
            if page.get('status') == 'fail':
                # Non-retryable mid-pagination failure: partial counts would
                # silently under-report, so treat the window as failed.
                print(f"Failed request with error: {page.get('error')}")
                return {}, set()
            _tally_records(page.get('records') or [], unique_users, count_country)
            cursor = page.get('next_cursor')

        if gave_up:
            break
        if count_country:
            return count_country, unique_users
        # NOTE(review): an empty tally is retried, presumably because the export
        # was not ready yet; a window with genuinely no matching users will use
        # up the whole retry budget before returning empty.
        if not await wait_before_retry("Retrying..."):
            break

    print("Max retries reached. Unable to fetch data.")
    return {}, set()

def _date_windows(window_start, window_end, span_days=30):
    """Split [window_start, window_end) into consecutive *span_days*-long windows.

    Each window is ``(first_day, last_day)`` with *last_day* inclusive, which
    matches the inclusive 'from'/'to' dates sent to CleverTap.
    """
    windows = []
    current = window_start
    while current < window_end:
        windows.append((current, current + timedelta(days=span_days - 1)))
        current += timedelta(days=span_days)
    return windows


def _country_counts(event_data):
    """Roll raw channel_name counts up to reporting countries via ``mapping``.

    *event_data* maps a channel_name string (multi-valued channels joined with
    commas) to a user count. Each key's count is added once per distinct
    mapped country. A user tagged e.g. 'india, rexmonster' (both -> 'India')
    is therefore counted once for India, not twice. Unmapped channels are dropped.
    """
    totals = {}
    for raw_channels, user_count in event_data.items():
        countries = {
            mapping.get(name.strip().lower())
            for name in str(raw_channels).split(',')
        }
        countries.discard(None)
        for country in countries:
            totals[country] = totals.get(country, 0) + user_count
    return totals


def _build_summary(country_totals):
    """Build the report: one zero-filled row per country, sorted by country name.

    Missing countries are added before the date/metric columns are filled,
    so every row carries the same dates (no NaN rows).
    """
    channels = sorted(set(required_columns).union(country_totals))
    return pd.DataFrame({
        'calculated_Date': end_date.date(),
        'start_date_range': start_date.date(),
        'end_date_range': (end_date - timedelta(days=1)).date(),
        'Metric Name': '30 - Days Unique Active Users',
        'channel_name': channels,
        'count': [country_totals.get(name, 0) for name in channels],
    })


def _save_summary(summary_df, path=None):
    """Write *summary_df* to the '30 Days Summary' sheet of the workbook at *path*.

    Appends to an existing workbook (as before) but creates it when missing,
    since append mode cannot open a file that does not exist.
    NOTE(review): with pandas' default ``if_sheet_exists``, appending to a
    workbook that already has this sheet raises; decide whether to
    'replace' or accumulate history instead.
    """
    import os

    path = output_file if path is None else path
    mode = 'a' if os.path.exists(path) else 'w'
    with pd.ExcelWriter(path, engine='openpyxl', mode=mode) as writer:
        summary_df.to_excel(writer, index=False, sheet_name='30 Days Summary', header=True)


async def main():
    """Fetch every reporting window, roll counts up by country, print and save them."""
    batch_size = 3  # Limit to 3 concurrent tasks to respect rate limit
    country_totals = {}
    windows = _date_windows(start_date, end_date)

    async with aiohttp.ClientSession() as session:
        for offset in range(0, len(windows), batch_size):
            batch = windows[offset:offset + batch_size]
            results = await asyncio.gather(*(
                fetch_event_data(
                    session,
                    int(first_day.strftime('%Y%m%d')),
                    int(last_day.strftime('%Y%m%d')),
                )
                for first_day, last_day in batch
            ))
            for (first_day, last_day), (event_data, _users) in zip(batch, results):
                if not event_data:
                    # Report the window that actually came back empty.
                    print(f"No data for events from {first_day} to {last_day}")
                    continue
                # NOTE(review): with more than one window, per-window unique
                # counts are summed, so a user active in two windows counts twice.
                for country, user_count in _country_counts(event_data).items():
                    country_totals[country] = country_totals.get(country, 0) + user_count

    summary_df = _build_summary(country_totals)
    print(summary_df)

    # Save the summary to the Excel file
    _save_summary(summary_df)

# Run the main function.
# asyncio.run (instead of a top-level `await`, which is a SyntaxError outside
# IPython) also works inside Jupyter's already-running loop because
# nest_asyncio.apply() is called earlier in this file; per nest_asyncio's docs
# it patches asyncio.run to be re-entrant.
asyncio.run(main())

Request still in progress, please retry later. Retrying... Attempt 1


  obj, end = self.scan_once(s, idx)


  calculated_Date                    Metric Name channel_name    count
0      2024-08-31  30 - Days Unique Active Users     Hongkong      518
1      2024-08-31  30 - Days Unique Active Users        India  9851358
2      2024-08-31  30 - Days Unique Active Users    Indonesia     1108
3      2024-08-31  30 - Days Unique Active Users     Malaysia   221759
4      2024-08-31  30 - Days Unique Active Users  Middle East   891561
5      2024-08-31  30 - Days Unique Active Users  Philippines   342997
6      2024-08-31  30 - Days Unique Active Users    Singapore   277140
7      2024-08-31  30 - Days Unique Active Users     Thailand      899
8      2024-08-31  30 - Days Unique Active Users      Vietnam      548
