In [1]:
# Standard-library dependency for folder handling
import os

# Root folder of the simulated GCP project, relative to the working directory
BASE_DIR = "gcp_colab_sim"

# Build the project skeleton: first the root folder, then the "src" package
# folder that every generated simulator module is written into.
# exist_ok=True keeps the cell safe to re-run.
os.makedirs(BASE_DIR, exist_ok=True)
os.makedirs(os.path.join(BASE_DIR, "src"), exist_ok=True)


In [2]:
# Import helper modules
import os
import textwrap

# Define helper to write file
def write_file(path, content):
    """Write dedented source text to ``path`` (UTF-8, overwriting any existing file).

    ``content`` is passed through ``textwrap.dedent`` and then ``lstrip`` so a
    module can be written as a triple-quoted string whose first character is
    the newline right after the opening quotes.

    Missing parent folders are created first. That way the helper works even
    if the folder-setup cell has not been run.
    """
    parent = os.path.dirname(path)
    if parent:
        # exist_ok keeps repeated writes into the same folder harmless
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(textwrap.dedent(content).lstrip())

# Write PubSubClientMock class into src/pubsub_mock.py.
# The mock records every published message in memory, so later cells can feed
# client.messages straight into the Dataflow simulation.
write_file(
    os.path.join(BASE_DIR, "src", "pubsub_mock.py"),
    """
class PubSubClientMock:
    def __init__(self):
        # Published messages, in publish order, as {"topic": ..., "data": ...}
        self.messages = []

    def publish(self, topic: str, data: dict):
        # Store a shallow copy of the payload so that later changes to the
        # caller's dict cannot rewrite a message that was already published
        payload = dict(data)
        self.messages.append({"topic": topic, "data": payload})
        # Print message for visibility
        print(f"[PubSubMock] Publish to {topic}: {payload}")
    """
)


In [3]:
# Import sys to modify path
import sys

# Make the generated "src" package importable. The membership check stops
# sys.path from growing a duplicate entry each time this cell is re-run.
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# Sanity check: the Pub/Sub mock module written above should now exist
print(os.path.exists(os.path.join(BASE_DIR, "src", "pubsub_mock.py")))


True


In [4]:
# Bring in the Pub/Sub mock from the generated src package
from src.pubsub_mock import PubSubClientMock

# Publish a single simulated event through a fresh mock client
client = PubSubClientMock()
client.publish(topic="events-topic", data={"user_id": 1, "event": "view_product"})

# The mock keeps a record of everything it published; show that record
print(client.messages)


[PubSubMock] Publish to events-topic: {'user_id': 1, 'event': 'view_product'}
[{'topic': 'events-topic', 'data': {'user_id': 1, 'event': 'view_product'}}]


In [5]:
# Write the DataflowJobSim module. This reuses os, BASE_DIR and write_file from
# the setup cells. Redefining write_file here would create a second definition
# that silently shadows the first.
write_file(
    os.path.join(BASE_DIR, "src", "dataflow_sim.py"),
    """
class DataflowJobSim:
    def __init__(self, name: str):
        # Store job name
        self.name = name

    def run(self, input_messages):
        # Create list for transformed rows
        transformed = []
        # Loop over input messages
        for m in input_messages:
            # Copy data from message
            row = dict(m["data"])
            # Mark row as processed
            row["processed"] = True
            # Append row to transformed list
            transformed.append(row)
        # Print number of processed rows
        print(f"[DataflowSim] Process {len(transformed)} elements")
        # Return transformed rows
        return transformed
    """
)


In [6]:
# Import sys to update path
import sys, os

# Add base directory to Python path only once, so that re-running the cell
# does not leave duplicate sys.path entries behind
BASE_DIR = "gcp_colab_sim"
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# Import Pub/Sub and Dataflow sims
from src.pubsub_mock import PubSubClientMock
from src.dataflow_sim import DataflowJobSim

# Create Pub/Sub client
client = PubSubClientMock()

# Publish two simulated events
client.publish("events-topic", {"user_id": 1, "event": "view_product"})
client.publish("events-topic", {"user_id": 2, "event": "purchase"})

# Create Dataflow job sim
job = DataflowJobSim("etl-events")

# Run job over every message the Pub/Sub client recorded
rows = job.run(client.messages)

# Print transformed rows
print(rows)


