### API Exploration & Documentation Review (15 mins)
Review YouTube API docs for videos endpoint <br>
Understand pagination for historical data <br>
Check API quotas and limits <br>

In [17]:
from googleapiclient.discovery import build
import os
from dotenv import load_dotenv
import json
from datetime import datetime
import re
from pathlib import Path

def ensure_data_directories():
    """Make sure the raw and processed data folders are present."""
    # parents=True builds the shared "data" parent; exist_ok=True makes reruns harmless.
    for folder in ("data/raw", "data/processed"):
        Path(folder).mkdir(parents=True, exist_ok=True)

def get_channel_details(youtube, channel_id):
    """
    Get detailed channel metrics.

    Args:
        youtube: API client exposing channels().list(...).execute().
        channel_id: ID of the channel to look up.

    Returns:
        A dict with the keys channel_id, channel_name, channel_url, country,
        joined_date, subscriber_count, total_views and extracted_at, or None
        when the response contains no items or the lookup fails (the error
        is printed, not raised).
    """
    try:
        request = youtube.channels().list(
            part="snippet,statistics,brandingSettings",
            id=channel_id
        )
        response = request.execute()

        items = response.get('items')
        if not items:
            # Unknown channel ID: nothing to report.
            return None

        channel = items[0]
        snippet = channel['snippet']
        statistics = channel.get('statistics', {})

        # A counter may be absent from the response (presumably when the
        # channel hides it - confirm against the API docs). Report None for
        # that field instead of discarding the whole record.
        subscriber_count = statistics.get('subscriberCount')
        total_views = statistics.get('viewCount')

        return {
            # Primary Key
            'channel_id': channel_id,

            # Core Metadata
            'channel_name': snippet['title'],
            'channel_url': f"https://www.youtube.com/channel/{channel_id}",
            'country': snippet.get('country', 'Unknown'),
            'joined_date': snippet['publishedAt'],

            # Performance Metrics
            'subscriber_count': int(subscriber_count) if subscriber_count is not None else None,
            'total_views': int(total_views) if total_views is not None else None,

            # Audit Fields
            'extracted_at': datetime.now().isoformat(),
        }
    except Exception as e:
        # Best-effort boundary: callers only check for a falsy result.
        print(f"Error getting channel details: {str(e)}")
        return None

def parse_duration(duration):
    """
    Convert a YouTube ISO-8601 duration (PT1H2M10S, P1DT2H) to seconds.

    Args:
        duration: Duration string in the form P[nD][T[nH][nM][nS]].

    Returns:
        Total number of seconds as an int; 0 when the value is empty,
        not a string, or does not start with a recognizable duration.
    """
    if not duration or not isinstance(duration, str):
        return 0

    # The day component sits before the 'T' separator, so a pattern that
    # insists on a literal "PT" prefix would read day-long durations as 0.
    match = re.match(
        r'P((?P<days>\d+)D)?'
        r'(T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+)S)?)?',
        duration,
    )
    if not match:
        return 0

    # Only the named groups appear in groupdict(); unmatched ones are None.
    parts = {k: int(v) for k, v in match.groupdict().items() if v}
    return (
        parts.get('days', 0) * 86400
        + parts.get('hours', 0) * 3600
        + parts.get('minutes', 0) * 60
        + parts.get('seconds', 0)
    )

