In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
# Session object used for all Snowflake access in this notebook; it is passed to main() in the last cell.
session = get_active_session()

In [None]:
import re
from typing import Optional

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, lit, to_timestamp_ltz

class BronzeIngestor:
    """Incrementally load crawler ``.json.gz`` files from a stage into a bronze table.

    Files are discovered with ``LIST``. The ingestion timestamp, platform and
    author are taken from each file's path and added as columns. The newest
    ingestion timestamp that was loaded is stored in a metadata table (one row
    per bronze table) so later runs skip folders that were already processed.
    """

    # Expected layout of the names returned by LIST:
    #   .../user_post/{ingestion_ts}/{platform}/{author}/{file}.json.gz
    # Compiled once here instead of on every loop iteration.
    _PATH_PATTERN = re.compile(
        r"backend_dev_de_2025_stage_paulhappy/datalake/bronze/crawler/metadata/user_post/([^/]+)/([^/]+)/([^/]+)/[^/]+\.json\.gz"
    )

    def __init__(
        self,
        session: "snowpark.Session",
        stage_path: str,             # e.g., @DB.SCHEMA.STAGE/datalake/bronze/crawler/metadata/user_post
        bronze_table: str,
        meta_table: str = "PROCESSING_DATE",
        database: str = "TECHTEST",
        schema: str = "BACKEND_DEV_DE_2025_PAULHAPPY",
    ):
        """Select the working database/schema and make sure the metadata table exists.

        Args:
            session: Snowpark session used for every statement.
            stage_path: Stage location to list; a trailing ``/`` is removed.
            bronze_table: Target table; also the key of the watermark row.
            meta_table: Table holding one ``(source, last_processed)`` row per source.
            database: Database made current with ``USE DATABASE``.
            schema: Schema made current with ``USE SCHEMA``.
        """
        self.session = session
        self.stage = stage_path.rstrip("/")   # Ensure no trailing slash
        self.bronze = bronze_table
        self.meta = meta_table

        # Unqualified table names used below resolve against this database/schema.
        self.session.sql(f"USE DATABASE {database}").collect()
        self.session.sql(f"USE SCHEMA {schema}").collect()

        # Create processing metadata table if not exists
        self.session.sql(f"""
        CREATE TABLE IF NOT EXISTS {self.meta} (
            source STRING PRIMARY KEY,
            last_processed TIMESTAMP_LTZ
        )
        """).collect()

    @staticmethod
    def _normalize_ingestion_ts(raw: str) -> str:
        """Turn a folder name such as ``2025-01-15T10-30-00`` into ``2025-01-15 10:30:00``.

        Only the part after ``T`` is rewritten, so dashes inside the date are
        kept. Names without dashes before the ``T`` (e.g. ``20250115T10-30-00``)
        give the same result as the previous whole-string replacement.
        """
        date_part, sep, time_part = raw.partition("T")
        if not sep:
            # No date/time separator: keep the historical whole-string behaviour.
            return raw.replace("-", ":", 2).replace("-", "", 1)
        # First two dashes separate hours/minutes/seconds; one further dash is dropped.
        time_part = time_part.replace("-", ":", 2).replace("-", "", 1)
        return f"{date_part} {time_part}"

    def get_last_processed(self) -> Optional[str]:
        """Return the stored watermark for this bronze table, or ``None`` if no row exists.

        NOTE(review): the column is TIMESTAMP_LTZ, so the collected value is
        probably a ``datetime`` rather than a ``str`` -- confirm. ``ingest``
        converts it with ``str()`` before comparing.
        """
        rows = self.session.table(self.meta).filter(col("source") == self.bronze).collect()
        return rows[0]["LAST_PROCESSED"] if rows else None

    def update_last_processed(self, ts: str):
        """Upsert ``ts`` as the watermark row for this bronze table.

        ``ts`` and the table name are interpolated into the statement, so they
        must come from trusted configuration, not from user input.
        """
        self.session.sql(f"""
        MERGE INTO {self.meta} AS m
        USING (SELECT '{ts}'::TIMESTAMP_LTZ AS last_processed) AS v
        ON m.source = '{self.bronze}'
        WHEN MATCHED THEN UPDATE SET last_processed = v.last_processed
        WHEN NOT MATCHED THEN INSERT (source, last_processed)
        VALUES ('{self.bronze}', v.last_processed)
        """).collect()

    def ingest(self):
        """Append every not-yet-processed staged file to the bronze table.

        Returns ``None``. Prints a short progress message and, when something
        was loaded, advances the watermark to the newest ingestion timestamp.
        """
        last_ts = self.get_last_processed()
        last_ts_text = str(last_ts) if last_ts else None

        # Step 1: List all JSON.gz files
        listing = self.session.sql(f"LIST {self.stage}").collect()
        json_files = [row["name"] for row in listing if row["name"].endswith(".json.gz")]
        print(f"Found {len(json_files)} .json.gz file(s) under {self.stage}")

        all_dfs = []
        loaded_ts = []

        for file_path in json_files:
            match = self._PATH_PATTERN.search(file_path)
            if not match:
                continue

            ingestion_ts_raw, platform, author = match.groups()
            ingestion_ts = self._normalize_ingestion_ts(ingestion_ts_raw)

            # Plain string comparison against the stored watermark; folders at or
            # before the watermark were handled by an earlier run.
            if last_ts_text and ingestion_ts <= last_ts_text:
                continue

            # The listed name is turned into a stage reference by prefixing "@".
            df = self.session.read.option("compression", "gzip").json(f"@{file_path}")
            df = df.with_column("ingestion_date_timestamp", lit(ingestion_ts))
            df = df.with_column("platform", lit(platform))
            df = df.with_column("author", lit(author))
            all_dfs.append(df)
            loaded_ts.append(ingestion_ts)

        if not all_dfs:
            print("No new files to ingest.")
            return

        # Combine all into one DataFrame (union() also removes duplicate rows).
        full_df = all_dfs[0]
        for df in all_dfs[1:]:
            full_df = full_df.union(df)

        result_df = full_df.select(
            col("author_id"),
            col("text"),
            col("timestamp"),
            col("likes"),
            col("reposts"),
            col("comments"),
            col("media_urls"),
            col("media_local_paths"),
            col("ingestion_date_timestamp"),
            col("platform"),
            col("author")  # from path
        )

        # Ensure bronze table exists
        self.session.sql(f"""
        CREATE TABLE IF NOT EXISTS {self.bronze} (
            author_id STRING,
            text STRING,
            timestamp STRING,
            likes NUMBER,
            reposts NUMBER,
            comments NUMBER,
            media_urls ARRAY,
            media_local_paths ARRAY,
            ingestion_date_timestamp STRING,
            platform STRING,
            author STRING
        )
        """).collect()

        result_df.write.mode("append").save_as_table(self.bronze)

        # The watermark is the newest folder timestamp that was loaded. Taking it
        # from the path values avoids re-reading every staged file for an
        # aggregate and cannot yield None when the files contain no rows.
        max_ts = max(loaded_ts)
        self.update_last_processed(max_ts)

        print(f"Ingested up to {max_ts}")