[PubSubMock] Publish to events-topic: {'user_id': 1, 'event': 'view_product'}
[PubSubMock] Publish to events-topic: {'user_id': 2, 'event': 'purchase'}
[DataflowSim] Process 2 elements
[{'user_id': 1, 'event': 'view_product', 'processed': True}, {'user_id': 2, 'event': 'purchase', 'processed': True}]


In [7]:
# Write the BigQuerySim module. This reuses os, BASE_DIR and write_file from the
# setup cells instead of redefining the helper a third time.
write_file(
    os.path.join(BASE_DIR, "src", "bigquery_sim.py"),
    """
class BigQuerySim:
    def __init__(self):
        # Table name -> list of row dicts, in insertion order
        self.tables = {}

    def insert_rows(self, table: str, rows):
        # Accept any iterable of rows. Materialize it first so a generator is
        # not exhausted by extend() before len() is taken for the log line.
        rows = list(rows)
        # Create the table on first insert, then append the new rows
        self.tables.setdefault(table, []).extend(rows)
        # Print number of inserted rows
        print(f"[BigQuerySim] Insert {len(rows)} rows into {table}")
    """
)


In [8]:
# Import sys to update path
import sys, os

# Set base directory
BASE_DIR = "gcp_colab_sim"

# Add base directory to Python path once. An unconditional append would add
# a duplicate sys.path entry on every re-run.
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# Import sims
from src.pubsub_mock import PubSubClientMock
from src.dataflow_sim import DataflowJobSim
from src.bigquery_sim import BigQuerySim

# Create Pub/Sub client
client = PubSubClientMock()

# Publish two simulated events
client.publish("events-topic", {"user_id": 1, "event": "view_product"})
client.publish("events-topic", {"user_id": 2, "event": "purchase"})

# Create Dataflow job sim
job = DataflowJobSim("etl-events")

# Run job on Pub/Sub messages
rows = job.run(client.messages)

# Create BigQuery sim client
bq = BigQuerySim()

# Insert rows into one table
bq.insert_rows("analytics.events", rows)

# Print internal tables dict
print(bq.tables)


[PubSubMock] Publish to events-topic: {'user_id': 1, 'event': 'view_product'}
[PubSubMock] Publish to events-topic: {'user_id': 2, 'event': 'purchase'}
[DataflowSim] Process 2 elements
[BigQuerySim] Insert 2 rows into analytics.events
{'analytics.events': [{'user_id': 1, 'event': 'view_product', 'processed': True}, {'user_id': 2, 'event': 'purchase', 'processed': True}]}


In [9]:
# Create the VertexAISim module with basic ML training. This uses the shared
# write_file helper (dedent + lstrip + UTF-8 write) rather than repeating the
# open/dedent boilerplate. os and BASE_DIR come from the setup cell.
write_file(
    os.path.join(BASE_DIR, "src", "vertex_ai_sim.py"),
    """
from sklearn.linear_model import LogisticRegression

class VertexAISim:
    def __init__(self):
        # Initialize dict of models
        self.models = {}

    def train_model(self, model_name: str, X, y):
        # Create logistic regression model
        model = LogisticRegression()
        # Fit model on data
        model.fit(X, y)
        # Store model in dict
        self.models[model_name] = model
        # Print info
        print(f"[VertexAISim] Train {model_name}")
        # Return model name
        return model_name

    def predict(self, model_name: str, x_row):
        # Get model from dict
        model = self.models[model_name]
        # Compute probability for class 1
        prob = model.predict_proba([x_row])[0][1]
        # Print info
        print(f"[VertexAISim] Predict with {model_name} on {x_row}")
        # Return sim prediction
        return {"score": float(prob)}
""",
)


In [10]:
# Pick up edits to src/vertex_ai_sim.py without restarting the kernel.
# Import the module (which binds `src`), reload it in place, and bind the class
# from the reloaded module object that importlib.reload hands back.
import importlib
import src.vertex_ai_sim

VertexAISim = importlib.reload(src.vertex_ai_sim).VertexAISim


In [11]:
# numpy supplies the feature and label arrays
import numpy as np

# Simulated Vertex AI client class
from src.vertex_ai_sim import VertexAISim

