In [1]:
# Make sure that we reload custom-written modules each time to facilitate development
%reload_ext autoreload
%autoreload 2

In [2]:
from utils.meta_credentials import META_TEMP_TOKEN
import requests
from tqdm.notebook import tqdm
import pandas as pd
import time
import datetime
from loguru import logger
logger.add("logs/meta_ads_api_calls_{time}.log")

1

In [3]:
return_fields = [
    'page_id',
    'page_name',
    'bylines',
    'ad_creative_bodies',
    'ad_delivery_start_time',
    'ad_delivery_stop_time',
    'ad_snapshot_url',
    'demographic_distribution',
    'delivery_by_region',
    'estimated_audience_size',
    'publisher_platforms',
    'spend',
    'currency',
    'impressions'
]

In [4]:
#def get_json_response(url):
#    response = requests.request("GET", url)
#    response.json()['data']
#    
#    return response.json()['data']

def update_page_status(pages):
    pages += 1
    print(f'Pages collected: {pages}', end='\r')
    
    return pages

In [43]:
#def get_cursor(response, chunks, wait_on_rate_limit):
#    error = response.json().get('error')
#    
#    if error:
#        logger.debug(f'ERROR: {error} at page {len(chunks)}')
#        logger.debug(f'Sleeping for {wait_on_rate_limit/60} mins.')
#        time.sleep(wait_on_rate_limit)
#        
#        # Get last known non-error response and retrieve cursor
#        cursor = chunks[-1]['paging']['cursors']['after']
#
#    else:
#        # In case of no error, simply retrieve cursor
#        cursor = response.json()['paging']['cursors']['after']
#
#    return cursor

In [65]:
class ResponseError(Exception):
    pass

In [77]:
def request_ads_handle_errors(base_url, cursor, wait_on_rate_limit, max_retries = 3):
    
    # Construct URL for next page
    next_url = base_url + f"&after={cursor}"
    
    # Get contents of next page
    response = requests.request("GET", next_url)
    
    if 'data' in response.json().keys():
        output = response
    
    # If we hit the (unknown) rate limit, the API will
    # return a JSON containing an error message, which
    # we'll print and handle accordingly
    elif 'error' in response.json().keys():
        
        retries = 0
        success = False
        
        error_message = response.json()['error']['message']
        logger.debug(f'ERROR: {error_message} at {next_url}.\nSleeping for {wait_on_rate_limit/60} mins. Retries: {retries}')
        
        # We'll sleep and try again up to 2 times
        while not success and retries < max_retries:
            
            time.sleep(wait_on_rate_limit)
            
            # Attempt to get contents of same page again,
            # until we reach max_retries
            response = requests.request("GET", next_url)
            retries += 1
            logger.debug(f'Retries: {retries}')
            
            if 'data' in response.json().keys():
                success = True
                output = response
            
            elif 'error' in response.json().keys():
                continue
            
            else:
                logger.debug(f'No response. Retry from {next_url} next time.')
                ResponseError(f'Data not in response after {retries}. Quitting.')
    
    return output

In [79]:
wait_on_rate_limit = 3600
chunks = []
pages = 0
more_pages = False

base_url = (
    "https://graph.facebook.com/v15.0/ads_archive?"
    f"access_token={META_TEMP_TOKEN}&"
    #"search_page_ids=['39233495912']&"
    "search_terms=''&"
    "ad_type=POLITICAL_AND_ISSUE_ADS&"
    "ad_reached_countries=['DK']&"
    "ad_delivery_date_min=2022-01-01&"
    f"fields={return_fields}&"
    "limit=1000"
)

# First run to retrieve cursor
logger.info(f'Calling Meta API...')
response = requests.request("GET", base_url)
chunks.append(response.json())
pages = update_page_status(pages)

if 'paging' in chunks[0].keys():
    more_pages = True

