# Cloud Scheduler and PubSub Deployment

This notebook creates cron-based GCP Cloud Scheduler jobs that trigger the ingest cloud function through Pub/Sub. Each job passes a flight identifier to the function, which then pulls the data for that identifier from the FlightAware API.
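On the receiving side, a Pub/Sub-triggered Python background function (1st-gen `(event, context)` signature) receives the message body base64-encoded under `event['data']`. For a 2nd-gen (CloudEvent) function, the same string sits at `cloud_event.data["message"]["data"]`. Below is a minimal sketch of how the ingest function could recover the flight identifier. It assumes the JSON payload format used later in this notebook (`{"flight_ident": ...}`). `extract_flight_ident` is a hypothetical name and is not taken from the actual ingest function.

```python
import base64
import json


def extract_flight_ident(event: dict) -> str:
    """Return the flight identifier carried by a Pub/Sub event.

    Assumes a 1st-gen background-function event dict, where the message
    body sits base64-encoded under event["data"], and a JSON body of the
    form {"flight_ident": "<ident>"} as published by the scheduler job.
    """
    raw = base64.b64decode(event["data"]).decode("utf-8")
    payload = json.loads(raw)
    return payload["flight_ident"]


# Simulate the event that the scheduler's Pub/Sub message would produce
message = json.dumps({"flight_ident": "AA2563"}).encode("utf-8")
event = {"data": base64.b64encode(message).decode("ascii")}
print(extract_flight_ident(event))  # AA2563
```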

Run it once when setting up the project. It creates the GCP resources that trigger the ingest cloud function.

To run properly, the notebook needs your GCP credentials to be available globally as Application Default Credentials. You can set these up with the Google Cloud SDK by running `gcloud auth application-default login` in your terminal. Alternatively, you can log in through the Google Cloud VS Code extension.
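A quick pre-flight check can confirm that credentials are in place before running the cells below. The sketch approximates the file-based part of google-auth's Application Default Credentials lookup. It checks `GOOGLE_APPLICATION_CREDENTIALS` first, then the file that `gcloud auth application-default login` writes. That file lives under `$CLOUDSDK_CONFIG` if set, otherwise `~/.config/gcloud/` on Linux/macOS or `%APPDATA%\gcloud\` on Windows. It does not query the metadata server used on GCP runtimes. `find_adc_file` is a hypothetical helper, and `google.auth.default()` remains the authoritative check.

```python
import os
from pathlib import Path
from typing import Optional


def find_adc_file() -> Optional[Path]:
    """Return the ADC file this machine would likely use, or None.

    Approximates the file-based lookup order of google-auth's default():
    1. the GOOGLE_APPLICATION_CREDENTIALS environment variable;
    2. the file written by `gcloud auth application-default login`.
    The metadata-server fallback on GCP runtimes is not checked here.
    """
    explicit = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        path = Path(explicit)
        return path if path.is_file() else None

    # Location of the gcloud configuration directory
    config_dir = os.environ.get("CLOUDSDK_CONFIG")
    if config_dir:
        base = Path(config_dir)
    elif os.name == "nt":
        base = Path(os.environ.get("APPDATA", "")) / "gcloud"
    else:
        base = Path.home() / ".config" / "gcloud"

    candidate = base / "application_default_credentials.json"
    return candidate if candidate.is_file() else None


adc = find_adc_file()
print(adc if adc else "No ADC file found; run `gcloud auth application-default login`.")
```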

In future iterations, multiple schedulers can be created, one for each flight identifier to be tracked.
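One way to prepare for this is to generate one job definition per identifier. The sketch below is hypothetical (`build_job_specs` is not part of this notebook). It reuses the schedule, time zone and payload format of the existing trigger, and it assumes identifiers are alphanumeric like `AA2563`, so that they stay valid inside a Cloud Scheduler job ID.

```python
import json


def build_job_specs(flight_idents, base_name="flight-ingest-trigger-10min",
                    cron_schedule="*/10 * * * *", time_zone="Etc/UTC"):
    """Build one scheduler job definition per flight identifier.

    Each spec carries the per-job keyword arguments of
    create_or_update_cloud_scheduler_job (everything except project,
    region and topic). Job IDs get the lower-cased identifier as a suffix.
    """
    specs = []
    for ident in flight_idents:
        specs.append({
            "name": f"{base_name}-{ident.lower()}",
            "description": f"CRON-based trigger for flight data ingestion of {ident} ({cron_schedule})",
            "cron_schedule": cron_schedule,
            "time_zone": time_zone,
            "pubsub_data": json.dumps({"flight_ident": ident}),
        })
    return specs


# 'DL1234' is a hypothetical second identifier
for spec in build_job_specs(["AA2563", "DL1234"]):
    print(spec["name"], spec["pubsub_data"])
```

Once `project_id` and `pubsub_topic` are defined, each spec could be deployed with `create_or_update_cloud_scheduler_job(project_id, 'us-central1', pubsub_topic=pubsub_topic, **spec)`.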

In [1]:
# !pip install google-cloud-scheduler
# !pip install google-cloud-pubsub

In [2]:
from google.cloud import pubsub_v1
from google.cloud import scheduler_v1

import json
import os
import pandas as pd

In [3]:
def list_cloud_scheduler_jobs(project_id, region='us-central1', return_df=False):
    """List Cloud Scheduler jobs in a region as a list of dicts (or a DataFrame if return_df=True)."""

    scheduler_client = scheduler_v1.CloudSchedulerClient()
    
    # parent location must be formatted like this
    parent = f'projects/{project_id}/locations/{region}'

    jobs_data = []
    for job in scheduler_client.list_jobs(parent=parent):
        jobs_data.append({
            "Name": job.name,
            "Description": job.description,
            "Schedule": job.schedule,
            "Time Zone": job.time_zone,
            "Pubsub Target": job.pubsub_target.topic_name,
            "Pubsub Target Data": job.pubsub_target.data,
        })

    # Return a DataFrame on request, otherwise the raw list of dicts
    if return_df:
        return pd.DataFrame(jobs_data)
    return jobs_data

In [4]:
def list_pubsub_topics(project_id, return_df=False):
    """List Pub/Sub topics in a project as a list of dicts (or a DataFrame if return_df=True)."""
    project_path = f"projects/{project_id}"

    # Collecting topic data
    topics_data = []
    publisher_client = pubsub_v1.PublisherClient()

    for topic in publisher_client.list_topics(request={"project": project_path}):
        topics_data.append({
            "Name": topic.name,
            "KMS Key Name": topic.kms_key_name
        })

    # Return a DataFrame on request, otherwise the raw list of dicts
    if return_df:
        return pd.DataFrame(topics_data)
    return topics_data
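`list_pubsub_topics` returns fully qualified names of the form `projects/{project}/topics/{topic}`, the same format the scheduler's `pubsub_topic` argument uses. The Python client offers `PublisherClient.topic_path(project, topic)` to build these. The stdlib-only sketch below shows the format in both directions, and `parse_topic_path` is a hypothetical helper.

```python
import re
from typing import Tuple

# Fully qualified topic names look like projects/{project}/topics/{topic}
_TOPIC_PATH = re.compile(r"projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)")


def topic_path(project_id: str, topic_id: str) -> str:
    """Build a fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def parse_topic_path(path: str) -> Tuple[str, str]:
    """Split a fully qualified topic name into (project_id, topic_id)."""
    match = _TOPIC_PATH.fullmatch(path)
    if match is None:
        raise ValueError(f"Not a fully qualified topic name: {path!r}")
    return match.group("project"), match.group("topic")


path = topic_path("aia-ds-accelerator-flight-1", "ingest-flight-snapshot-trigger")
print(path)
print(parse_topic_path(path))
```

This is handy for checking that a scheduler's `pubsub_topic` matches one of the names returned here, for example `pubsub_topic in {t["Name"] for t in list_pubsub_topics(project_id)}`.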


In [5]:
from google.cloud.scheduler_v1.types import Job, PubsubTarget

def create_or_update_cloud_scheduler_job(project_id, region, name, description, cron_schedule, time_zone, pubsub_topic, pubsub_data):
    # Format the parent and job name
    parent = f'projects/{project_id}/locations/{region}'
    job_name = f'{parent}/jobs/{name}'

    # Cloud Scheduler client
    scheduler_client = scheduler_v1.CloudSchedulerClient()

    # Pubsub target
    pubsub_target = PubsubTarget(
        topic_name=pubsub_topic,
        data=pubsub_data.encode('utf-8') # encode the data as bytes
    )

    # Job configuration
    job = Job(
        name=job_name,
        description=description,
        schedule=cron_schedule,
        time_zone=time_zone,
        pubsub_target=pubsub_target,
    )

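    # Attempt to update the job; Cloud Scheduler answers NOT_FOUND if it does not exist yet
    from google.api_core.exceptions import NotFound

    try:
        response = scheduler_client.update_job(job=job)
        print('Job updated:', response.name)
    except NotFound:
        # Create the job only when it is genuinely missing; other errors (auth, bad input) propagate
        response = scheduler_client.create_job(parent=parent, job=job)
        print('Job created:', response.name)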

    return response
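The `name` argument above is the short job ID; the function builds the full `projects/.../jobs/...` path itself. Cloud Scheduler documents job IDs as letters, digits, hyphens and underscores only, with at most 500 characters. A minimal local pre-check catches malformed names before any API call. `validate_job_id` is a hypothetical helper, and the rule is restated from the `Job.name` documentation rather than enforced by the client library.

```python
import re

# Documented Cloud Scheduler job ID rule: letters, digits, hyphens and
# underscores only, at most 500 characters.
_JOB_ID = re.compile(r"[A-Za-z0-9_-]{1,500}")


def validate_job_id(name: str) -> str:
    """Return name unchanged if it is a valid job ID; raise ValueError otherwise."""
    if _JOB_ID.fullmatch(name) is None:
        raise ValueError(f"Invalid Cloud Scheduler job ID: {name!r}")
    return name


validate_job_id("flight-ingest-trigger-10min")  # passes
try:
    validate_job_id("flight ingest trigger")  # spaces are not allowed
except ValueError as err:
    print(err)
```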


In [6]:
def delete_cloud_scheduler_job(project_id, region, name):
    # Cloud Scheduler client
    scheduler_client = scheduler_v1.CloudSchedulerClient()

    # Format the job name
    job_name = f'projects/{project_id}/locations/{region}/jobs/{name}'

    # Attempt to delete the job
    try:
        scheduler_client.delete_job(name=job_name)
        print(f'Job {name} deleted successfully.')
    except Exception as e:
        print(f'An error occurred while deleting job {name}: {e}')

    return None


# Main Execution Below

In [7]:
project_id = 'aia-ds-accelerator-flight-1'

In [8]:
# Replicate the original trigger

name = 'flight-ingest-trigger-10min'
description = 'CRON-based trigger for cloud function used for flight data ingestion every 10 minutes'
cron_schedule = '*/10 * * * *' # Every 10 minutes
time_zone = 'Etc/UTC'
pubsub_topic = 'projects/aia-ds-accelerator-flight-1/topics/ingest-flight-snapshot-trigger'

###### Data to be passed to the cloud function ######
flight_ident_dict = {'flight_ident': 'AA2563'}
pubsub_data = json.dumps(flight_ident_dict)

create_or_update_cloud_scheduler_job(project_id, 'us-central1', name, description, cron_schedule, time_zone, pubsub_topic, pubsub_data)


Job updated: projects/aia-ds-accelerator-flight-1/locations/us-central1/jobs/flight-ingest-trigger-10min


name: "projects/aia-ds-accelerator-flight-1/locations/us-central1/jobs/flight-ingest-trigger-10min"
description: "CRON-based trigger for cloud function used for flight data ingestion every 10 minutes"
pubsub_target {
  topic_name: "projects/aia-ds-accelerator-flight-1/topics/ingest-flight-snapshot-trigger"
  data: "{\"flight_ident\": \"AA2563\"}"
}
schedule: "*/10 * * * *"
time_zone: "Etc/UTC"
user_update_time {
  seconds: 1690867940
}
state: PAUSED
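The schedule `*/10 * * * *` is standard unix-cron syntax with five fields: minute, hour, day of month, month and day of week. `*/10` in the minute field means every tenth minute starting from 0. With `time_zone='Etc/UTC'`, the job therefore targets minutes 0, 10, 20, 30, 40 and 50 of every UTC hour. The hypothetical helper below expands a single field to make that concrete. It covers only the syntax used here (`*`, numbers, ranges, steps, comma lists), not names such as `MON`. Also note that the returned job reports `state: PAUSED`. A paused job does not fire until it is resumed, for example with the client's `resume_job(name=...)` method or from the console.

```python
from typing import List


def expand_cron_field(field: str, low: int, high: int) -> List[int]:
    """Expand one unix-cron field into the sorted values it matches.

    Handles '*', a single number, 'a-b', a '/step' suffix on '*' or 'a-b',
    and comma-separated lists of these. Month/day names (JAN, MON) are not handled.
    """
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        elif step_text:
            raise ValueError(f"Unsupported cron term: {part!r}")
        else:
            start = end = int(base)
        if start < low or end > high or step < 1:
            raise ValueError(f"Out-of-range cron term: {part!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


# Minute field of '*/10 * * * *' (minutes run 0-59)
print(expand_cron_field("*/10", 0, 59))  # [0, 10, 20, 30, 40, 50]
```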

In [9]:
list_cloud_scheduler_jobs(project_id, region='us-central1', return_df=False)

[{'Name': 'projects/aia-ds-accelerator-flight-1/locations/us-central1/jobs/flight-ingest-trigger-10min',
  'Description': 'CRON-based trigger for cloud function used for flight data ingestion every 10 minutes',
  'Schedule': '*/10 * * * *',
  'Time Zone': 'Etc/UTC',
  'Pubsub Target': 'projects/aia-ds-accelerator-flight-1/topics/ingest-flight-snapshot-trigger',
  'Pubsub Target Data': b'{"flight_ident": "AA2563"}'}]
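`list_cloud_scheduler_jobs` returns each Pub/Sub payload as raw bytes, as in `b'{"flight_ident": "AA2563"}'` above, and each job under its full resource name. A small post-processing step can reduce each entry to its short job ID and decoded flight identifier. The sketch below uses a hypothetical `summarize_jobs` helper and assumes flight jobs publish the JSON object used in this notebook. Jobs with any other payload map to `None`.

```python
import json


def summarize_jobs(jobs_data):
    """Map each job's short ID to the flight identifier in its Pub/Sub payload.

    Expects the list of dicts returned by list_cloud_scheduler_jobs(..., return_df=False).
    Jobs whose payload is not a JSON object with a "flight_ident" key map to None.
    """
    summary = {}
    for job in jobs_data:
        job_id = job["Name"].rsplit("/", 1)[-1]
        try:
            payload = json.loads(job["Pubsub Target Data"].decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            payload = None
        summary[job_id] = payload.get("flight_ident") if isinstance(payload, dict) else None
    return summary


jobs_data = [{
    "Name": "projects/aia-ds-accelerator-flight-1/locations/us-central1/jobs/flight-ingest-trigger-10min",
    "Pubsub Target Data": b'{"flight_ident": "AA2563"}',
}]
print(summarize_jobs(jobs_data))  # {'flight-ingest-trigger-10min': 'AA2563'}
```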

In [10]:
list_pubsub_topics(project_id, return_df=True)

   Name                                               KMS Key Name
0  projects/aia-ds-accelerator-flight-1/topics/in...


In [11]:
# # Create an example trigger

# name = 'example-scheduler-job'
# description = 'This is a sample scheduler job.'
# cron_schedule = '*/10 * * * *' # Every 10 minutes
# time_zone = 'Etc/UTC'
# pubsub_topic = 'projects/aia-ds-accelerator-flight-1/topics/my-topic'
# # pubsub_topic = 'projects/aia-ds-accelerator-flight-1/topics/ingest-flight-snapshot-trigger'
# pubsub_data = 'This is the message contents.'

# create_or_update_cloud_scheduler_job(project_id, 'us-central1', name, description, cron_schedule, time_zone, pubsub_topic, pubsub_data)


In [12]:
# # Delete the example trigger

# name = 'example-scheduler-job'
# delete_cloud_scheduler_job(project_id, 'us-central1', name)


In [13]:
# In the future, we could iterate over a list of flight identifiers and deploy a scheduler for each.
# We could also maintain a table of identifiers to track, adding a scheduler for each new entry.
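One way to realize this is a reconcile step. First, compare the identifiers in the tracking table with the identifiers carried by deployed jobs. Then create jobs for what is missing and delete jobs for identifiers that are no longer tracked. The sketch below only computes that plan. The table source, the `plan_scheduler_changes` name and the one-job-per-identifier assumption are all hypothetical. The `deployed` input maps job IDs to the identifier each one carries (for example `{'flight-ingest-trigger-10min': 'AA2563'}`). Jobs mapped to `None` are not flight jobs and are left alone.

```python
def plan_scheduler_changes(tracked_idents, deployed):
    """Compute which flight identifiers need a new job and which jobs to delete.

    tracked_idents: iterable of identifiers that should be ingested.
    deployed: dict mapping existing job IDs to the identifier each carries
              (None for jobs that are not flight-ingest triggers).
    Returns (idents_to_create, job_ids_to_delete), both sorted.
    """
    tracked = set(tracked_idents)
    carried = {ident for ident in deployed.values() if ident is not None}
    to_create = sorted(tracked - carried)
    to_delete = sorted(job_id for job_id, ident in deployed.items()
                       if ident is not None and ident not in tracked)
    return to_create, to_delete


to_create, to_delete = plan_scheduler_changes(
    ["AA2563", "DL1234"],  # DL1234 is a hypothetical identifier
    {"flight-ingest-trigger-10min": "AA2563"},
)
print(to_create, to_delete)  # ['DL1234'] []
```

Identifiers in `to_create` could then go through `create_or_update_cloud_scheduler_job`, and job IDs in `to_delete` through `delete_cloud_scheduler_job`.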