# Toy training data: one price feature per row; label 1 marks a "high value" price
X = np.array([10.0, 20.0, 100.0, 200.0]).reshape(-1, 1)
y = np.array([0, 0, 1, 1])

# Train through the simulated Vertex AI client, then score an unseen price
vertex = VertexAISim()
model_name = vertex.train_model("price_high_value_model", X, y)
prediction = vertex.predict(model_name, [50.0])

# Report the trained model and its prediction
print("Model:", model_name)
print("Prediction:", prediction)


[VertexAISim] Train price_high_value_model
[VertexAISim] Predict with price_high_value_model on [50.0]
Model: price_high_value_model
Prediction: {'score': 0.15735576245764596}


In [12]:
# Simulated ingestion services
from src.pubsub_mock import PubSubClientMock
from src.dataflow_sim import DataflowJobSim
from src.bigquery_sim import BigQuerySim

# Fresh Pub/Sub mock; every event now carries a price that later cells train on
client = PubSubClientMock()
client.publish("events-topic", {"user_id": 1, "event": "view_product", "price": 10.0})
client.publish("events-topic", {"user_id": 2, "event": "purchase", "price": 30.0})
client.publish("events-topic", {"user_id": 3, "event": "purchase", "price": 80.0})
client.publish("events-topic", {"user_id": 4, "event": "purchase", "price": 150.0})

# Transform the recorded messages with the Dataflow sim, then load them into a
# fresh BigQuery sim
job = DataflowJobSim("etl-events")
rows = job.run(client.messages)
bq = BigQuerySim()
bq.insert_rows("analytics.events", rows)


[PubSubMock] Publish to events-topic: {'user_id': 1, 'event': 'view_product', 'price': 10.0}
[PubSubMock] Publish to events-topic: {'user_id': 2, 'event': 'purchase', 'price': 30.0}
[PubSubMock] Publish to events-topic: {'user_id': 3, 'event': 'purchase', 'price': 80.0}
[PubSubMock] Publish to events-topic: {'user_id': 4, 'event': 'purchase', 'price': 150.0}
[DataflowSim] Process 4 elements
[BigQuerySim] Insert 4 rows into analytics.events


In [13]:
# numpy for arrays; reload the Vertex AI sim so the latest file contents are used
import numpy as np
import importlib
import src.vertex_ai_sim
VertexAISim = importlib.reload(src.vertex_ai_sim).VertexAISim

# Features and labels come straight from the BigQuery sim table. Price is the
# only feature, and rows priced 50.0 or more are labelled 1.
table = bq.tables["analytics.events"]
X = np.array([[row["price"]] for row in table])
y = np.array([int(row["price"] >= 50.0) for row in table])

# Train through the Vertex AI sim and score one unseen price
vertex = VertexAISim()
model_name = vertex.train_model("price_high_value_model", X, y)
prediction = vertex.predict(model_name, [40.0])
print("Model:", model_name)
print("Prediction:", prediction)


[VertexAISim] Train price_high_value_model
[VertexAISim] Predict with price_high_value_model on [40.0]
Model: price_high_value_model
Prediction: {'score': 0.03652943301539391}


In [14]:
# Create the CloudRunSim module through the shared write_file helper
# (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "cloud_run_sim.py"),
    """
class CloudRunSim:
    def __init__(self, vertex_client, model_name: str):
        # Store Vertex AI sim client
        self.vertex_client = vertex_client
        # Store model name
        self.model_name = model_name

    def predict_request(self, payload: dict):
        # Print incoming payload
        print(f"[CloudRunSim] Receive request: {payload}")
        # Extract features from payload
        x_row = [payload["price"]]
        # Call Vertex AI sim client
        result = self.vertex_client.predict(self.model_name, x_row)
        # Build response dict
        response = {
            "input": payload,
            "prediction": result,
        }
        # Print response
        print(f"[CloudRunSim] Send response: {response}")
        # Return response
        return response
""",
)


In [15]:
# Cloud Run sim class
from src.cloud_run_sim import CloudRunSim

# Serve the model trained above (vertex and model_name come from the training cell)
cloud_run = CloudRunSim(vertex, model_name)

# Send one example request through the simulated endpoint and show the reply
request_payload = {"user_id": 999, "event": "purchase", "price": 120.0}
api_response = cloud_run.predict_request(payload=request_payload)
print(api_response)


