<a href="https://colab.research.google.com/github/monjurkuet/yt-crawler/blob/main/crawl_channel_details.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
import os

# Colab's secret store can only be imported inside a Colab runtime. Guard the import so the
# notebook code also works elsewhere when every setting is supplied through os.environ.
try:
    from google.colab import userdata # Keep for direct Colab execution, but prefer os.environ
except ImportError:
    class _ColabSecretsUnavailable:
        """Stand-in for `google.colab.userdata` when not running inside Colab."""

        @staticmethod
        def get(key):
            # Fail loudly with the missing key name instead of handing back a silent None.
            raise KeyError(
                f"Config value {key!r} is not set in the environment and "
                "Colab secrets are unavailable outside Colab."
            )

    userdata = _ColabSecretsUnavailable()

# Define the CHANNEL_ID for the YouTube channel
channel_id = 'UCnwxzpFzZNtLH8NgTeAROFA'

class ConfigManager:
    """Singleton holding the notebook's credentials and connection settings.

    Secret values are read once, on first instantiation, from environment
    variables first and from Colab ``userdata`` second. Fixed settings (SSH user,
    ports, remote MySQL host) are hard-coded in ``_load_config``.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating and loading it on first use.

        Raises:
            Whatever ``_load_config`` raises (for example when a secret cannot be
            resolved). In that case no instance is cached, so a later call retries.
        """
        if cls._instance is None:
            instance = super(ConfigManager, cls).__new__(cls)
            # Load BEFORE publishing the instance: if loading fails, the class must not
            # keep a half-initialised object that later calls would silently reuse.
            instance._load_config()
            cls._instance = instance
        return cls._instance

    def _load_config(self):
        """Populate all configuration attributes on this instance."""

        # Helper to get config, prioritizing environment variables.
        # An empty environment variable is falsy, so it also falls through to userdata.
        def get_config_value(key):
            return os.environ.get(key) or userdata.get(key)

        # YouTube API Credentials
        self.API_KEY = get_config_value('API_KEY')

        # SSH Tunnel & MySQL Database Credentials
        self.SSH_HOST = get_config_value('SSH_HOST')
        self.SSH_USERNAME = 'administrator' # This is often a fixed username for cloud VMs
        self.SSH_PRIVATEKEY_PATH = get_config_value('SSH_PRIVATEKEY_PATH') # Path to private key on Colab
        self.LOCAL_PORT = 3307 # Local port for SSH tunnel
        self.REMOTE_MYSQL_HOST = '127.0.0.1'
        self.REMOTE_MYSQL_PORT = 3306
        self.DATABASE_NAME = get_config_value('DATABASE_NAME')
        self.DATABASE_PASSWORD = get_config_value('DATABASE_PASSWORD')

    def get_config(self):
        """Return every setting as a plain dict keyed by attribute name.

        The dict contains secrets (API key, database password); avoid printing it.
        """
        return {
            'API_KEY': self.API_KEY,
            'SSH_HOST': self.SSH_HOST,
            'SSH_USERNAME': self.SSH_USERNAME,
            'SSH_PRIVATEKEY_PATH': self.SSH_PRIVATEKEY_PATH,
            'LOCAL_PORT': self.LOCAL_PORT,
            'REMOTE_MYSQL_HOST': self.REMOTE_MYSQL_HOST,
            'REMOTE_MYSQL_PORT': self.REMOTE_MYSQL_PORT,
            'DATABASE_NAME': self.DATABASE_NAME,
            'DATABASE_PASSWORD': self.DATABASE_PASSWORD
        }

# Create the shared configuration object.
# In a notebook `__name__` is '__main__', so this runs when the cell is executed; the
# YouTube client cell reads `config.API_KEY`, so `config` is required, not just a demo.
# The prints below are left disabled because they would echo secrets into the cell output.
if __name__ == '__main__':
    config = ConfigManager()
    #print("API Key:", config.API_KEY)
    #print("SSH Host:", config.SSH_HOST)
    #print("Database Name:", config.DATABASE_NAME)


In [16]:
from googleapiclient.discovery import build

print("--- Initializing YouTube API Client ---")

