# In [0]:
import sys
sys.path.append('/Workspace/Users/laugur1508@gmail.com/twitch_data_pipeline/notebooks')
import dlt
import requests
import pandas as pd
from pyspark.sql.functions import current_timestamp, to_timestamp, concat, col, lit
import logging
import json
from utils.auth import get_access_token, twitch_api_request
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType

# Request INFO-level logging on the root logger.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the functions defined in this file.
logger = logging.getLogger(__name__)


# Schema used by bronze_streams when it has no rows to return.
# Every field is declared nullable.
streams_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", StringType()),
        ("user_id", StringType()),
        ("user_login", StringType()),
        ("user_name", StringType()),
        ("game_id", StringType()),
        ("game_name", StringType()),
        ("type", StringType()),
        ("title", StringType()),
        ("viewer_count", IntegerType()),
        # Declared as a string to keep the original format; it can be cast later.
        ("started_at", StringType()),
        ("language", StringType()),
        ("thumbnail_url", StringType()),
        ("tag_ids", StringType()),
        ("tags", StringType()),
        ("is_mature", BooleanType()),
        ("ingestion_time", TimestampType()),
        ("ingestion_date", StringType()),
    )
])


def get_twitch_streams_fixed(streamers, client_id, access_token):
    """Collect stream entries for the given logins, ten logins per request.

    Each batch is sent to the Helix streams endpoint through
    ``twitch_api_request`` with one ``user_login`` query parameter per
    streamer. A batch that raises is logged and skipped, so one failing
    request does not discard the results of the others.

    Args:
        streamers: Sequence of Twitch login names.
        client_id: Client id forwarded to ``twitch_api_request``.
        access_token: Access token forwarded to ``twitch_api_request``.

    Returns:
        A list with the ``data`` items of every batch that returned any
        (empty when ``streamers`` is empty or nothing was returned).
    """
    endpoint = "https://api.twitch.tv/helix/streams"
    chunk_size = 10
    collected = []

    batch_starts = range(0, len(streamers), chunk_size)
    for batch_no, start in enumerate(batch_starts, start=1):
        logins = streamers[start:start + chunk_size]
        query = [("user_login", login) for login in logins]

        try:
            # presumably the helper returns the decoded JSON body as a dict;
            # anything without .get() ends up in the except branch below.
            payload = twitch_api_request(endpoint, client_id, access_token, query)
            live = payload.get('data', [])
            if not live:
                logger.warning(f"Batch {batch_no}: No streams or error occurred")
                continue
            logger.info(f"Batch {batch_no}: Retrieved {len(live)} active streams")
            collected.extend(live)
        except Exception as e:
            # Best effort: report the failed batch and move on to the next one.
            logger.error(f"Batch {batch_no} failed: {e}")

    return collected

def clean_and_standardize_data(streams_data):
    """Normalize raw stream records into a consistently typed pandas DataFrame.

    List-valued ``tag_ids``/``tags`` are flattened to comma-separated strings,
    every expected column is created when absent, and each expected column is
    coerced to its target type. Missing values become ``''`` for string
    columns, ``0`` for integer columns and ``False`` for boolean columns.
    Columns outside the expected set are passed through untouched.

    Args:
        streams_data: List of per-stream dicts; may be empty or None.

    Returns:
        The cleaned pandas DataFrame, or None when ``streams_data`` is empty.
    """
    if not streams_data:
        return None

    pdf = pd.DataFrame(streams_data)

    # Convert list fields to comma-separated strings. Items are passed through
    # str() so a non-string element cannot make join() raise.
    for col_name in ('tag_ids', 'tags'):
        if col_name in pdf.columns:
            pdf[col_name] = pdf[col_name].apply(
                lambda x: ','.join(map(str, x)) if isinstance(x, list) and x else ''
            )
        else:
            pdf[col_name] = ''

    expected_columns = {
        'id': str, 'user_id': str, 'user_login': str, 'user_name': str,
        'game_id': str, 'game_name': str, 'type': str, 'title': str,
        'viewer_count': int, 'started_at': str, 'language': str,
        'thumbnail_url': str, 'tag_ids': str, 'tags': str, 'is_mature': bool
    }
    # Value used both for absent columns and for null cells of each type.
    defaults = {str: '', int: 0, bool: False}

    for col_name, col_type in expected_columns.items():
        default = defaults[col_type]
        if col_name not in pdf.columns:
            pdf[col_name] = default

        try:
            series = pdf[col_name]
            if col_type is int:
                pdf[col_name] = (
                    pd.to_numeric(series, errors='coerce').fillna(0).astype(int)
                )
            else:
                # Nulls must be replaced BEFORE the cast: astype(str) would
                # turn None/NaN into the literal text 'None'/'nan', and
                # astype(bool) would turn NaN into True.
                filled = series.astype(object).where(series.notna(), default)
                pdf[col_name] = filled.astype(col_type)
        except Exception as e:
            # Best effort: keep the column as it is and report the failure.
            logger.warning(f"Could not convert column {col_name} to {col_type}: {e}")

    return pdf