[CloudRunSim] Receive request: {'user_id': 999, 'event': 'purchase', 'price': 120.0}
[VertexAISim] Predict with price_high_value_model on [120.0]
[CloudRunSim] Send response: {'input': {'user_id': 999, 'event': 'purchase', 'price': 120.0}, 'prediction': {'score': 0.9999992824655318}}
{'input': {'user_id': 999, 'event': 'purchase', 'price': 120.0}, 'prediction': {'score': 0.9999992824655318}}


In [16]:
# Overwrite the CloudFunctionSim module with better logic, via the shared
# write_file helper (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "cloud_function_sim.py"),
    """
import numbers

class CloudFunctionSim:
    def __init__(self, downstream_callable):
        # Store downstream callable (for example Cloud Run sim)
        self.downstream_callable = downstream_callable

    def handle_request(self, request: dict):
        # Print incoming request
        print(f"[CloudFunctionSim] Receive request: {request}")

        # Validate that price key exist
        if "price" not in request:
            print("[CloudFunctionSim] Missing price")
            return {"error": "missing_price"}

        # Validate that price is a positive real number.
        # - bool is rejected explicitly, because isinstance(True, numbers.Real) is true.
        # - Non-numeric prices get invalid_price instead of a TypeError from the comparison.
        # - "not price > 0" (rather than "price <= 0") also rejects NaN, which
        #   would otherwise pass validation and be bucketed as "high".
        price = request["price"]
        if isinstance(price, bool) or not isinstance(price, numbers.Real) or not price > 0:
            print("[CloudFunctionSim] Invalid price")
            return {"error": "invalid_price"}

        # Create processed copy of request
        processed = dict(request)

        # Add high_value flag
        processed["high_value"] = processed["price"] >= 50.0

        # Add simple price_bucket label
        if processed["price"] < 50.0:
            processed["price_bucket"] = "low"
        elif processed["price"] < 100.0:
            processed["price_bucket"] = "medium"
        else:
            processed["price_bucket"] = "high"

        # Print processed request
        print(f"[CloudFunctionSim] Process request: {processed}")

        # Call downstream callable
        response = self.downstream_callable(processed)

        # Return response
        return response
""",
)


In [17]:
# Cloud Function sim class
from src.cloud_function_sim import CloudFunctionSim

# The Cloud Function sim validates and enriches each request, then forwards it
# to the Cloud Run sim's predict_request
cloud_function = CloudFunctionSim(cloud_run.predict_request)

# Send one example request through the chain and show the final reply
cf_request = {"user_id": 555, "event": "purchase", "price": 70.0}
cf_response = cloud_function.handle_request(request=cf_request)
print(cf_response)


[CloudFunctionSim] Receive request: {'user_id': 555, 'event': 'purchase', 'price': 70.0}
[CloudFunctionSim] Process request: {'user_id': 555, 'event': 'purchase', 'price': 70.0, 'high_value': True, 'price_bucket': 'medium'}
[CloudRunSim] Receive request: {'user_id': 555, 'event': 'purchase', 'price': 70.0, 'high_value': True, 'price_bucket': 'medium'}
[VertexAISim] Predict with price_high_value_model on [70.0]
[CloudRunSim] Send response: {'input': {'user_id': 555, 'event': 'purchase', 'price': 70.0, 'high_value': True, 'price_bucket': 'medium'}, 'prediction': {'score': 0.9630317444057592}}
{'input': {'user_id': 555, 'event': 'purchase', 'price': 70.0, 'high_value': True, 'price_bucket': 'medium'}, 'prediction': {'score': 0.9630317444057592}}


In [18]:
# Create the CloudStorageSim module through the shared write_file helper
# (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "cloud_storage_sim.py"),
    """
import json

class CloudStorageSim:
    def __init__(self):
        # Initialize dict of buckets
        self.buckets = {}

    def upload_json(self, bucket: str, path: str, obj: dict):
        # Ensure bucket key exist
        self.buckets.setdefault(bucket, {})
        # Convert dict to json string
        data = json.dumps(obj)
        # Store data under path
        self.buckets[bucket][path] = data
        # Print info
        print(f"[CloudStorageSim] Upload {path} to {bucket}")

    def read_json(self, bucket: str, path: str):
        # Get json string from storage
        data = self.buckets[bucket][path]
        # Convert json string back to dict
        return json.loads(data)
""",
)