# Keep looping as long as there are more pages
while more_pages:
    
    try:
        cursor = response.json()['paging']['cursors']['after']

        response = request_ads_handle_errors(
            base_url,
            cursor,
            wait_on_rate_limit,
            max_retries = 3
        )

        # The error-handled API call ensures that we
        # only append reponses with valid data fields
        chunks.append(response.json())
        pages = update_page_status(pages)
        
        # Sleep just in case -- multiple consecutive
        # requests empirically result in strange API
        # behaviour (undocumented burst limit)?
        time.sleep(10)
    
    # If unable to access cursor key, assume that the
    # last page has been collected and break out of
    # the loop, printing last data payload
    except KeyError:
        print(response.json()['data'])
        logger.info('Last page reached. All done!')
        more_pages = False

2022-10-22 13:10:44.881 | INFO     | __main__:<module>:19 - Calling Meta API...


Pages collected: 1

KeyboardInterrupt: 

In [50]:
chunks_unpacked = [chunk['data'] for chunk in chunks]

KeyError: 'data'

In [None]:
def json_responses2df(chunks: 'list'):
    df_list = [pd.DataFrame(chunk) for chunk in chunks]
    return pd.concat(df_list)

In [None]:
df = json_responses2df(chunks_unpacked)

In [None]:
df.tail()

In [None]:
df.to_csv('data/raw/meta/meta_ads_2022.csv')

In [None]:
#############

In [None]:
df = pd.read_csv('data/raw/meta/meta_ads_v1.csv').iloc[: , 1:]

In [None]:
df['bylines'].value_counts()[:20]

In [None]:
df['ad_delivery_start_time'].str.extract(r'-([0-9]+-[0-9]+)')

In [None]:
wait_on_rate_limit = 600
chunks = []
pages = 0
more_pages = True

base_url = (
    "https://graph.facebook.com/v15.0/ads_archive?"
    f"access_token={META_TEMP_TOKEN}&"
    #"search_page_ids=['39233495912']&"
    "search_terms=''&"
    "ad_type=POLITICAL_AND_ISSUE_ADS&"
    "ad_reached_countries=['DK']&"
    "ad_delivery_date_min=2018-05-07&"
    f"fields={return_fields}&"
    "limit=1000"
)

# First run to retrieve cursor
logger.info(f'Calling Meta API...')
response = requests.request("GET", base_url)
chunks.append(response.json())

pages = update_page_status(pages)

# Loop while there are more pages
while more_pages:
    try:
        # get_cursor handles rate limit errors
        cursor = get_cursor(response, chunks, wait_on_rate_limit)
        next_url = base_url + f"&after={cursor}"
        
        response = requests.request("GET", next_url)
        chunks.append(response.json())

        pages = update_page_status(pages)

    # Break out once last page has been collected
    except KeyError:
        logger.info('Last page reached. All done!')
        more_pages = False

In [None]:
test_list = [
    {
        'data': 'this contains data',
        'paging': {
            'cursors': {
                'after': 1,
                'next': 1
            }
        }
    },
     {
        'data': 'this contains data',
        'paging': {
            'cursors': {
                'after': 2,
                'next': 2
            }
        }
    },
    {
        'data': 'this contains data',
        'paging': {
            'cursors': {
                'after': 3,
                'next': 3
            }
        }
    },
    {
        'error': {
            'code': 1,
            'message': "Please reduce the amount of data you're asking for, then retry your request"
        }
    },
    {
        'data': 'this contains data',
        'paging': {
            'cursors': {
                'after': 4,
                'next': 4
            }
        }
    },
    {
        'data': 'this contains data',
        'paging': {
            'cursors': {
                'after': 5,
                'next': 5
            }
        }
    }
]

In [None]:
test_list

In [None]:
wait_on_rate_limit = 2
chunks = []
cursors = []
pages = 0
more_pages = True

response = test_list[0]

while more_pages:
    
    error = response.get('error')
    data = response.get('data')

    if error:
        logger.debug(f'ERROR: {error} at page {len(chunks)}')
        logger.debug(f'Sleeping for {wait_on_rate_limit/60} mins.')
        time.sleep(wait_on_rate_limit)
        
        del test_list[3]
        
        cursor = cursors[-1]
        response = test_list[cursor]

    elif data:
        chunks.append(data)
        pages = update_page_status(pages)
        time.sleep(1)
    
        try:
            cursor = response['paging']['cursors']['after']
            cursors.append(cursor)
            response = test_list[cursor]
        
        except:
            print('Done!')
            more_pages = False
    