In [None]:
class SilverTransformer:
    """Build the typed, de-duplicated silver table from the bronze table."""

    def __init__(self, session: "snowpark.Session", bronze_table: str, silver_table: str):
        """Remember the session and the source/target table names.

        Args:
            session: Snowpark session used to read and write tables.
            bronze_table: Table to read from.
            silver_table: Table that is replaced on every run.
        """
        self.session = session
        self.bronze = bronze_table
        self.silver = silver_table

    def run(self):
        """Recreate the silver table from the current contents of the bronze table.

        Both timestamp columns are converted to TIMESTAMP_LTZ and exact
        duplicate rows are removed. The ``author`` column of the bronze table
        is not carried over.
        """
        bronze_df = self.session.table(self.bronze)
        clean = bronze_df.select(
            col("author_id"),
            col("text"),
            # convert ISO string -> TIMESTAMP_LTZ
            to_timestamp_ltz(
                col("timestamp"), lit("YYYY-MM-DD\"T\"HH24:MI:SS")
            ).as_("timestamp"),
            col("likes"),
            col("reposts"),
            col("comments"),
            col("media_urls"),
            col("media_local_paths"),
            # The bronze loader replaces the "T" separator with a space, so a
            # format containing a literal "T" can never match this column.
            # Automatic format detection is used instead, as in the loader's
            # '...'::TIMESTAMP_LTZ cast of the same value.
            to_timestamp_ltz(col("ingestion_date_timestamp"))
            .as_("ingestion_date_timestamp"),
            col("platform"),
        ).drop_duplicates()

        # "overwrite" replaces the table (and its schema) with the result, so no
        # separate CREATE TABLE is needed beforehand.
        clean.write.mode("overwrite").save_as_table(self.silver)
        print(f"Silver table {self.silver} updated.")