In [19]:
# Cloud Storage sim class
from src.cloud_storage_sim import CloudStorageSim

gcs = CloudStorageSim()

# Aggregate the BigQuery sim events table into a small summary.
# NOTE(review): this sums "price" over every event, including view_product
# events; confirm whether revenue should count purchases only.
table = bq.tables["analytics.events"]
num_events = len(table)
total_revenue = sum(row["price"] for row in table)
summary_obj = {"num_events": num_events, "total_revenue": total_revenue}

# Round-trip the summary through the simulated bucket and show what comes back
gcs.upload_json("raw-analytics-bucket", "daily/summary.json", summary_obj)
loaded = gcs.read_json("raw-analytics-bucket", "daily/summary.json")
print(loaded)


[CloudStorageSim] Upload daily/summary.json to raw-analytics-bucket
{'num_events': 4, 'total_revenue': 270.0}


In [20]:
# Create the DataprocSim module through the shared write_file helper
# (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "dataproc_sim.py"),
    """
class DataprocSim:
    def __init__(self, job_name: str):
        # Store job name
        self.job_name = job_name

    def run(self, summary_obj: dict):
        # Print received summary
        print(f"[DataprocSim] Run {self.job_name} on {summary_obj}")
        # Compute average revenue
        avg_revenue = summary_obj["total_revenue"] / max(summary_obj["num_events"], 1)
        # Build result dict
        result = {
            "num_events": summary_obj["num_events"],
            "total_revenue": summary_obj["total_revenue"],
            "avg_revenue": avg_revenue,
        }
        # Print result
        print(f"[DataprocSim] Result {result}")
        # Return result
        return result
""",
)


In [21]:
# Dataproc sim class
from src.dataproc_sim import DataprocSim

# Load the summary stored in the Cloud Storage sim, run the Dataproc sim over
# it, and append the result to its own BigQuery sim table
summary_obj = gcs.read_json("raw-analytics-bucket", "daily/summary.json")
dp_job = DataprocSim("daily-analytics-job")
dp_result = dp_job.run(summary_obj)
bq.insert_rows("analytics.daily_summary_dp", [dp_result])

# Show the new table's contents
print(bq.tables["analytics.daily_summary_dp"])


[DataprocSim] Run daily-analytics-job on {'num_events': 4, 'total_revenue': 270.0}
[DataprocSim] Result {'num_events': 4, 'total_revenue': 270.0, 'avg_revenue': 67.5}
[BigQuerySim] Insert 1 rows into analytics.daily_summary_dp
[{'num_events': 4, 'total_revenue': 270.0, 'avg_revenue': 67.5}]


In [22]:
# Create the CloudSchedulerSim module through the shared write_file helper
# (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "cloud_scheduler_sim.py"),
    """
class CloudSchedulerSim:
    def __init__(self, job_name: str, job_callable):
        # Store job name
        self.job_name = job_name
        # Store callable to run
        self.job_callable = job_callable

    def run_now(self):
        # Print trigger message
        print(f"[CloudSchedulerSim] Trigger job {self.job_name}")
        # Call job callable
        result = self.job_callable()
        # Return result
        return result
""",
)


In [23]:
# Cloud Scheduler sim class
from src.cloud_scheduler_sim import CloudSchedulerSim

def daily_pipeline_job():
    """Scheduled job body: one online prediction plus the daily batch summary.

    Relies on notebook globals created in earlier cells: ``cloud_function``,
    ``gcs``, ``dp_job`` and ``bq``. Each call appends another row to the
    BigQuery sim table ``analytics.daily_summary_dp``.

    Returns:
        dict with the Cloud Function response under "cloud_function" and the
        Dataproc result under "dataproc".
    """
    # Online path: one request through the Cloud Function -> Cloud Run -> Vertex AI sims
    online_response = cloud_function.handle_request(
        {"user_id": 777, "event": "purchase", "price": 90.0}
    )

    # Batch path: stored summary -> Dataproc sim -> BigQuery sim
    batch_result = dp_job.run(gcs.read_json("raw-analytics-bucket", "daily/summary.json"))
    bq.insert_rows("analytics.daily_summary_dp", [batch_result])

    return {"cloud_function": online_response, "dataproc": batch_result}