# Build the YouTube API service object (YouTube Data API, version 'v3').
# `config` is the ConfigManager instance created in the configuration cell; the resulting
# `youtube` object is what the channel-fetch cell calls `.channels().list(...)` on.
youtube = build('youtube', 'v3', developerKey=config.API_KEY)

# Reached only if build() returned without raising.
print("YouTube API client initialized successfully.")

--- Initializing YouTube API Client ---
YouTube API client initialized successfully.


In [17]:
import json
from googleapiclient.errors import HttpError

def _print_channel_details(channel):
    """Print a short, human-readable summary of one channels.list item.

    Args:
        channel: dict for a single channel resource; every section is optional and
            missing values are printed as ``None``.
    """
    snippet = channel.get('snippet', {})
    statistics = channel.get('statistics', {})
    # The description may be absent; fall back to '' so the slice never hits None.
    description = snippet.get('description') or ''

    print("\n--- Parsed Channel Details ---")
    print(f"ID: {channel.get('id')}")
    print(f"Title: {snippet.get('title')}")
    print(f"Description: {description[:200]}...") # Truncate description for brevity
    print(f"Custom URL: {snippet.get('customUrl')}")
    print(f"Published At: {snippet.get('publishedAt')}")
    print(f"Country: {snippet.get('country')}")
    print(f"View Count: {statistics.get('viewCount')}")
    print(f"Subscriber Count: {statistics.get('subscriberCount')}")
    print(f"Video Count: {statistics.get('videoCount')}")
    print(f"Topic Details: {channel.get('topicDetails', {})}")
    print(f"Made For Kids: {channel.get('status', {}).get('madeForKids')}")
    print(f"Keywords: {channel.get('brandingSettings', {}).get('channel', {}).get('keywords')}")


# Reset first so a failed or empty lookup never leaves a stale value from an earlier run
# of this cell for the database cell to insert.
channel_data = None

try:
    # Call the channels.list method to retrieve comprehensive channel data
    channel_response = youtube.channels().list(
        id=channel_id,
        part='snippet,contentDetails,statistics,topicDetails,status,brandingSettings'
    ).execute()

    # Use .get(): the response may omit the 'items' key when no channel matches, and that
    # case should reach the "not found" message rather than surface as a KeyError.
    items = channel_response.get('items') or []
    if items:
        channel_data = items[0]
        _print_channel_details(channel_data)
    else:
        print(f"No channel data found for ID: {channel_id}")

except HttpError as e:
    print(f"An HTTP error {e.resp.status} occurred: {e.content.decode()}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")


--- Parsed Channel Details ---
ID: UCnwxzpFzZNtLH8NgTeAROFA
Title: Krown
Description: Welcome to the Cave. 

Daily price analysis of Bitcoin 
...
Custom URL: @eckrown
Published At: 2016-08-30T14:16:04Z
Country: US
View Count: 47117029
Subscriber Count: 203000
Video Count: 3722
Topic Details: {'topicIds': ['/m/01k8wb'], 'topicCategories': ['https://en.wikipedia.org/wiki/Knowledge']}
Made For Kids: False
Keywords: "bitcoin daily" "bitcoin analysis" "top bitcoin analysis" "best bitcoin analysis" "bitcoin price prediction" "bitcoin predicition" "bitcoin today" "bitcoin trading" "krown bitcoin" "bitcoin news" bitcoin "bitcoin price news" "bitcoin today news" "crypto news" "crypto news today" "Bitcoin price prediction 2021" "Bitcoin 2021" "Bitcoin price 2021" "Bitcoin 2021 price analysis" "bitcoin price prediction August" "bitcoin August" "bitcoin August 2021" "bitcoin August analysis" ethereum "eth analysis"


In [18]:
!pip install mysql-connector-python sshtunnel paramiko==3.4.0 tenacity



In [19]:
import mysql.connector
from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
import warnings
import paramiko
import logging
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    after_log
)
import json

# Suppress Paramiko UserWarning about missing cryptography library.
# `module` is a regular expression matched against the start of the name of the module
# that issues the warning, so only UserWarnings raised from modules whose name starts
# with 'paramiko' are ignored; warnings from other packages are still shown.
warnings.filterwarnings('ignore', category=UserWarning, module='paramiko')

