# In [None]:
import sys
import boto3
import pandas as pd
import io
import re
from datetime import datetime
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# Job parameters resolved from the command-line arguments passed to the Glue job.
# @params: [JOB_NAME]
_JOB_PARAMS = ["JOB_NAME"]
args = getResolvedOptions(sys.argv, _JOB_PARAMS)


def get_glue_spark_session(catalog_nm: str, s3_bucket: str):
    """Create a SparkSession with an Iceberg catalog backed by the AWS Glue Data Catalog.

    Args:
        catalog_nm: Name the Iceberg catalog is registered under; tables are then
            addressed as ``<catalog_nm>.<db>.<table>``.
        s3_bucket: Iceberg warehouse location (an S3 path such as ``s3://bucket/prefix``).

    Returns:
        A ``(spark, glue_context)`` tuple. The GlueContext wraps the session's
        SparkContext.
    """
    # Applied to the builder in this order before the session is created.
    iceberg_conf = [
        (f"spark.sql.catalog.{catalog_nm}", "org.apache.iceberg.spark.SparkCatalog"),
        (f"spark.sql.catalog.{catalog_nm}.warehouse", s3_bucket),
        (f"spark.sql.catalog.{catalog_nm}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"),
        (f"spark.sql.catalog.{catalog_nm}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
    ]

    builder = SparkSession.builder
    for conf_key, conf_value in iceberg_conf:
        builder = builder.config(conf_key, conf_value)
    spark = builder.getOrCreate()

    glue_context = GlueContext(spark.sparkContext)
    return spark, glue_context

def create_metadata_df(all_xlsx_files: list) -> pd.DataFrame:
    """Build one metadata row per S3 ``.xlsx`` object.

    Args:
        all_xlsx_files: Dicts with at least ``"Key"`` (the S3 object key) and
            ``"LastModified"`` (as returned by ``list_objects_v2``, normally a
            ``datetime``).

    Returns:
        A DataFrame with these columns:

        - ``File_Name``: the key's base name with a trailing ``.xlsx`` removed.
        - ``created_date``: the first ``YYYY-MM-DD`` substring found anywhere in
          the key, as a string, or ``None``.
        - ``LastModified``: ``LastModified.date()``, or ``None`` when the value
          has no ``date()`` method.

        The columns are present even when *all_xlsx_files* is empty.
    """
    columns = ["File_Name", "created_date", "LastModified"]
    date_pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    suffix = ".xlsx"
    processed = []

    for item in all_xlsx_files:
        key = item["Key"]
        last_modified = item["LastModified"]

        # 1. File name: drop the path and strip only a *trailing* ".xlsx"
        #    (str.replace would also delete ".xlsx" occurring mid-name).
        file_name = key.split("/")[-1]
        if file_name.endswith(suffix):
            file_name = file_name[: -len(suffix)]

        # 2. created_date: first YYYY-MM-DD pattern in the path or file name.
        match = date_pattern.search(key)
        created_date = match.group(0) if match else None

        # 3. Reduce LastModified to a date (only if it exposes .date()).
        last_modified_date = last_modified.date() if hasattr(last_modified, "date") else None

        processed.append({
            "File_Name": file_name,
            "created_date": created_date,
            "LastModified": last_modified_date,
        })

    # Passing `columns` keeps the schema stable for an empty listing.
    return pd.DataFrame(processed, columns=columns)

def read_checkpoint_table(spark, catalog_nm, table_name):
    """Return the Spark DataFrame for table ``<catalog_nm>.<table_name>``.

    *table_name* may itself be ``<db>.<table>``. The result is a lazy Spark
    DataFrame: rows are fetched only when an action (e.g. ``toPandas``) runs.
    """
    qualified_name = f"{catalog_nm}.{table_name}"
    return spark.table(qualified_name)

def _list_xlsx_objects(s3, bucket: str) -> list:
    """Return ``{"Key", "LastModified"}`` dicts for every ``.xlsx`` object in *bucket*.

    Uses the ``list_objects_v2`` paginator so every page of the listing is
    scanned, not just the first one.
    """
    xlsx_files = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".xlsx"):
                xlsx_files.append({"Key": obj["Key"], "LastModified": obj["LastModified"]})
    return xlsx_files