def get_video_details(youtube, channel_id, start_date=None, end_date=None, max_results=None):
    """
    Get video metrics (excluding shorts) within an optional date range.

    Args:
        youtube: API client exposing search().list() and videos().list().
        channel_id: ID of the channel whose videos are listed.
        start_date: Optional ISO-8601 lower bound (a trailing 'Z' is accepted).
            A value carrying a UTC offset is converted to UTC; a naive value
            is sent as UTC unchanged. Omitted from the query when falsy.
        end_date: Optional ISO-8601 upper bound, handled like start_date.
        max_results: Optional cap on the number of returned videos; a falsy
            value means no cap.

    Returns:
        Tuple (videos, quota_used). videos is a list of dicts (video_id,
        channel_id, title, url, duration_seconds, view_count,
        upload_datetime, extracted_at), or None when a request or parsing
        step fails (the error is printed). view_count is None when the
        response carries no viewCount for the video.

    Raises:
        ValueError: if start_date or end_date is not a valid ISO-8601 string.

    Quota usage per iteration (local estimate):
    - search().list() = 100 units per request
    - videos().list() = 1 unit per request (up to 50 video IDs each)
    """
    # Local imports keep this block self-contained; the file's import
    # section does not provide these names.
    import html
    from datetime import timezone

    videos = []
    next_page_token = None
    quota_used = 0

    def to_utc_timestamp(value):
        """Normalize an ISO-8601 string to a 'Z'-suffixed UTC timestamp."""
        parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        if parsed.tzinfo is not None:
            # Convert before dropping the offset, otherwise the bound shifts.
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed.isoformat() + 'Z'

    def clean_text(text):
        """Decode HTML entities and drop non-ASCII characters from text"""
        text = html.unescape(text)
        text = text.encode('ascii', 'ignore').decode('ascii')
        return text.strip()

    # Only send the date filters that were actually supplied.
    search_params = {
        'part': "snippet",
        'channelId': channel_id,
        'maxResults': 50,
        'order': "date",
        'type': "video",
    }
    if start_date:
        search_params['publishedAfter'] = to_utc_timestamp(start_date)
    if end_date:
        search_params['publishedBefore'] = to_utc_timestamp(end_date)

    try:
        while True:
            # Get videos sorted by date
            search_response = youtube.search().list(
                pageToken=next_page_token,
                **search_params
            ).execute()
            quota_used += 100

            search_items = search_response.get('items')
            if not search_items:
                break

            video_ids = [item['id']['videoId'] for item in search_items]

            # Get detailed video info
            video_response = youtube.videos().list(
                part="contentDetails,statistics",
                id=','.join(video_ids)
            ).execute()
            quota_used += 1

            # Join on video ID rather than position: the details response
            # is not guaranteed to contain every requested ID in order.
            details_by_id = {
                item['id']: item for item in video_response.get('items', [])
            }

            # Process each video
            for search_item in search_items:
                video_id = search_item['id']['videoId']
                video_item = details_by_id.get(video_id)
                if video_item is None:
                    continue

                duration_seconds = parse_duration(
                    video_item['contentDetails']['duration']
                )

                # Skip shorts (videos under 60 seconds)
                if duration_seconds < 60:
                    continue

                upload_datetime = datetime.fromisoformat(
                    search_item['snippet']['publishedAt'].replace('Z', '+00:00')
                ).replace(tzinfo=None)

                view_count = video_item.get('statistics', {}).get('viewCount')

                videos.append({
                    'video_id': video_id,
                    'channel_id': channel_id,
                    'title': clean_text(search_item['snippet']['title']),
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                    'duration_seconds': duration_seconds,
                    'view_count': int(view_count) if view_count is not None else None,
                    'upload_datetime': upload_datetime.isoformat(),
                    'extracted_at': datetime.now().isoformat()
                })

                if max_results and len(videos) >= max_results:
                    return videos, quota_used

            next_page_token = search_response.get('nextPageToken')
            if not next_page_token:
                break

        return videos, quota_used
    except Exception as e:
        # Best-effort boundary: report the failure and the quota spent so far.
        print(f"Error getting video details: {str(e)}")
        return None, quota_used

def save_data(data, filename):
    """
    Save data to a timestamped JSON file under data/raw.

    Args:
        data: JSON-serializable object to write.
        filename: Base name; the file becomes data/raw/<filename>_<YYYYmmdd_HHMMSS>.json.

    Returns:
        Path of the file that was written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = Path(f"data/raw/{filename}_{timestamp}.json")

    # Do not depend on the directories having been created beforehand.
    filepath.parent.mkdir(parents=True, exist_ok=True)

    with open(filepath, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps non-ASCII text readable in the output file.
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Data saved to {filepath}")
    return filepath

if __name__ == "__main__":
    # Script entry point: fetch channel metadata and the channel's 2023
    # videos, then store both as timestamped JSON files under data/raw.

    # Create data directories
    ensure_data_directories()

    # Initialize API connection (API key is read from the environment / .env)
    load_dotenv()
    youtube = build('youtube', 'v3',
                   developerKey=os.getenv('YOUTUBE_API_KEY'))

    channel_id = "UCbCmjCuTUZos6Inko4u57UQ"  # Cocomelon

    # Get channel details
    print("\nFetching channel details...")
    channel_data = get_channel_details(youtube, channel_id)
    if channel_data:
        save_data(channel_data, "channel_details")

    # Get all videos from 2023
    print("\nFetching videos from 2023...")
    videos, quota_used = get_video_details(
        youtube,
        channel_id,
        start_date="2023-01-01T00:00:00",
        end_date="2023-12-31T23:59:59"
    )

    if videos:
        save_data(videos, "video_details")
        print(f"\nFetched {len(videos)} videos")
    elif videos is not None:
        # An empty list means the fetch succeeded but nothing qualified;
        # None means an error was already reported by get_video_details.
        print("\nNo videos matched the requested date range")

    # Quota is spent on failed or empty fetches too, so always report it.
    print(f"API quota units used: {quota_used}")

2024-12-22 06:08:48,244 - INFO - file_cache is only supported with oauth2client<4.0.0



Fetching channel details...
Data saved to data/raw/channel_details_20241222_060848.json

Fetching videos from 2023...
Data saved to data/raw/video_details_20241222_060851.json

Fetched 131 videos
API quota units used: 505
