In [1]:
import json
from pathlib import Path
import csv
import os

In [2]:
# Root directory that holds one sub-directory per client hub; each hub is
# expected to contain a "schedules" folder of JSON schedule files.
# Set the OPS_DATA_HUBS_DIR environment variable to point at a different
# checkout; the historical local path is kept as the default.
HUBS = Path(
    os.environ.get(
        "OPS_DATA_HUBS_DIR",
        "/Users/ngoc.tran/lexer/ops-data/workflows/hubs",
    )
)
# Output directory for generated CSV files (relative to the working directory).
CSV_DIR = "csv"
# File name of the combined, all-clients schedule export inside CSV_DIR.
SCHEDULES_COLLECTION_FILE = "ALL_CLIENTS_schedules.csv"

In [3]:
def read_schedule(schedule_path: Path):
    """Load and return the JSON document stored at ``schedule_path``.

    Args:
        schedule_path: Path of the JSON schedule file to read.

    Returns:
        The decoded JSON value (a ``dict`` for a JSON object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, which would corrupt non-ASCII content on some systems.
    with schedule_path.open("r", encoding="utf-8") as f:
        return json.load(f)

def save_schedule(scheule_path: Path, schedule: dict):
    """Write ``schedule`` to ``scheule_path`` as 4-space-indented JSON.

    The document is followed by a single trailing newline.

    Args:
        scheule_path: Destination file (parameter name kept as-is, including
            its spelling, so existing keyword callers keep working).
        schedule: JSON-serializable schedule definition.

    Returns:
        The number of characters written, as reported by ``file.write``.

    Raises:
        TypeError: If ``schedule`` contains a value that is not
            JSON-serializable. The destination file is left untouched.
        OSError: If the file cannot be opened for writing.
    """
    # Serialize BEFORE opening the file: opening in "w" mode truncates it, so
    # a serialization failure would otherwise destroy the existing schedule.
    payload = json.dumps(schedule, indent=4) + "\n"
    with scheule_path.open("w", encoding="utf-8") as f:
        return f.write(payload)

def write_dicts_to_csv(data, directory, file_name):
    """Write a sequence of dictionaries to ``directory/file_name`` as CSV.

    The header row is the union of the keys of all rows, in first-seen order.
    Rows that lack a column get an empty cell for it. ``directory`` is created
    if it does not exist, and an existing file is overwritten.

    Args:
        data: Non-empty sequence of dicts, one per CSV row.
        directory: Directory in which to place the file.
        file_name: Name of the CSV file inside ``directory``.

    Raises:
        ValueError: If ``data`` is empty.
        OSError: If the directory or file cannot be created.
    """
    if not data:
        raise ValueError("Schedule should not be empty")

    # Build the header from every row, not only the first one: a later row
    # carrying an extra key would otherwise make DictWriter raise halfway
    # through and leave a truncated file behind. dict.fromkeys de-duplicates
    # while preserving first-seen order.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))

    os.makedirs(directory, exist_ok=True)
    csv_file = os.path.join(directory, file_name)

    # newline='' lets the csv module control line endings; UTF-8 is set
    # explicitly so the output does not depend on the machine's locale.
    with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(f'Data successfully written to {csv_file}')

def parse_cron_expression(cron_expr):
    """Split a whitespace-separated cron expression into labelled fields.

    Fields are labelled positionally as seconds, minutes, hours,
    day_of_the_month, month, day_of_the_week and cron_optional. When exactly
    six fields are present, the seventh ("cron_optional") is filled in as an
    empty string.

    Args:
        cron_expr: Cron expression string.

    Returns:
        A dict mapping field label to the raw field text. Expressions with
        fewer than six fields produce only the labels that could be paired;
        fields beyond the seventh are ignored.
    """
    field_names = (
        "seconds",
        "minutes",
        "hours",
        "day_of_the_month",
        "month",
        "day_of_the_week",
        "cron_optional",
    )

    fields = cron_expr.split()

    # The trailing field is optional: pad it so six-field expressions still
    # yield every label.
    if len(fields) == 6:
        fields = fields + [""]

    # zip stops at the shorter sequence, so surplus or missing fields are
    # dropped without raising.
    return dict(zip(field_names, fields))

In [4]:
def _service_name_for(job_type, schedule, schedule_file):
    """Derive the service name of a schedule from its job type.

    Args:
        job_type: Value of the schedule's "job_type" key.
        schedule: Parsed schedule document.
        schedule_file: Path of the schedule file (its base name is used for
            "build_features" jobs).

    Returns:
        The service name string.

    Raises:
        ValueError: If ``job_type`` is not one of the handled job types.
        KeyError, IndexError: If the schedule lacks the expected parameters.
    """
    params = schedule["notebook_task"]["base_parameters"]
    if job_type in ("load_dataset", "run_enrichment"):
        return params["service_name"]
    if job_type == "run_dataflow":
        return params["dataflow_config"]["name"]
    if job_type == "build_index":
        # Third dot-separated segment of the namespace.
        return params["build_config"]["namespace"].split(".")[2] + "_index"
    if job_type == "build_features":
        # File name up to its first dot.
        return params["client"] + "_" + schedule_file.name.split(".")[0]
    if job_type == "unification_job":
        # Third dash-separated segment of the bucket name.
        return params["bucket"].split("-")[2] + "_unification"
    if job_type == "run_report":
        return params["bucket"].split("-")[2] + "_report"
    # Fail loudly instead of silently reusing the service name computed for a
    # previously processed file.
    raise ValueError(f"unsupported job_type {job_type!r}")


def _schedule_record(client, schedule_file, schedule):
    """Flatten one parsed schedule into a CSV-ready dict (fixed key order)."""
    job_type = schedule["job_type"]
    cron_expr = schedule["schedule"]["quartz_cron_expression"]
    cron = parse_cron_expression(cron_expr)
    return {
        "client": client,
        "job_type": job_type,
        "seconds": cron["seconds"],
        "minutes": cron["minutes"],
        "hours": cron["hours"],
        "day_of_the_month": cron["day_of_the_month"],
        "month": cron["month"],
        "day_of_the_week": cron["day_of_the_week"],
        "cron_optional": cron["cron_optional"],
        "cron": cron_expr,
        "region": schedule["region"],
        "service_name": _service_name_for(job_type, schedule, schedule_file),
        "fname": str(schedule_file),
    }


# One record per UNPAUSED schedule found under HUBS/<client>/schedules/.
schedule_list = []
# (path, reason) for every file that could not be turned into a record, so
# that failures are inspectable instead of being silently dropped.
skipped_schedules = []

# Directory listings are sorted so the resulting CSV has a reproducible order.
for hub in sorted(HUBS.iterdir()):
    schedules = hub / "schedules"
    # Also false when `hub` itself is not a directory.
    if not schedules.is_dir():
        continue
    for p in sorted(schedules.iterdir()):
        try:
            s = read_schedule(p)
            # Only active schedules are exported.
            if s["schedule"]["pause_status"] != "UNPAUSED":
                continue
            # The hub directory name is the client name.
            schedule_list.append(_schedule_record(hub.name, p, s))
        except Exception as exc:
            # Best effort: one malformed or unsupported file must not abort
            # the whole scan, but remember why it was skipped.
            skipped_schedules.append((str(p), f"{type(exc).__name__}: {exc}"))

print(
    f"Collected {len(schedule_list)} active schedules; "
    f"skipped {len(skipped_schedules)} files (see skipped_schedules)"
)

In [5]:
# Export every collected schedule record to CSV_DIR/SCHEDULES_COLLECTION_FILE.
write_dicts_to_csv(
    data=schedule_list,
    directory=CSV_DIR,
    file_name=SCHEDULES_COLLECTION_FILE,
)

Data successfully written to csv/ALL_CLIENTS_schedules.csv
