# TikTok Ad Data Collection and Analysis

## 1. Mount Google Drive and Install Dependencies

In [None]:
# Install required packages (the `!` prefix runs pip as a shell command from the notebook).
# NOTE(review): python-dotenv and openpyxl are not imported in the visible cells — confirm they are still needed.
!pip install python-dotenv requests pandas openpyxl tqdm ipywidgets

In [None]:
import os
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import time
from tqdm.notebook import tqdm
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display, clear_output

In [None]:
# Attach the user's Google Drive to the Colab filesystem
from google.colab import drive
drive.mount('/content/drive')

# Lay out the project folder tree on Drive (folders that already exist are left alone)
import os
base_dir = '/content/drive/MyDrive/TikTok_Ad_Research'
for subdir in ('data', 'media/videos', 'media/images'):
    target_dir = os.path.join(base_dir, subdir)
    os.makedirs(target_dir, exist_ok=True)



In [None]:
# Credential inputs for the TikTok API; run_collection() reads both `.value`s.
client_key_widget = widgets.Text(
    description='Client Key:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

# Password widget masks the secret while it is typed; it exposes the same
# `.value` attribute as Text, so downstream code is unaffected.
client_secret_widget = widgets.Password(
    description='Client Secret:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

# Pre-fill from the environment when available so credentials never have to be
# pasted into the notebook source; both fall back to empty (enter them manually).
client_key_widget.value = os.environ.get('TIKTOK_CLIENT_KEY', '')
client_secret_widget.value = os.environ.get('TIKTOK_CLIENT_SECRET', '')

display(client_key_widget, client_secret_widget)

## 4. TikTok API Client Setup

## 5. Data Collection

In [None]:
def get_access_token(client_key: str, client_secret: str) -> str:
    """Request a client-credentials access token from the TikTok OAuth endpoint.

    Args:
        client_key: The application's client key.
        client_secret: The application's client secret.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        Exception: If the endpoint answers with a non-200 status, the body is
            not valid JSON, or the body carries no ``access_token``.
    """
    url = "https://open.tiktokapis.com/v2/oauth/token/"
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'client_key': client_key,
        'client_secret': client_secret,
        'grant_type': 'client_credentials'
    }

    # A timeout keeps the notebook from hanging forever on a stalled connection
    response = requests.post(url, headers=headers, data=data, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to get access token: {response.text}")

    try:
        response_data = response.json()
    except ValueError as exc:
        raise Exception(f"Failed to get access token: {response.text}") from exc

    # A 200 response may still describe an error instead of carrying a token;
    # surface the body rather than an opaque KeyError.
    access_token = response_data.get('access_token') if isinstance(response_data, dict) else None
    if not access_token:
        raise Exception(f"Failed to get access token: {response.text}")
    return access_token

class TikTokResearchAPI:
    """Small client for the TikTok commercial-content query endpoint.

    Holds the bearer-token headers, the endpoint URL and the list of response
    fields requested on every query.
    """

    def __init__(self, access_token: str):
        """Store the token and prepare the request headers and field list."""
        self.access_token = access_token
        self.base_url = 'https://open.tiktokapis.com/v2/research/adlib/commercial_content/query/'
        self.headers = {
            'authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
        # Sent comma-joined in the `fields` query parameter
        self.fields = [
            'id', 'create_timestamp', 'label', 'brand_names',
            'creator', 'videos'
        ]

    def fetch_ads_data(self, country_code: str, start_date: str, end_date: str,
                       keywords: Optional[List[str]] = None, max_results: int = 1000) -> List[Dict]:
        """Fetch commercial contents page by page for one country.

        Args:
            country_code: Value sent as the ``creator_country_code`` filter.
            start_date: Lower bound of ``content_published_date_range``.
            end_date: Upper bound of ``content_published_date_range``.
            keywords: Optional list sent as the ``keywords`` filter.
            max_results: Stop requesting pages once this many items are collected.

        Returns:
            The collected items. On a non-200 response the error body is
            printed and whatever was collected so far is returned.
        """
        all_ads: List[Dict] = []
        search_id = None

        filters = {
            "content_published_date_range": {
                "min": start_date,
                "max": end_date
            },
            "creator_country_code": country_code
        }

        if keywords:
            filters["keywords"] = keywords

        # The URL does not change between pages, so build it once
        url = f"{self.base_url}?fields={','.join(self.fields)}"

        with tqdm(total=max_results, desc=f"Fetching {country_code}") as pbar:
            while len(all_ads) < max_results:
                params = {
                    "filters": filters,
                    "max_count": min(50, max_results - len(all_ads))
                }
                if search_id:
                    params["search_id"] = search_id

                response = requests.post(url, headers=self.headers, json=params, timeout=60)

                if response.status_code != 200:
                    print(f"Error: {response.text}")
                    break

                payload = response.json().get('data') or {}
                ads = payload.get('commercial_contents')
                if not ads:
                    break

                all_ads.extend(ads)
                pbar.update(len(ads))

                # Stop on the last page. Without this the loop would re-send the
                # same query and append duplicate items until max_results is hit.
                # A missing search_id is treated the same way, since the next
                # request could not continue the search.
                search_id = payload.get('search_id')
                if not payload.get('has_more') or not search_id:
                    break

                time.sleep(1)  # Rate limiting between consecutive pages

        return all_ads

In [None]:
# Country codes for all 27 EU member states (list order is the order shown in the picker)
EU_COUNTRIES = [
    'AT', 'BE', 'BG', 'HR', 'CY', 'CZ', 'DK', 'EE', 'FI',
    'FR', 'DE', 'GR', 'HU', 'IE', 'IT', 'LV', 'LT', 'LU',
    'MT', 'NL', 'PL', 'PT', 'RO', 'SK', 'SI', 'ES', 'SE',
]

# Multi-select of target countries; the checkbox below offers a "Select All" shortcut
country_widget = widgets.SelectMultiple(
    description='Countries:',
    options=EU_COUNTRIES,
    value=['FR', 'DE', 'IT', 'ES'],
    layout=widgets.Layout(width='30%', height='200px'),
    style={'description_width': 'initial'},
)

select_all_countries = widgets.Checkbox(
    description='Select All Countries',
    value=False,
    style={'description_width': 'initial'},
)

# Date range pickers; the end date defaults to yesterday
one_day_ago = datetime.now() - timedelta(days=1)
yesterday = one_day_ago.strftime('%Y-%m-%d')

start_date_widget = widgets.DatePicker(
    description='Start Date:',
    value=datetime(2024, 1, 1),
    style={'description_width': 'initial'},
)

end_date_widget = widgets.DatePicker(
    description='End Date:',
    value=datetime.strptime(yesterday, '%Y-%m-%d'),
    style={'description_width': 'initial'},
)

# Optional extras: keyword filter, per-country result cap, media download toggle
keyword_widget = widgets.Text(
    description='Keywords:',
    placeholder='Enter keywords (comma-separated)',
    layout=widgets.Layout(width='50%'),
    style={'description_width': 'initial'},
)

max_results_widget = widgets.BoundedIntText(
    description='Max Results per Country:',
    value=1000,
    min=1,
    max=10000,
    style={'description_width': 'initial'},
)

download_media_widget = widgets.Checkbox(
    description='Download Media Files',
    value=True,
    style={'description_width': 'initial'},
)

# Handle "Select All" countries
def on_select_all_change(change):
    """Observer for the "Select All" checkbox.

    Selects every EU country when the new value is truthy; otherwise restores
    the default FR/DE/IT/ES selection.
    """
    select_everything = change['new']
    country_widget.value = EU_COUNTRIES if select_everything else ['FR', 'DE', 'IT', 'ES']

# Run the handler whenever the checkbox's `value` trait changes
select_all_countries.observe(on_select_all_change, names='value')

# Render every control, top to bottom
for control in (
    select_all_countries, country_widget,
    start_date_widget, end_date_widget,
    keyword_widget, max_results_widget, download_media_widget,
):
    display(control)

## 6. Collect and Export Data

In [None]:
def process_ads_to_dataframe(ads: List[Dict]) -> pd.DataFrame:
    """Flatten raw ad records into a DataFrame with a fixed set of columns.

    Args:
        ads: Ad dictionaries as returned by the API. Each may carry ``id``,
            ``create_timestamp`` (seconds since the epoch), ``label``,
            ``brand_names``, ``creator`` and ``videos``.

    Returns:
        A DataFrame with the columns ``id``, ``create_timestamp``, ``label``,
        ``brand_names``, ``creator_username``, ``video_url`` and
        ``cover_image_url``. The two media columns are always present and hold
        ``None`` when an ad has no video or an empty URL, so callers can rely
        on them even when no ad in the batch has media. A missing timestamp
        becomes ``NaT`` instead of the 1970 epoch. Only the first video of
        each ad is used.
    """
    columns = [
        'id', 'create_timestamp', 'label', 'brand_names',
        'creator_username', 'video_url', 'cover_image_url',
    ]
    processed_ads = []

    for ad in ads:
        timestamp = ad.get('create_timestamp')
        videos = ad.get('videos') or []
        first_video = (videos[0] if videos else None) or {}

        processed_ads.append({
            'id': ad.get('id', ''),
            # Converted in the local timezone, as datetime.fromtimestamp does
            'create_timestamp': datetime.fromtimestamp(timestamp) if timestamp is not None else pd.NaT,
            'label': ad.get('label', ''),
            # `or` guards against keys that are present but null
            'brand_names': ', '.join(ad.get('brand_names') or []),
            'creator_username': (ad.get('creator') or {}).get('username', ''),
            'video_url': first_video.get('url') or None,
            'cover_image_url': first_video.get('cover_image_url') or None,
        })

    # Passing `columns` keeps the schema stable even for an empty input list
    return pd.DataFrame(processed_ads, columns=columns)

class MediaDownloader:
    """Downloads ad videos and cover images over one shared HTTP session."""

    def __init__(self):
        """Create the session reused for every download."""
        self.session = requests.Session()

    def download_file(self, url: str, filepath: str, retries: int = 3) -> bool:
        """Download ``url`` to ``filepath``, retrying on any failure.

        The body is streamed into ``<filepath>.part`` and only moved to
        ``filepath`` once the transfer has completed, so an interrupted
        download never leaves a truncated file at the final path.

        Args:
            url: Address of the file to fetch.
            filepath: Destination path on disk.
            retries: Maximum number of attempts.

        Returns:
            True if the file was written completely, False otherwise.
        """
        partial_path = f"{filepath}.part"

        for attempt in range(retries):
            try:
                # The context manager releases the streamed connection; the
                # timeout prevents a stalled server from blocking the run.
                with self.session.get(url, stream=True, timeout=30) as response:
                    response.raise_for_status()

                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)

                os.replace(partial_path, filepath)
                return True

            except Exception as e:
                # Best-effort cleanup of the incomplete file
                try:
                    os.remove(partial_path)
                except OSError:
                    pass

                print(f"Attempt {attempt + 1}/{retries} failed for {url}: {str(e)}")
                if attempt == retries - 1:
                    print(f"Failed to download {url} after {retries} attempts")
                    return False
                time.sleep(1)

        return False

    def download_media(self, df: pd.DataFrame, country_code: str, base_dir: str):
        """Download the video and cover image of every ad in ``df``.

        Files are stored as ``<base_dir>/media/videos/<country_code>/<id>.mp4``
        and ``<base_dir>/media/images/<country_code>/<id>.jpg``. Files that
        already exist are skipped, as are rows whose URL is missing or empty.

        Args:
            df: Frame with an ``id`` column and, optionally, ``video_url`` and
                ``cover_image_url`` columns.
            country_code: Name of the per-country sub-folder.
            base_dir: Root folder of the project.
        """
        country_video_dir = os.path.join(base_dir, 'media', 'videos', country_code)
        country_image_dir = os.path.join(base_dir, 'media', 'images', country_code)
        os.makedirs(country_video_dir, exist_ok=True)
        os.makedirs(country_image_dir, exist_ok=True)

        for _, row in tqdm(df.iterrows(), total=len(df), desc=f"Downloading media for {country_code}"):
            ad_id = row['id']

            # Series.get tolerates frames that lack the media columns entirely
            targets = (
                (row.get('video_url'), os.path.join(country_video_dir, f"{ad_id}.mp4")),
                (row.get('cover_image_url'), os.path.join(country_image_dir, f"{ad_id}.jpg")),
            )

            attempted = False
            for media_url, media_path in targets:
                if pd.notna(media_url) and media_url and not os.path.exists(media_path):
                    self.download_file(media_url, media_path)
                    attempted = True

            # Throttle only when a request was actually made
            if attempted:
                time.sleep(0.5)

## 7. Download Media Files

In [None]:
def run_collection():
    """Collect ads for every selected country and export them.

    Reads credentials and filters from the notebook widgets, validates them,
    obtains an access token, then for each country fetches the ads, writes
    them to a timestamped CSV under ``<base_dir>/data`` and optionally
    downloads the media files. Validation problems and runtime errors are
    printed rather than raised, so the notebook keeps running.
    """
    # Get credentials
    client_key = client_key_widget.value.strip()
    client_secret = client_secret_widget.value.strip()

    if not client_key or not client_secret:
        print("Please enter your API credentials")
        return

    # Get filters
    countries = list(country_widget.value)
    if not countries:
        print("Please select at least one country")
        return

    # A cleared date picker yields None, which has no strftime
    start_value = start_date_widget.value
    end_value = end_date_widget.value
    if start_value is None or end_value is None:
        print("Please select both a start date and an end date")
        return

    start_date = start_value.strftime('%Y%m%d')
    end_date = end_value.strftime('%Y%m%d')
    # Compare the YYYYMMDD strings: the pickers may hold a mix of date and
    # datetime objects, which cannot be compared with each other directly.
    if start_date > end_date:
        print("Start date must not be after end date")
        return

    # Drop empty fragments such as those produced by a trailing comma
    keywords = [k.strip() for k in keyword_widget.value.split(',') if k.strip()] or None
    max_results = max_results_widget.value
    download_media_files = download_media_widget.value

    try:
        # Get access token
        access_token = get_access_token(client_key, client_secret)
        api = TikTokResearchAPI(access_token)
        downloader = MediaDownloader()

        # Process each country
        for country in countries:
            print(f"\nProcessing {country}...")
            ads = api.fetch_ads_data(country, start_date, end_date, keywords, max_results)

            if ads:
                df = process_ads_to_dataframe(ads)

                # Save to CSV
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"tiktok_ads_{country}_{timestamp}.csv"
                filepath = os.path.join(base_dir, 'data', filename)

                df.to_csv(filepath, index=False)
                print(f"Saved {len(df)} ads to {filepath}")

                if download_media_files:
                    downloader.download_media(df, country, base_dir)

                print(f"\nSummary for {country}:")
                print(f"Total ads: {len(df)}")
                print(f"Unique brands: {df['brand_names'].nunique()}")
                print(f"Unique creators: {df['creator_username'].nunique()}")
                print(f"Date range: {df['create_timestamp'].min()} to {df['create_timestamp'].max()}")

    except Exception as e:
        print(f"Error: {str(e)}")

# Create and display the run button
run_button = widgets.Button(
    description='Run Collection',
    button_style='success',
    layout=widgets.Layout(width='200px')
)

def on_run_button_clicked(b):
    """Run the collection once per click.

    The button is disabled while the collection is in progress so that
    additional clicks during a long run do not queue duplicate collections;
    it is re-enabled afterwards even if the run fails.
    """
    b.disabled = True
    try:
        run_collection()
    finally:
        b.disabled = False

run_button.on_click(on_run_button_clicked)
display(run_button)

In [None]:
# `ads` only exists as a local variable inside run_collection(), so evaluating
# it here raised NameError on a fresh kernel. List the exported CSV files instead.
sorted(Path(base_dir, 'data').glob('tiktok_ads_*.csv'))

## 8. Verify Results

Check that all data has been collected and stored in your Google Drive under the following structure:

```
TikTok_Ad_Research/
├── data/
│   ├── tiktok_ads_FR_*.csv
│   ├── tiktok_ads_DE_*.csv
│   ├── tiktok_ads_IT_*.csv
│   └── tiktok_ads_ES_*.csv
└── media/
    ├── videos/
    │   ├── FR/
    │   ├── DE/
    │   ├── IT/
    │   └── ES/
    └── images/
        ├── FR/
        ├── DE/
        ├── IT/
        └── ES/
```