In [None]:
import logging
from config import IG_REEL_SCRAPING_CONFIG, COLUMN_MAPPER
from apify_service import ApifyService
from pyspark.sql import SparkSession

# Get (or create) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.getOrCreate()

# Logging setup: INFO level, each record carries timestamp, logger name and level.
_log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_log_format, level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Optional job parameter "dataset_id". When it is left empty (the default), the run
# scrapes a new dataset. When it is provided, the pipeline works on the dataset of
# that past scrape job instead of scraping again.
#
# The widget is declared with an empty default before it is read, because
# dbutils.widgets.get raises when the widget is neither defined in the notebook nor
# supplied as a job parameter.
dbutils.widgets.text("dataset_id", "")
# Strip so that a whitespace-only value counts as "not provided".
dataset_id_param = dbutils.widgets.get("dataset_id").strip()

if dataset_id_param:
    logger.info(f"Using manually provided dataset_id: {dataset_id_param}")
    dataset_id = dataset_id_param
else:
    logger.info("No dataset_id provided. Using one generated by ApifyService.")
    dataset_id = None

In [None]:
# Step 1: Scrape Reels data
# Value passed to ApifyService.get_config_df as wait_secs (60 minutes in seconds).
SCRAPE_WAIT_SECS = 60 * 60

apify_service = ApifyService()
if dataset_id:
    # A dataset id was supplied: load that dataset instead of running a new scrape.
    scraped_df = apify_service.get_dataset_id_df(dataset_id)
else:
    scraped_df = apify_service.get_config_df(IG_REEL_SCRAPING_CONFIG, wait_secs=SCRAPE_WAIT_SECS)
    # Fail with a clear message when the scrape produced no rows, rather than an
    # opaque KeyError from the row lookup below.
    if scraped_df.empty:
        raise ValueError("Scrape returned no rows; cannot determine dataset_id.")
    # Positional access: take the first row regardless of how the index is labelled.
    dataset_id = scraped_df["dataset_id"].iloc[0]

# Step 2: Clean and rename columns
# Keep only the columns listed in COLUMN_MAPPER, then rename them with that mapping
# so they work with downstream transformation functions.
try:
    logger.info("Dropping unnecessary columns...")
    keep_columns_list = list(COLUMN_MAPPER.keys())
    # Name the missing columns explicitly so a schema change is easy to diagnose.
    missing_columns = [col for col in keep_columns_list if col not in scraped_df.columns]
    if missing_columns:
        raise KeyError(f"Scraped dataframe is missing expected columns: {missing_columns}")
    cleaned_df = scraped_df[keep_columns_list]
    num_dropped_cols = scraped_df.shape[1] - len(keep_columns_list)
    logger.info(f"{num_dropped_cols} columns dropped.")
    logger.info(f"{cleaned_df.shape[1]} columns kept.")
except Exception as e:
    logger.error(f"Failed to drop columns from scraped dataframe due to exception: {e}")
    raise

# Every key of COLUMN_MAPPER is present at this point, so the rename covers all columns.
logger.info("Mapping remaining columns...")
cleaned_df = cleaned_df.rename(columns=COLUMN_MAPPER)
logger.info("Remaining columns mapped.")

In [None]:
# Turn the cleaned pandas frame into a Spark DataFrame and append it to the
# bronze Delta table.
bronze_path = "workspace.test.nus_bronze_instagram_ingested"
spark_df = spark.createDataFrame(cleaned_df)
delta_writer = spark_df.write.format("delta").mode("append")
delta_writer.saveAsTable(bronze_path)

# Publish the dataset id and the table name as task values so the next task can use them.
for task_key, task_value in (("dataset_id", dataset_id), ("bronze_path", bronze_path)):
    dbutils.jobs.taskValues.set(key=task_key, value=task_value)

logger.info(f"Bronze table written to {bronze_path} for dataset_id={dataset_id}")