In [None]:
class GoldAggregator:
    """Build the gold table with each author's best post overall and per week."""

    def __init__(
        self,
        session: "snowpark.Session",
        silver_table: str,
        gold_table: str = "gold_crawler_data",
    ):
        """Remember the session and the source/target table names.

        Args:
            session: Snowpark session used to run the statement.
            silver_table: Table to aggregate from.
            gold_table: Table that is created or replaced by ``build``.
        """
        self.session = session
        self.silver = silver_table
        self.gold = gold_table

    def build(self):
        """Create or replace the gold table from the silver table.

        The result has one row per author and week: the author's all-time top
        post (by likes + reposts + comments) next to that week's top post.
        """
        self.session.sql(f"""
            CREATE OR REPLACE TABLE {self.gold} AS
                WITH base AS (
                  SELECT
                    author_id,
                    text,
                    timestamp,
                    (likes + reposts + comments) AS interactions,
                    DATE_TRUNC('WEEK', timestamp) AS week_start
                  FROM {self.silver}
                ),
                author_top AS (
                  -- top post per author (all-time)
                  SELECT author_id,
                         text AS top_post_text,
                         interactions AS top_post_interactions
                  FROM (
                    SELECT author_id, text, interactions,
                           ROW_NUMBER() OVER (
                             PARTITION BY author_id
                             -- DESC sorts NULLs first by default; keep posts
                             -- with unknown counts from ranking as the top post
                             ORDER BY interactions DESC NULLS LAST
                           ) AS rn
                    FROM base
                  )
                  WHERE rn = 1
                ),
                weekly_top AS (
                  -- top post per author per week
                  SELECT author_id,
                         week_start,
                         text  AS weekly_top_post_text,
                         interactions AS weekly_top_post_interactions
                  FROM (
                    SELECT author_id,
                           week_start,
                           text,
                           interactions,
                           ROW_NUMBER() OVER (
                             PARTITION BY author_id, week_start
                             ORDER BY interactions DESC NULLS LAST
                           ) AS rn
                    FROM base
                  )
                  WHERE rn = 1
                )
                SELECT
                  a.author_id,
                  w.week_start,
                  a.top_post_text,
                  a.top_post_interactions,
                  w.weekly_top_post_text,
                  w.weekly_top_post_interactions
                FROM author_top a
                LEFT JOIN weekly_top w ON w.author_id = a.author_id
            """).collect()


In [None]:
def main(session: snowpark.Session):
    """Run the bronze, silver and gold stages in order and return a status string.

    The table, stage and metadata-table names below are fixed for this
    notebook; change them here to point the pipeline at other objects.
    """
    bronze_name = "bronze_crawler_data"
    silver_name = "silver_crawler_data"
    stage_location = '@"TECHTEST"."BACKEND_DEV_DE_2025_PAULHAPPY"."BACKEND_DEV_DE_2025_STAGE_PAULHAPPY"/datalake/bronze/crawler/metadata/user_post'
    watermark_table = "PROCESSING_DATE"

    # Stage 1: load new staged files into the bronze table.
    BronzeIngestor(
        session,
        stage_path=stage_location,
        bronze_table=bronze_name,
        meta_table=watermark_table,
    ).ingest()

    # Stage 2: rebuild the silver table from bronze.
    SilverTransformer(
        session,
        bronze_table=bronze_name,
        silver_table=silver_name,
    ).run()

    # Stage 3: rebuild the gold aggregate from silver.
    GoldAggregator(session, silver_table=silver_name).build()

    return "Pipeline complete!"

In [None]:
-- List the files under the user_post prefix of the stage (the same location main() passes as the stage path).
LIST '@"TECHTEST"."BACKEND_DEV_DE_2025_PAULHAPPY"."BACKEND_DEV_DE_2025_STAGE_PAULHAPPY"/datalake/bronze/crawler/metadata/user_post'

In [None]:
# Run the bronze -> silver -> gold pipeline with the active session.
main(session)