def _with_bronze_columns(df):
    """Apply the column transformations shared by every bronze_streams result.

    Casts ``viewer_count`` to long and ``started_at`` to a timestamp, stamps
    the ingestion time and date, and derives ``record_id`` as
    ``<user_id>_<started_at>``.
    """
    return (
        # pandas integers arrive as long; casting here makes the empty frame
        # (declared as int in streams_schema) agree with the populated one.
        df.withColumn("viewer_count", col("viewer_count").cast("long"))
        .withColumn("started_at", to_timestamp("started_at"))
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("ingestion_date", current_timestamp().cast("date").cast("string"))
        # Built from user_id and the stream start only, so snapshots of the
        # same broadcast taken at different ingestion times share this value.
        .withColumn(
            "record_id",
            concat(col("user_id"), lit("_"), col("started_at").cast("string")),
        )
    )


def _empty_bronze_frame():
    """Return a zero-row DataFrame with the same columns as a populated result."""
    return _with_bronze_columns(spark.createDataFrame([], streams_schema))


@dlt.table(
    name="bronze_streams",
    comment="Bronze layer: Raw Twitch streams data with append-only mode",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "pipelines.autoOptimize.managed": "true"
    },
    table_type="live"
)
def bronze_streams():
    """
    Bronze layer table holding a snapshot of the configured streamers' live streams.

    Reads the Twitch credentials from the ``my-secret`` secret scope, loads the
    streamer logins from the JSON config on DBFS, fetches their current streams,
    cleans them with ``clean_and_standardize_data`` and adds ``ingestion_time``,
    ``ingestion_date`` and ``record_id``.

    Every return path goes through the same column transformation, so the
    result has the same column names and types whether or not any stream
    was live.

    NOTE(review): the table comment advertises append-only behaviour, but this
    function returns a batch DataFrame; confirm against the DLT documentation
    whether such a table is appended to or recomputed on each update.

    Returns:
        A Spark DataFrame with one row per live stream (possibly zero rows).
    """
    # Retrieve API credentials
    client_id = dbutils.secrets.get(scope="my-secret", key="CLIENT-ID")
    client_secret = dbutils.secrets.get(scope="my-secret", key="CLIENT-SECRET")
    access_token = get_access_token(client_id, client_secret)

    if not access_token:
        logger.error("Failed to get Twitch access token")
        return _empty_bronze_frame()

    # Load streamer configuration
    config_path = "dbfs:/FileStore/config/top50FrenchStreamer.json"
    config = json.loads(dbutils.fs.head(config_path))
    top50FrenchStreamers = config["top50FrenchStreamers"]

    logger.info(f"Starting Twitch streams ETL for {len(top50FrenchStreamers)} streamers.")

    # Fetch streams data from Twitch API
    streams = get_twitch_streams_fixed(top50FrenchStreamers, client_id, access_token)

    if not streams:
        logger.info("No active streams found.")
        return _empty_bronze_frame()

    # Clean and standardize the data
    cleaned_pdf = clean_and_standardize_data(streams)
    if cleaned_pdf is None or cleaned_pdf.empty:
        logger.warning("No data to process after cleaning.")
        return _empty_bronze_frame()

    # Convert to Spark DataFrame and add the timestamp/metadata columns
    spark_df = _with_bronze_columns(spark.createDataFrame(cleaned_pdf))

    # Row count comes from the pandas frame: same number, no extra Spark job.
    logger.info(f"Successfully processed {len(cleaned_pdf)} stream records")
    return spark_df