class DBConnector:
    """MySQL access for the crawler, reached through an SSH tunnel.

    Typical use: ``establish_connection()`` -> ``create_channels_table()`` ->
    ``insert_channel_data(channel)`` -> ``close_connection()``.

    Attributes:
        config: shared ``ConfigManager`` instance with SSH/MySQL settings.
        ssh_tunnel: the active ``SSHTunnelForwarder`` or ``None``.
        mysql_conn: the active MySQL connection or ``None``.
        logger: the ``'db_connector'`` logger.
        create_channels_table_sql: DDL for the ``youtube_channels`` table.
    """

    def __init__(self):
        self.config = ConfigManager()
        self.ssh_tunnel = None
        self.mysql_conn = None
        self.logger = self._setup_logging()

        self.create_channels_table_sql = """
CREATE TABLE IF NOT EXISTS youtube_channels (
    channel_id VARCHAR(255) PRIMARY KEY,
    title VARCHAR(255),
    description TEXT,
    custom_url VARCHAR(255),
    published_at DATETIME,
    country VARCHAR(10),
    view_count BIGINT,
    subscriber_count BIGINT,
    video_count BIGINT,
    topic_details JSON, -- Storing topicDetails as JSON
    made_for_kids BOOLEAN,
    keywords TEXT, -- Storing keywords as TEXT
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
"""

    def _setup_logging(self):
        """Return the 'db_connector' logger, attaching a stream handler only once."""
        logger = logging.getLogger('db_connector')
        if not logger.handlers: # Prevent adding multiple handlers
            handler = logging.StreamHandler() # Or a file handler
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        # This logger has its own handler. Without this, a runtime that also configures a
        # root handler prints every record twice (once per handler).
        logger.propagate = False
        return logger

    # Helper for retry condition
    def _is_connection_error(self, exception):
        """Return True if `exception` is one of the error types the connection retry targets."""
        return isinstance(exception, (
            mysql.connector.Error,
            paramiko.SSHException,
            BaseSSHTunnelForwarderError
        ))

    def _close_cursor(self, cursor):
        """Close `cursor` if one was opened, logging (not raising) on failure."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Exception as e:
            self.logger.warning(f"Failed to close cursor: {e}")

    def _rollback_quietly(self):
        """Roll back the current transaction; a failing rollback is logged, not raised."""
        try:
            if self.mysql_conn is not None:
                self.mysql_conn.rollback()
        except Exception as e:
            self.logger.warning(f"Rollback failed: {e}")

    @staticmethod
    def _parse_published_at(value):
        """Convert an ISO-8601 timestamp such as '2016-08-30T14:16:04Z' for a DATETIME column.

        Returns a naive ``datetime`` in UTC, ``None`` for a missing value, or the
        original value unchanged when it cannot be parsed (so the database still
        receives exactly what it did before this conversion existed).
        """
        from datetime import datetime, timezone

        if not value:
            return None
        try:
            # fromisoformat() does not accept a trailing 'Z' on older Python versions.
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            return value
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    @retry(
        wait=wait_exponential(multiplier=1, min=2, max=10),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((mysql.connector.Error, paramiko.SSHException, BaseSSHTunnelForwarderError)),
        after=after_log(logging.getLogger('db_connector'), logging.WARNING)
    )
    def establish_connection(self):
        """Open the SSH tunnel and then the MySQL connection through it.

        Connection-type errors are re-raised so the ``@retry`` decorator can try
        again (up to 3 attempts, exponential wait between 2 and 10 seconds). Because
        ``reraise`` is not set, tenacity raises ``RetryError`` once the attempts are
        exhausted.

        Returns:
            True when the MySQL connection is up; False when the connection reports
            not connected or an unexpected (non-retried) error occurred.
        """
        self.close_connection() # Ensure previous connections are closed before retrying
        try:
            # Establish SSH tunnel
            self.logger.info("Attempting to establish SSH tunnel...")
            self.ssh_tunnel = SSHTunnelForwarder(
                (self.config.SSH_HOST, 22),
                ssh_username=self.config.SSH_USERNAME,
                ssh_pkey=self.config.SSH_PRIVATEKEY_PATH,
                remote_bind_address=(self.config.REMOTE_MYSQL_HOST, self.config.REMOTE_MYSQL_PORT),
                # Bind to loopback only: '0.0.0.0' would expose the forwarded MySQL port
                # on every network interface of this machine.
                local_bind_address=('127.0.0.1', self.config.LOCAL_PORT)
            )
            self.ssh_tunnel.start()
            self.logger.info(f"SSH tunnel established on local port {self.ssh_tunnel.local_bind_port}")

            # Connect to MySQL
            self.logger.info("Attempting to connect to MySQL database...")
            self.mysql_conn = mysql.connector.connect(
                # Same literal address the tunnel is bound to (avoids 'localhost' resolving to IPv6).
                host='127.0.0.1',
                port=self.ssh_tunnel.local_bind_port,
                user='adminuser',
                password=self.config.DATABASE_PASSWORD,
                database=self.config.DATABASE_NAME
            )
            if self.mysql_conn.is_connected():
                self.logger.info("Successfully connected to MySQL database.")
                return True
            else:
                self.logger.error("Failed to connect to MySQL database after SSH tunnel.")
                self.close_connection() # Close partially established connections
                return False

        except (mysql.connector.Error, paramiko.SSHException, BaseSSHTunnelForwarderError) as e:
            self.logger.error(f"Connection error during establish_connection: {e}")
            self.close_connection()
            raise # Re-raise to trigger retry
        except Exception as e:
            self.logger.critical(f"An unexpected critical error occurred during connection establishment: {e}")
            self.close_connection()
            return False

    def create_channels_table(self):
        """Create the ``youtube_channels`` table if it does not exist.

        Returns:
            True when the table exists after the call (created, or already present);
            False when there is no active connection or the statement failed.
        """
        if not (self.mysql_conn and self.mysql_conn.is_connected()):
            self.logger.warning("Cannot create table youtube_channels: MySQL connection is not active.")
            return False

        cursor = None
        try:
            cursor = self.mysql_conn.cursor()
            self.logger.info("Executing CREATE TABLE statement for youtube_channels...")
            cursor.execute(self.create_channels_table_sql)
            self.mysql_conn.commit()
            self.logger.info("Table 'youtube_channels' ensured.")
            return True
        except mysql.connector.Error as err:
            if err.errno == 1050: # Table already exists
                # The table being present is the goal of this method, so report success.
                self.logger.info("Table 'youtube_channels' already exists. Skipping creation.")
                return True
            self.logger.error(f"Error creating table youtube_channels: {err}")
            return False
        except Exception as e:
            self.logger.error(f"An unexpected error occurred during youtube_channels table creation: {e}")
            return False
        finally:
            # Release the cursor on every path, including failures.
            self._close_cursor(cursor)

    def insert_channel_data(self, channel_data):
        """Insert one channel resource into ``youtube_channels``.

        Uses ``INSERT IGNORE``, so a row whose ``channel_id`` already exists is left
        untouched rather than updated.

        Args:
            channel_data: dict shaped like a YouTube ``channels.list`` item; missing
                sections or fields are stored as NULL.

        Returns:
            True when the statement was executed and committed, False otherwise.
        """
        if not self.mysql_conn or not self.mysql_conn.is_connected():
            self.logger.error("No active MySQL connection to insert channel data.")
            return False

        insert_sql = """
            INSERT IGNORE INTO youtube_channels (
                channel_id, title, description, custom_url, published_at, country,
                view_count, subscriber_count, video_count, topic_details, made_for_kids, keywords
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

        cursor = None
        try:
            cursor = self.mysql_conn.cursor()

            snippet = channel_data.get('snippet', {})
            statistics = channel_data.get('statistics', {})

            # topicDetails is stored as a JSON document; an absent/empty value becomes NULL.
            topic_details = channel_data.get('topicDetails')
            topic_details_str = json.dumps(topic_details) if topic_details else None
            keywords_str = channel_data.get('brandingSettings', {}).get('channel', {}).get('keywords')

            record_to_insert = (
                channel_data.get('id'),
                snippet.get('title'),
                snippet.get('description'),
                snippet.get('customUrl'),
                # Send a datetime object rather than the raw '...T...Z' string, whose
                # trailing 'Z' is not a plain DATETIME literal.
                self._parse_published_at(snippet.get('publishedAt')),
                snippet.get('country'),
                statistics.get('viewCount'),
                statistics.get('subscriberCount'),
                statistics.get('videoCount'),
                topic_details_str,
                channel_data.get('status', {}).get('madeForKids'),
                keywords_str
            )

            self.logger.info(f"Inserting channel data for channel ID: {channel_data.get('id')}")
            cursor.execute(insert_sql, record_to_insert)
            self.mysql_conn.commit()
            inserted_rows_count = cursor.rowcount
            self.logger.info(f"Channel data insertion complete. New records processed: {inserted_rows_count} (inserted or ignored duplicates).")
            return True
        except mysql.connector.Error as err:
            self.logger.error(f"Error inserting channel data: {err}")
            self._rollback_quietly() # Rollback if an error occurs
            return False
        except Exception as e:
            self.logger.error(f"An unexpected error occurred during channel data insertion: {e}")
            self._rollback_quietly()
            return False
        finally:
            self._close_cursor(cursor)

    def close_connection(self):
        """Close the MySQL connection and stop the SSH tunnel; safe to call repeatedly.

        Both references are always cleared, and a failure while closing one resource
        is logged instead of raised so the other resource is still released.
        """
        if self.mysql_conn is not None:
            try:
                if self.mysql_conn.is_connected():
                    self.mysql_conn.close()
                    self.logger.info("MySQL connection closed.")
            except Exception as e:
                self.logger.warning(f"Error while closing MySQL connection: {e}")
            finally:
                self.mysql_conn = None

        if self.ssh_tunnel is not None:
            try:
                if self.ssh_tunnel.is_active:
                    self.ssh_tunnel.stop()
                    self.logger.info("SSH tunnel stopped.")
            except Exception as e:
                self.logger.warning(f"Error while stopping SSH tunnel: {e}")
            finally:
                self.ssh_tunnel = None

# Example usage: connect, ensure the table exists, store the fetched channel, disconnect.
if __name__ == '__main__':
    # establish_connection() is wrapped by tenacity's @retry without reraise=True, so once
    # its attempts are used up it raises RetryError instead of returning False.
    from tenacity import RetryError

    db_connector = DBConnector()
    try:
        try:
            connected = db_connector.establish_connection()
        except RetryError as e:
            db_connector.logger.error(f"Connection attempts exhausted: {e}")
            connected = False

        if connected:
            db_connector.create_channels_table()

            # Assuming 'channel_data' is available from previous steps
            if 'channel_data' in globals() and channel_data:
                db_connector.insert_channel_data(channel_data)
            else:
                db_connector.logger.info("No channel data available to insert.")
        else:
            db_connector.logger.error("Failed to connect to database.")
    finally:
        # Always release the MySQL connection and SSH tunnel, even if a step above raised.
        db_connector.close_connection()


2025-12-06 21:29:46,452 - db_connector - INFO - Attempting to establish SSH tunnel...
INFO:db_connector:Attempting to establish SSH tunnel...
2025-12-06 21:29:47,177 - db_connector - INFO - SSH tunnel established on local port 3307
INFO:db_connector:SSH tunnel established on local port 3307
2025-12-06 21:29:47,181 - db_connector - INFO - Attempting to connect to MySQL database...
INFO:db_connector:Attempting to connect to MySQL database...
2025-12-06 21:29:48,205 - db_connector - INFO - Successfully connected to MySQL database.
INFO:db_connector:Successfully connected to MySQL database.
2025-12-06 21:29:48,256 - db_connector - INFO - Executing CREATE TABLE statement for youtube_channels...
INFO:db_connector:Executing CREATE TABLE statement for youtube_channels...
2025-12-06 21:29:48,379 - db_connector - INFO - Table 'youtube_channels' ensured.
INFO:db_connector:Table 'youtube_channels' ensured.
2025-12-06 21:29:48,431 - db_connector - INFO - Inserting channel data for channel ID: UCnwx