def _read_workbook(data: bytes, xlsx_key: str, col_mapping: dict):
    """Parse one workbook and return its known columns renamed, or ``None``.

    Every sheet is read. Only columns that are keys of *col_mapping* are kept,
    and they are then renamed to the mapped names.

    Returns ``None`` when the workbook cannot be parsed or when no sheet
    contains any known column.
    """
    try:
        excel_file = pd.read_excel(io.BytesIO(data), sheet_name=None, engine="openpyxl")
    except Exception as e:
        # Skip just this workbook. Falling through here would reuse an undefined
        # or stale `excel_file` from the previous file.
        print(f"  Failed to read {xlsx_key}: {e}")
        return None
    print(f"  Success: {list(excel_file.keys())}")  # print the sheet names

    df_list = []
    for sheet_df in excel_file.values():
        cols = [col for col in sheet_df.columns if col in col_mapping]
        if cols:
            df_list.append(sheet_df[cols])

    if not df_list:
        return None
    return pd.concat(df_list, ignore_index=True).rename(columns=col_mapping)


def read_all_excels_from_bucket(input_bucket: str, df_checkpoint) -> pd.DataFrame:
    """Read every ``.xlsx`` file in *input_bucket* and stack their relevant columns.

    Args:
        input_bucket: Name of the S3 bucket to scan (the whole bucket, no prefix).
        df_checkpoint: Spark DataFrame of the checkpoint table. It is collected
            to pandas, but is not used for filtering yet (see TODO below).

    Returns:
        One pandas DataFrame whose columns are renamed per ``col_mapping``
        (``Date``, ``Market``, ``Platform``, ...). Returns an empty DataFrame
        when no file yielded any rows.
    """
    s3 = boto3.client("s3")

    # Columns kept from each sheet (keys) and the output name for each (values).
    col_mapping = {
        "today_date": "Date",
        "country": "Market",
        "platform": "Platform",
        "title": "Title",
        "rank": "Ranking",
        "media_id": "Media ID",
    }

    all_xlsx_files = _list_xlsx_objects(s3, input_bucket)

    df_metadata = create_metadata_df(all_xlsx_files)
    df_checkpoint = df_checkpoint.toPandas()
    print(df_metadata.dtypes)
    print(df_checkpoint.dtypes)

    # TODO(incremental load): use the checkpoint to skip files already processed.
    # A draft converted df_metadata["created_date"] to dates, then merged
    # df_metadata (File_Name, created_date, LastModified) with the checkpoint
    # (file_name, created_date, modified_date) using how="right".
    # NOTE(review): a right join keeps only checkpointed files. Selecting *new*
    # files presumably needs how="left", indicator=True, keeping the rows whose
    # _merge == "left_only". Confirm the intent before wiring this in.

    all_dataframes = []
    for item in all_xlsx_files:
        xlsx_key = item["Key"]
        data = s3.get_object(Bucket=input_bucket, Key=xlsx_key)["Body"].read()
        workbook_df = _read_workbook(data, xlsx_key, col_mapping)
        if workbook_df is not None:
            all_dataframes.append(workbook_df)

    if not all_dataframes:
        return pd.DataFrame()
    return pd.concat(all_dataframes, ignore_index=True)


def main():
    """Glue job entry point: combine every Excel file in the raw bucket into one CSV.

    Steps:

    1. Reads the checkpoint table.
    2. Consolidates all ``.xlsx`` files from ``input_bucket``.
    3. Uploads the result to
       ``s3://<output_bucket>/<output_prefix>/combined_<YYYYmmdd_HHMMSS>.csv``.

    The Glue job is committed whether or not any rows were produced.
    """
    catalog_nm = "glue_catalog"
    # Iceberg warehouse location: an S3 path, not just a bucket name.
    s3_bucket = "s3://ampd-aldous-prod-datalake/curated"

    input_bucket = "top-titles-raw-data"
    output_bucket = "ampd-s3-auto-deletion"
    output_prefix = "testing_excel_combination/"

    spark, glue_context = get_glue_spark_session(catalog_nm, s3_bucket)

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Read the checkpoint table tmp.metadata_bucket_ttrd.
    table_name = "tmp.metadata_bucket_ttrd"
    df_checkpoint = read_checkpoint_table(spark, catalog_nm, table_name)

    # Consolidate all .xlsx files into a single pandas DataFrame.
    final_df = read_all_excels_from_bucket(input_bucket, df_checkpoint)

    if final_df.empty:
        print("No rows extracted from any .xlsx file; nothing to upload.")
    else:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_key = f"{output_prefix.rstrip('/')}/combined_{timestamp}.csv"

        # Serialize to CSV text in memory, then upload it as UTF-8 bytes.
        csv_buffer = io.StringIO()
        final_df.to_csv(csv_buffer, index=False)

        s3 = boto3.client("s3")
        s3.put_object(
            Bucket=output_bucket,
            Key=output_key,
            Body=csv_buffer.getvalue().encode("utf-8"),
        )

    # Commit on every path. Previously the empty-result branch returned before
    # this call, leaving the run un-finalized (and bookmarks, if enabled, unsaved).
    job.commit()


if __name__ == "__main__":
    # Run the job only when this file is executed as a script, not when imported.
    main()