# Wrap the job in a Cloud Scheduler sim and fire it once immediately
scheduler = CloudSchedulerSim(job_name="daily-pipeline-job", job_callable=daily_pipeline_job)
scheduler_result = scheduler.run_now()

# Show the combined result returned by the job
print(scheduler_result)


[CloudSchedulerSim] Trigger job daily-pipeline-job
[CloudFunctionSim] Receive request: {'user_id': 777, 'event': 'purchase', 'price': 90.0}
[CloudFunctionSim] Process request: {'user_id': 777, 'event': 'purchase', 'price': 90.0, 'high_value': True, 'price_bucket': 'medium'}
[CloudRunSim] Receive request: {'user_id': 777, 'event': 'purchase', 'price': 90.0, 'high_value': True, 'price_bucket': 'medium'}
[VertexAISim] Predict with price_high_value_model on [90.0]
[CloudRunSim] Send response: {'input': {'user_id': 777, 'event': 'purchase', 'price': 90.0, 'high_value': True, 'price_bucket': 'medium'}, 'prediction': {'score': 0.9995072391577136}}
[DataprocSim] Run daily-analytics-job on {'num_events': 4, 'total_revenue': 270.0}
[DataprocSim] Result {'num_events': 4, 'total_revenue': 270.0, 'avg_revenue': 67.5}
[BigQuerySim] Insert 1 rows into analytics.daily_summary_dp
{'cloud_function': {'input': {'user_id': 777, 'event': 'purchase', 'price': 90.0, 'high_value': True, 'price_bucket': 'mediu

In [24]:
# Create the Composer sim module through the shared write_file helper
# (os and BASE_DIR come from the setup cell)
write_file(
    os.path.join(BASE_DIR, "src", "composer_sim.py"),
    """
class TaskSim:
    def __init__(self, name, func):
        # Store task name
        self.name = name
        # Store callable function
        self.func = func

    def run(self, context):
        # Print start message
        print(f"[TaskSim] Run {self.name}")
        # Execute function with context
        result = self.func(context)
        # Return result
        return result

class ComposerDAGSim:
    def __init__(self, name):
        # Store dag name
        self.name = name
        # Initialize list of tasks
        self.tasks = []

    def add_task(self, task: TaskSim):
        # run() keys the shared context by task name, so a repeated name would
        # silently overwrite an earlier task's result. Reject it up front.
        if any(existing.name == task.name for existing in self.tasks):
            raise ValueError(f"Duplicate task name: {task.name}")
        # Append task to list
        self.tasks.append(task)

    def run(self):
        # Initialize shared context dict
        context = {}
        # Loop over tasks in order
        for task in self.tasks:
            # Run task and store result
            context[task.name] = task.run(context)
        # Return full context
        return context
""",
)


In [25]:
# Import sys, os and numpy
import sys, os, numpy as np

# Set base directory and add it to the import path only once. An unconditional
# append would leave a duplicate sys.path entry after every re-run.
BASE_DIR = "gcp_colab_sim"
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# Import sims
from src.composer_sim import ComposerDAGSim, TaskSim
from src.pubsub_mock import PubSubClientMock
from src.dataflow_sim import DataflowJobSim
from src.bigquery_sim import BigQuerySim
from src.vertex_ai_sim import VertexAISim
from src.cloud_run_sim import CloudRunSim
from src.cloud_function_sim import CloudFunctionSim
from src.cloud_storage_sim import CloudStorageSim
from src.dataproc_sim import DataprocSim

# Define ingest task (Pub/Sub + Dataflow + BigQuery)
def task_ingest(context):
    """DAG task: publish demo events, transform them, and load them into a new BigQuery sim.

    Args:
        context: shared DAG context (not read by this task).

    Returns:
        {"bq": BigQuerySim} whose "analytics.events" table holds the loaded rows.
    """
    publisher = PubSubClientMock()
    etl = DataflowJobSim("etl-events-dag")
    warehouse = BigQuerySim()

    for event in (
        {"user_id": 1, "event": "view_product", "price": 10.0},
        {"user_id": 2, "event": "purchase", "price": 60.0},
        {"user_id": 3, "event": "purchase", "price": 120.0},
    ):
        publisher.publish("events-topic", event)

    warehouse.insert_rows("analytics.events", etl.run(publisher.messages))
    return {"bq": warehouse}

# Define train task (Vertex AI from BigQuery)
def task_train(context):
    """DAG task: train the price model on the table loaded by the ingest task.

    Reads context["ingest"]["bq"]. Labels are 1 for prices >= 50.0 and 0 otherwise.

    Returns:
        dict with the Vertex AI sim ("vertex"), the trained model name
        ("model_name") and the same BigQuery sim passed through ("bq").
    """
    warehouse = context["ingest"]["bq"]
    events = warehouse.tables["analytics.events"]

    features = np.array([[event["price"]] for event in events])
    labels = np.array([int(event["price"] >= 50.0) for event in events])

    vertex_client = VertexAISim()
    trained_name = vertex_client.train_model("price_high_value_model_dag", features, labels)
    return {"vertex": vertex_client, "model_name": trained_name, "bq": warehouse}

# Define batch analytics task (Cloud Storage + Dataproc)
def task_batch(context):
    """DAG task: summarize events, stage the summary in the Cloud Storage sim, and run the Dataproc sim.

    Reads the BigQuery sim forwarded by the train task (context["train"]["bq"])
    and appends the Dataproc result to its "analytics.daily_summary_dp_dag" table.

    Returns:
        {"gcs": CloudStorageSim, "dataproc_result": dict}
    """
    warehouse = context["train"]["bq"]
    events = warehouse.tables["analytics.events"]
    summary = {
        "num_events": len(events),
        "total_revenue": sum(event["price"] for event in events),
    }

    storage = CloudStorageSim()
    storage.upload_json("raw-analytics-bucket-dag", "daily/summary.json", summary)

    result = DataprocSim("daily-analytics-job-dag").run(summary)
    warehouse.insert_rows("analytics.daily_summary_dp_dag", [result])
    return {"gcs": storage, "dataproc_result": result}

# Define predict task (Cloud Function + Cloud Run + Vertex)
def task_predict(context):
    """DAG task: serve one request through the Cloud Function -> Cloud Run -> Vertex AI sims.

    Uses the Vertex AI sim and model name produced by the train task.

    Returns:
        {"api_response": <response returned by the Cloud Function sim>}
    """
    trained = context["train"]
    run_service = CloudRunSim(vertex_client=trained["vertex"], model_name=trained["model_name"])
    gateway = CloudFunctionSim(downstream_callable=run_service.predict_request)
    reply = gateway.handle_request({"user_id": 999, "event": "purchase", "price": 90.0})
    return {"api_response": reply}


In [26]:
# Assemble the Composer DAG sim; tasks run in the order they are added
dag = ComposerDAGSim("full_gcp_sim_dag")
dag.add_task(TaskSim("ingest", task_ingest))
dag.add_task(TaskSim("train", task_train))
dag.add_task(TaskSim("batch", task_batch))
dag.add_task(TaskSim("predict", task_predict))

# Execute every task; the returned context maps each task name to its result
dag_context = dag.run()

# Summarize the run
print("DAG context keys:", dag_context.keys())
print("Predict task output:", dag_context["predict"])


[TaskSim] Run ingest
[PubSubMock] Publish to events-topic: {'user_id': 1, 'event': 'view_product', 'price': 10.0}
[PubSubMock] Publish to events-topic: {'user_id': 2, 'event': 'purchase', 'price': 60.0}
[PubSubMock] Publish to events-topic: {'user_id': 3, 'event': 'purchase', 'price': 120.0}
[DataflowSim] Process 3 elements
[BigQuerySim] Insert 3 rows into analytics.events
[TaskSim] Run train
[VertexAISim] Train price_high_value_model_dag
[TaskSim] Run batch
[CloudStorageSim] Upload daily/summary.json to raw-analytics-bucket-dag
[DataprocSim] Run daily-analytics-job-dag on {'num_events': 3, 'total_revenue': 190.0}
[DataprocSim] Result {'num_events': 3, 'total_revenue': 190.0, 'avg_revenue': 63.333333333333336}
[BigQuerySim] Insert 1 rows into analytics.daily_summary_dp_dag
[TaskSim] Run predict
[CloudFunctionSim] Receive request: {'user_id': 999, 'event': 'purchase', 'price': 90.0}
[CloudFunctionSim] Process request: {'user_id': 999, 'event': 'purchase', 'price': 90.0, 'high_value': Tr