<a href="https://colab.research.google.com/github/tyflowio01/tyflowio01/blob/main/gtfs_scratch_hard.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# GTFS Data and IoT Integration Solution

## Introduction

This notebook demonstrates the integration of GTFS (General Transit Feed Specification) data and IoT (Internet of Things) data using Google Cloud Platform (GCP) services. The solution processes GTFS data and real-time data from IoT devices, stores it in BigQuery, and visualizes it using a Streamlit app.

## Function of this Notebook

1. **GCP Authentication and Project Selection**: Authenticate and select the GCP project to use for this solution.
2. **Install Required Libraries**: Install necessary Python libraries for working with GCP services and Streamlit.
3. **Enable Required APIs**: Enable the required Google Cloud APIs for this solution.
4. **Select GCS Bucket**: Select a Google Cloud Storage bucket to store the GTFS data.
5. **Download GTFS Data**: Download GTFS data from a specified source or upload a local file.
6. **Create Pub/Sub Topic**: Create a Pub/Sub topic for IoT data.
7. **Load GTFS Schema into BigQuery**: Infer the schema from the downloaded GTFS files and create corresponding tables in BigQuery.
8. **Install and Set Up Streamlit**: Install Streamlit and set up a Streamlit app for data visualization.
9. **Dataflow Pipeline**: Configure and run a Dataflow pipeline to process data from Pub/Sub and write it to BigQuery.
10. **Visualization**: Use the Streamlit app to visualize the real-time data stored in BigQuery.


## Block Diagram of the GTFS Data and IoT Integration Solution

```plaintext
 +-------------+      +-----------+      +-------------+
 |             |      |           |      |             |
 |  GTFS Data  |      |  IoT      |      |  Storage    |
 |  Source     |----->|  Device   |----->|  Bucket     |
 |             |      |  (Moxa)   |      |             |
 +-------------+      +-----------+      +-------------+
        |                  |                   |
        v                  v                   |
 +-------------+      +-----------+            |
 |             |      |           |            |
 |  Simulate   |      |  MQTT     |            |
 |  Data       |      |  Broker   |            |
 +-------------+      +-----------+            |
        |                  |                   |
        v                  v                   v
 +--------------------------------------------+
 |                                            |
 |                Pub/Sub Topic               |
 |                (iot-data-topic)            |
 +--------------------------------------------+
                            |
                            v
                     +-------------+
                     |             |
                     |  Dataflow   |
                     |  Pipeline   |
                     +-------------+
                            |
                            v
                     +-------------+
                     |             |
                     |  BigQuery   |
                     |  Table      |
                     |  (gtfs_data)|
                     +-------------+
                            |
                            v
                     +-------------+
                     |             |
                     |  Streamlit  |
                     |  App        |
                     +-------------+
                            |
                            v
                     Visualization
```


### Summary of Flow
- **GTFS Data Source**: Obtain GTFS data.
- **Upload to GCS Bucket**: Store the GTFS data in a Google Cloud Storage bucket.
- **Pub/Sub Topic (iot-data-topic)**: IoT devices (or the data generator) publish their readings to this topic.
- **Dataflow Pipeline (gtfs-iot-dataflow-pipeline)**: Read messages from Pub/Sub, parse the JSON payloads, and write them to BigQuery.
- **BigQuery Table (gtfs_data)**: Store the processed data in a BigQuery table.
- **Streamlit App**: Visualize the data.
- **Visualization**: Display the real-time data.
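To make the flow above concrete, here is a minimal sketch of the JSON message contract between the publishers and the Dataflow pipeline. The helper names `encode_reading` and `decode_reading` are hypothetical. The field set (`id`, `timestamp`, `latitude`, `longitude`) is an assumption taken from what the data generator in Cell 11 publishes and what the Streamlit app in Cell 9 queries.

```python
import json
import time

# Hypothetical helpers: the fields mirror the data generator (Cell 11)
# and the columns selected by the Streamlit app (Cell 9).
def encode_reading(vehicle_id, latitude, longitude, timestamp=None):
    """Serialize one reading into the bytes payload published to Pub/Sub."""
    record = {
        "id": vehicle_id,
        "timestamp": int(time.time()) if timestamp is None else int(timestamp),
        "latitude": round(float(latitude), 6),
        "longitude": round(float(longitude), 6),
    }
    return json.dumps(record).encode("utf-8")

def decode_reading(payload):
    """Inverse of encode_reading; this is what the pipeline's ParseJson step does."""
    return json.loads(payload.decode("utf-8"))

payload = encode_reading(1234, 37.7749, -122.4194, timestamp=1700000000)
print(payload)
print(decode_reading(payload))
```

Pub/Sub carries opaque bytes, so both sides only need to agree on UTF-8 JSON with these keys.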


In [None]:
# Cell 0: GCP Authentication and Project Selection
from google.colab import auth
from googleapiclient.discovery import build

# Authenticate the user
auth.authenticate_user()

# Get the list of projects
cloud_resource_manager = build('cloudresourcemanager', 'v1')
projects = cloud_resource_manager.projects().list().execute()

# Display the list of projects
print("Select a project from the list below:")
project_list = [project['projectId'] for project in projects.get('projects', [])]
for idx, project in enumerate(project_list):
    print(f"{idx + 1}. {project}")

# Prompt user to select a project
project_index = int(input("Enter the number of the project you want to use: ")) - 1
project_id = project_list[project_index]
print(f"Selected project: {project_id}")


In [None]:
# Cell 1: Install Required Libraries
!pip install "apache-beam[gcp]" streamlit pyngrok google-cloud-storage google-cloud-pubsub google-cloud-bigquery


In [None]:
# Cell 2: GCP Authentication and Project Selection
from google.colab import auth
from googleapiclient.discovery import build

# Authenticate the user
auth.authenticate_user()

# Get the list of projects
cloud_resource_manager = build('cloudresourcemanager', 'v1')
projects = cloud_resource_manager.projects().list().execute()

# Display the list of projects
print("Select a project from the list below:")
project_list = [project['projectId'] for project in projects.get('projects', [])]
for idx, project in enumerate(project_list):
    print(f"{idx + 1}. {project}")

# Prompt user to select a project
project_index = int(input("Enter the number of the project you want to use: ")) - 1
project_id = project_list[project_index]
print(f"Selected project: {project_id}")


In [None]:
# Cell 3: Enable Required APIs
!gcloud config set project {project_id}
!gcloud services enable storage.googleapis.com pubsub.googleapis.com cloudfunctions.googleapis.com bigquery.googleapis.com run.googleapis.com dataflow.googleapis.com


In [None]:
# Cell 4: Select GCS Bucket
from google.cloud import storage

# Initialize GCS client with the selected project
storage_client = storage.Client(project=project_id)

# List available buckets
buckets = list(storage_client.list_buckets())
print("Select a bucket from the list below:")
bucket_list = [bucket.name for bucket in buckets]
for idx, bucket_name in enumerate(bucket_list):
    print(f"{idx + 1}. {bucket_name}")

# Prompt user to select a bucket
bucket_index = int(input("Enter the number of the bucket you want to use: ")) - 1
bucket_name = bucket_list[bucket_index]
print(f"Selected bucket: {bucket_name}")


In [None]:
# Cell 5: Download GTFS Data
import requests
from google.cloud import storage
from google.colab import files

# Define GTFS data source URLs for San Francisco Bay Area
gtfs_sources = [
    "https://transitfeeds.com/p/sfmta/60/latest/download",  # SFMTA (Muni)
    "https://transitfeeds.com/p/ac-transit/406/latest/download",  # AC Transit
    "https://data.trilliumtransit.com/gtfs/marin-transit-ca-us/marin-transit-ca-us.zip"  # Marin Transit
]

# Provide options to the user
print("Select a GTFS data source:")
print("1. SFMTA (San Francisco Muni)")
print("2. AC Transit")
print("3. Marin Transit")
print("4. Upload from local file")
source_option = int(input("Enter the number of your choice: "))

# Download GTFS data from the selected source
if source_option in [1, 2, 3]:
    gtfs_url = gtfs_sources[source_option - 1]
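    response = requests.get(gtfs_url, timeout=60)
    response.raise_for_status()  # Fail loudly on HTTP errors instead of saving an error page as the zip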
    gtfs_file_path = 'gtfs_data.zip'
    with open(gtfs_file_path, 'wb') as f:
        f.write(response.content)
elif source_option == 4:
    print("Please upload your GTFS zip file.")
    uploaded = files.upload()
    gtfs_file_path = list(uploaded.keys())[0]
else:
    print("Invalid selection. Please run the cell again and select a valid option.")
    gtfs_file_path = None

if gtfs_file_path:
    # Upload GTFS data to GCS bucket
    storage_client = storage.Client(project=project_id)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob('gtfs_data.zip')
    blob.upload_from_filename(gtfs_file_path)
    print(f"Uploaded GTFS data to: gs://{bucket_name}/gtfs_data.zip")
else:
    print("No GTFS data uploaded.")

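Before loading the archive into BigQuery, it can help to confirm that the download is actually a GTFS feed. The sketch below is a simplified check based on the core files in the GTFS Schedule reference; the function name `missing_gtfs_files` is hypothetical, and the required-file list is an assumption (in recent revisions `stops.txt` is only conditionally required, and a feed may use `calendar.txt`, `calendar_dates.txt`, or both).

```python
import io
import zipfile

# Simplified core-file list from the GTFS Schedule reference (assumption:
# stops.txt is treated as always required; either calendar file is accepted)
REQUIRED_FILES = {"agency.txt", "stops.txt", "routes.txt", "trips.txt", "stop_times.txt"}
CALENDAR_FILES = {"calendar.txt", "calendar_dates.txt"}

def missing_gtfs_files(zip_source):
    """Return the core GTFS files missing from the root of a zip (path or file object)."""
    with zipfile.ZipFile(zip_source) as zf:
        # Only root-level names count, matching how Cell 7 lists the extracted folder
        names = {name for name in zf.namelist() if "/" not in name}
    missing = sorted(REQUIRED_FILES - names)
    if not names & CALENDAR_FILES:
        missing.append("calendar.txt or calendar_dates.txt")
    return missing

# Demo with an in-memory archive that only contains agency.txt
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("agency.txt", "agency_name,agency_url,agency_timezone\n")
print(missing_gtfs_files(buffer))

# In the notebook: problems = missing_gtfs_files(gtfs_file_path)
```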

In [None]:
# Cell 6: Create Pub/Sub Topic
import subprocess

# Define the Pub/Sub topic name
topic_name = 'iot-data-topic'

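# Create the Pub/Sub topic and report the outcome instead of assuming success
result = subprocess.run(
    ['gcloud', 'pubsub', 'topics', 'create', topic_name, '--project', project_id],
    capture_output=True, text=True
)
if result.returncode == 0:
    print(f"Created Pub/Sub topic: {topic_name}")
else:
    # e.g. the topic already exists from a previous run
    print(f"Could not create Pub/Sub topic {topic_name}: {result.stderr.strip()}")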


In [None]:
# Cell 7: Load GTFS Schema into BigQuery from Downloaded Files
import zipfile
import os
import pandas as pd
from google.cloud import bigquery

# Function to infer schema from GTFS files
def infer_schema_from_gtfs(gtfs_file_path):
    schemas = {}

    with zipfile.ZipFile(gtfs_file_path, 'r') as zip_ref:
        zip_ref.extractall('gtfs_data')

    for filename in os.listdir('gtfs_data'):
        if filename.endswith('.txt'):
            file_path = os.path.join('gtfs_data', filename)
            try:
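                # 'utf-8-sig' strips a leading BOM so the first column name stays a valid BigQuery field name
                df = pd.read_csv(file_path, encoding='utf-8-sig')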
                schema = []
                for column in df.columns:
                    dtype = df[column].dtype
                    if dtype == 'int64':
                        field_type = 'INTEGER'
                    elif dtype == 'float64':
                        field_type = 'FLOAT'
                    elif dtype == 'bool':
                        field_type = 'BOOLEAN'
                    elif 'datetime' in str(dtype):
                        field_type = 'TIMESTAMP'
                    else:
                        field_type = 'STRING'
                    schema.append(bigquery.SchemaField(column, field_type))
                schemas[filename] = schema
            except Exception as e:
                print(f"Error reading {filename}: {e}")
    return schemas

# Initialize BigQuery client
bigquery_client = bigquery.Client(project=project_id)

# Define the dataset name
dataset_id = f"{project_id}.gtfs_dataset"

# Create the dataset if it does not exist
try:
    bigquery_client.get_dataset(dataset_id)  # Make an API request.
    print(f"Dataset {dataset_id} already exists.")
except Exception:
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    bigquery_client.create_dataset(dataset, timeout=30)  # Make an API request.
    print(f"Created dataset {dataset_id}.")

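# Infer schema from GTFS files (requires the GTFS zip from Cell 5)
if not gtfs_file_path:
    raise ValueError("No GTFS file available; run Cell 5 and select a valid source first.")
schemas = infer_schema_from_gtfs(gtfs_file_path)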

# Create tables in BigQuery based on inferred schema
for filename, schema in schemas.items():
    table_name = filename.replace('.txt', '')
    table_id = f"{dataset_id}.{table_name}"

    try:
        bigquery_client.get_table(table_id)  # Make an API request.
        print(f"Table {table_id} already exists.")
    except Exception:
        table = bigquery.Table(table_id, schema=schema)
        bigquery_client.create_table(table)  # Make an API request.
        print(f"Created table {table_id} with schema inferred from {filename}.")

print("Schema inference and table creation completed.")

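One caveat with the dtype mapping in Cell 7: GTFS defines `*_id` fields as IDs, but pandas infers numeric-looking IDs such as `0123` as integers, which drops leading zeros and can give the same column different types across feeds. A minimal sketch of one workaround, assuming every column whose name ends in `_id` should stay text; `bq_field_type`, `read_gtfs_table`, and `infer_bq_schema` are hypothetical names, and the pairs they return would be wrapped in `bigquery.SchemaField(name, type)` as Cell 7 does.

```python
import os
import tempfile

import pandas as pd
from pandas.api import types as ptypes

def bq_field_type(column, series):
    """Map a pandas column to a BigQuery type, keeping GTFS *_id columns as STRING."""
    if column.endswith("_id"):
        return "STRING"
    if ptypes.is_bool_dtype(series):
        return "BOOLEAN"
    if ptypes.is_integer_dtype(series):
        return "INTEGER"
    if ptypes.is_float_dtype(series):
        return "FLOAT"
    if ptypes.is_datetime64_any_dtype(series):
        return "TIMESTAMP"
    return "STRING"

def read_gtfs_table(path):
    """Read a GTFS .txt file, stripping any UTF-8 BOM and keeping *_id columns as text."""
    # First pass reads only the header so we know which columns are IDs
    columns = pd.read_csv(path, nrows=0, encoding="utf-8-sig").columns
    id_dtypes = {column: str for column in columns if column.endswith("_id")}
    return pd.read_csv(path, encoding="utf-8-sig", dtype=id_dtypes)

def infer_bq_schema(df):
    """Return (column, BigQuery type) pairs for a DataFrame."""
    return [(column, bq_field_type(column, df[column])) for column in df.columns]

# Demo on a tiny stops.txt written with a BOM and zero-padded IDs
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "stops.txt")
    with open(path, "w", encoding="utf-8-sig") as f:
        f.write("stop_id,stop_lat,wheelchair_boarding\n0123,37.77,1\n0456,37.80,0\n")
    stops = read_gtfs_table(path)
    print(stops["stop_id"].tolist())
    print(infer_bq_schema(stops))
```

Reading IDs as strings up front matters more than the schema type alone: once `0123` has been parsed as `123`, declaring the column STRING cannot restore the zero.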

In [None]:
# Cell 8: Create BigQuery Dataset
from google.cloud import bigquery

# Initialize BigQuery client
bigquery_client = bigquery.Client(project=project_id)

# Define the dataset name
dataset_id = f"{project_id}.gtfs_dataset"

# Create the dataset if it does not exist
try:
    bigquery_client.get_dataset(dataset_id)  # Make an API request.
    print(f"Dataset {dataset_id} already exists.")
except Exception:
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    bigquery_client.create_dataset(dataset, timeout=30)  # Make an API request.
    print(f"Created dataset {dataset_id}.")


In [None]:
# Cell 9: Install Streamlit and Set Up for Colab
!pip install streamlit pyngrok

import streamlit as st
import pandas as pd
from google.cloud import bigquery
from pyngrok import ngrok
import subprocess
import time

# Prompt for the ngrok authtoken without echoing it into the notebook output
import getpass
ngrok_authtoken = getpass.getpass("Please enter your ngrok authtoken: ")
ngrok.set_auth_token(ngrok_authtoken)

# Define the Streamlit app content
streamlit_app_code = f"""
import streamlit as st
import pandas as pd
from google.cloud import bigquery

def run_streamlit():
    st.title('GTFS Data Visualization')

    client = bigquery.Client(project='{project_id}')
    query = \"""
    SELECT id, timestamp, latitude, longitude
    FROM `{project_id}.gtfs_dataset.gtfs_data`
    ORDER BY timestamp DESC
    LIMIT 100
    \"""
    query_job = client.query(query)
    results = query_job.result()
    df = results.to_dataframe()

    st.write("Latest GTFS Data:")
    st.dataframe(df)

if __name__ == '__main__':
    run_streamlit()
"""

# Write the Streamlit app code to a file
with open('streamlit_app.py', 'w') as f:
    f.write(streamlit_app_code)

# Run the Streamlit app in the background on port 8501 without trying to open a browser
subprocess.Popen(["streamlit", "run", "streamlit_app.py",
                  "--server.port", "8501", "--server.headless", "true"])

# Wait a few seconds for the Streamlit app to start
time.sleep(5)

# Start an ngrok tunnel to the Streamlit port and print its public URL
tunnel = ngrok.connect(addr='8501', proto='http')
print(f"Streamlit app is running at: {tunnel.public_url}")


In [None]:
# Cell 10: Dataflow Pipeline
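import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
import json

# Define the pipeline options
options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    project=project_id,
    region='us-central1',
    staging_location=f'gs://{bucket_name}/dataflow/staging',
    temp_location=f'gs://{bucket_name}/dataflow/temp',
    job_name='gtfs-iot-dataflow-pipeline',
    save_main_session=True  # Make notebook-level imports and definitions available on the Dataflow workers
)

def to_bq_row(msg):
    """Parse a Pub/Sub message and shape it to match the BigQuery schema used below."""
    import datetime  # imported here so the function is self-contained on the workers
    import json
    record = json.loads(msg.decode('utf-8'))
    return {
        # The data generator sends an integer id; the column is STRING
        'id': str(record['id']),
        # The data generator sends epoch seconds; write an ISO-8601 UTC timestamp
        'timestamp': datetime.datetime.fromtimestamp(
            record['timestamp'], tz=datetime.timezone.utc).isoformat(),
        'latitude': record.get('latitude'),
        'longitude': record.get('longitude'),
    }

# Define the pipeline
def run():
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromPubSub' >> ReadFromPubSub(topic=f'projects/{project_id}/topics/iot-data-topic')
         | 'ParseJson' >> beam.Map(to_bq_row)
         | 'WriteToBigQuery' >> WriteToBigQuery(
             table=f'{project_id}:gtfs_dataset.gtfs_data',
             # Columns match what the data generator publishes and the Streamlit app queries
             schema='id:STRING, timestamp:TIMESTAMP, latitude:FLOAT, longitude:FLOAT',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
         )
        )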

# Start the pipeline
import threading

def start_pipeline():
    run()

# Run the pipeline in a separate thread
pipeline_thread = threading.Thread(target=start_pipeline)
pipeline_thread.start()

print("Dataflow pipeline has been started in the background.")


## Optional

In [None]:
# Cell 10-iot: Integrate IoT Device (Optional)
# This cell is optional and should be used to integrate a new IoT device into the Dataflow pipeline.
# It sets up the IoT device to publish data to the Pub/Sub topic.

from google.cloud import pubsub_v1
from pymodbus.client import ModbusTcpClient  # pymodbus 3.x import path; install with `pip install pymodbus`
import json
import time

# Configuration
MOXA_IP = '192.168.1.100'  # IP address of the Moxa ioLogik
MOXA_PORT = 502  # Modbus/TCP port
PROJECT_ID = 'YOUR_PROJECT_ID'
TOPIC_ID = 'iot-data-topic'

# Initialize Pub/Sub publisher
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

# Function to read data from Moxa device
def read_moxa_data(client):
    # Replace this with the actual Modbus addresses and data format for your Moxa device.
    # The unit-ID keyword differs across pymodbus versions (`unit=` in 2.x, `slave=` in 3.x).
    rr = client.read_holding_registers(0, count=10, slave=1)
    if rr.isError():
        raise IOError(f"Modbus read failed: {rr}")
    return rr.registers

# Function to publish messages
def publish_message(data):
    message_data = json.dumps(data).encode('utf-8')
    future = publisher.publish(topic_path, message_data)
    future.result()  # Ensure the message is published

def main():
    # Connect to the Moxa device
    client = ModbusTcpClient(MOXA_IP, port=MOXA_PORT)
    client.connect()

    try:
        while True:
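            # Read data from Moxa device
            registers = read_moxa_data(client)
            # Wrap the raw register values in a JSON object with the id/timestamp fields the pipeline expects
            publish_message({'id': f'moxa-{MOXA_IP}', 'timestamp': int(time.time()), 'registers': registers})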
            # Wait for a defined interval
            time.sleep(5)
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        client.close()

if __name__ == '__main__':
    main()

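`read_moxa_data` returns raw 16-bit register values, and how they map to physical readings depends on the device's register map. A minimal sketch of two common decodings, assuming either signed 16-bit integers or 32-bit floats split across two registers with the high word first; that word order is an assumption to verify against the ioLogik documentation, and the helper names are hypothetical.

```python
import struct

def registers_to_float32(high, low):
    """Combine two 16-bit Modbus registers (high word first) into an IEEE 754 float."""
    return struct.unpack(">f", struct.pack(">HH", high, low))[0]

def registers_to_floats(registers):
    """Decode consecutive register pairs, e.g. the 10 values returned by read_moxa_data."""
    if len(registers) % 2:
        raise ValueError("expected an even number of registers")
    return [registers_to_float32(registers[i], registers[i + 1])
            for i in range(0, len(registers), 2)]

def registers_to_int16(registers):
    """Interpret each register as a signed 16-bit integer (two's complement)."""
    return [r - 0x10000 if r >= 0x8000 else r for r in registers]

print(registers_to_floats([0x4120, 0x0000, 0xBF80, 0x0000]))
print(registers_to_int16([0x0001, 0xFFFF]))
```

If the device uses low-word-first ordering, swap the two arguments passed to `registers_to_float32`.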

## Data Generator

In [None]:
# Cell 11: Data Generator for Sending Messages through the Pipeline
import json
import time
import random
from google.cloud import pubsub_v1
from google.api_core.exceptions import GoogleAPICallError, RetryError

# Function to generate random GTFS data
def generate_random_gtfs_data():
    data = {
        'id': random.randint(1000, 9999),
        'timestamp': int(time.time()),
        'latitude': round(random.uniform(-90, 90), 6),
        'longitude': round(random.uniform(-180, 180), 6)
    }
    return json.dumps(data)

# Function to publish messages to Pub/Sub
def publish_messages(project_id, topic_name, message_count, interval):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    for _ in range(message_count):
        message = generate_random_gtfs_data()
        try:
            future = publisher.publish(topic_path, message.encode('utf-8'))
            future.result()  # Ensure the message is published
            print(f"Published message: {message}")
        except (GoogleAPICallError, RetryError) as e:
            print(f"An error occurred while publishing the message: {e}")
        time.sleep(interval)

# Prompt the user for the project_id
project_id = input("Enter your GCP project ID: ")

# Parameters
topic_name = 'iot-data-topic'  # Ensure this matches the Pub/Sub topic name
message_count = 1000  # Total number of messages to send for testing
interval = 1  # Interval in seconds between messages

# Start publishing messages
publish_messages(project_id, topic_name, message_count, interval)

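`generate_random_gtfs_data` draws coordinates from the whole globe, so most points land far from any Bay Area route when plotted. A minimal sketch of a drop-in alternative, assuming a rough, hand-picked bounding box around the Bay Area and a small pool of hypothetical vehicle IDs; seeding `random.Random` makes test runs reproducible.

```python
import json
import random
import time

# Rough, assumed bounding box around the San Francisco Bay Area (adjust as needed)
BAY_AREA_BOUNDS = {"lat_min": 37.2, "lat_max": 38.3, "lon_min": -123.0, "lon_max": -121.5}

def generate_bay_area_reading(rng, vehicle_ids, bounds=BAY_AREA_BOUNDS):
    """Like generate_random_gtfs_data, but with stable IDs and plausible coordinates."""
    return json.dumps({
        "id": rng.choice(vehicle_ids),
        "timestamp": int(time.time()),
        "latitude": round(rng.uniform(bounds["lat_min"], bounds["lat_max"]), 6),
        "longitude": round(rng.uniform(bounds["lon_min"], bounds["lon_max"]), 6),
    })

rng = random.Random(42)  # seeded so runs are reproducible
vehicle_ids = [f"bus-{n}" for n in range(1, 11)]  # hypothetical vehicle IDs
for _ in range(3):
    print(generate_bay_area_reading(rng, vehicle_ids))
```

To use it, replace the `generate_random_gtfs_data()` call inside `publish_messages` with `generate_bay_area_reading(rng, vehicle_ids)`.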

## Security Analysis

### Introduction

This section provides a security analysis of the GTFS Data and IoT Integration Solution, highlighting key security considerations and best practices to ensure the integrity, confidentiality, and availability of the system.

### Authentication and Authorization

1. **GCP Authentication**: The solution uses Google Cloud's built-in authentication mechanisms to ensure only authorized users can access the GCP resources. Users authenticate via OAuth2, which provides secure access tokens.
   
2. **IAM Roles and Permissions**: Proper Identity and Access Management (IAM) roles are assigned to users and service accounts. This ensures that only authorized entities can perform specific actions on GCP resources such as Pub/Sub, BigQuery, and Cloud Storage. Fine-grained IAM roles should be used to follow the principle of least privilege.

### Data Security

1. **Data Encryption**:
   - **At Rest**: Data stored in Google Cloud Storage and BigQuery is encrypted at rest using Google-managed encryption keys by default.
   - **In Transit**: Data transferred between clients and GCP services is encrypted using TLS (Transport Layer Security) to protect against interception and eavesdropping.

2. **Secure Storage Access**: Access to the Google Cloud Storage bucket is controlled using IAM policies to ensure only authorized users and service accounts can read or write data.

### Network Security

1. **Firewall Rules**: Implement VPC firewall rules that restrict access to the Dataflow worker VMs and any other network-exposed GCP resources to trusted IP ranges. This helps prevent unauthorized access from external networks.

2. **Private Networking**: Utilize VPC (Virtual Private Cloud) to create isolated networks for different parts of the solution. Private networking ensures that internal traffic between GCP services does not traverse the public internet, reducing the attack surface.

### Logging and Monitoring

1. **Audit Logging**: Enable Cloud Audit Logs to track administrative activities and access to GCP resources. Audit logs help in detecting unauthorized access attempts and monitoring the usage of critical resources.

2. **Monitoring and Alerts**: Use Google Cloud Monitoring and Alerts to keep track of the system's health and performance. Set up alerts for unusual activities or performance degradation to respond promptly to potential security incidents.

### Data Integrity

1. **Data Validation**: Implement data validation checks when ingesting data from IoT devices and GTFS sources to ensure the data's integrity and accuracy.

2. **Error Handling**: Incorporate robust error handling mechanisms in the data processing pipeline to gracefully handle and log errors without compromising data integrity.
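As an illustration of the two points above, here is a minimal sketch of a validation step for incoming Pub/Sub payloads. It assumes the message contract used by the data generator (`id`, `timestamp`, `latitude`, `longitude`); `validate_reading` is a hypothetical helper. Valid records could continue to BigQuery while rejected payloads and their error lists are logged or routed to a separate dead-letter destination instead of failing the pipeline.

```python
import json

# Assumed message contract: the fields the data generator publishes
REQUIRED_FIELDS = {"id", "timestamp", "latitude", "longitude"}

def _is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(value, (int, float)) and not isinstance(value, bool)

def validate_reading(payload):
    """Return (record, errors) for one Pub/Sub payload; record is None if invalid."""
    try:
        record = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return None, [f"unparseable payload: {exc}"]
    if not isinstance(record, dict):
        return None, ["payload is not a JSON object"]
    errors = [f"missing field: {name}" for name in sorted(REQUIRED_FIELDS - set(record))]
    for name, low, high in (("latitude", -90, 90), ("longitude", -180, 180)):
        if name in record:
            value = record[name]
            if not _is_number(value):
                errors.append(f"{name} is not a number")
            elif not low <= value <= high:
                errors.append(f"{name} out of range")
    if "timestamp" in record and not _is_number(record["timestamp"]):
        errors.append("timestamp is not a number")
    return (None if errors else record), errors

print(validate_reading(b'{"id": 1, "timestamp": 1700000000, "latitude": 37.7, "longitude": -122.4}'))
print(validate_reading(b'{"id": 1, "latitude": 120, "longitude": "x"}'))
```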

### Conclusion

Following these security best practices helps keep the GTFS Data and IoT Integration Solution a secure and reliable environment for processing and visualizing transit data. Regular security assessments and updates to the security policies will help maintain the solution's integrity and protect against emerging threats.


## Cost Analysis

This section estimates the cost of running the GTFS Data and IoT Integration Solution for one million iterations (messages sent through the pipeline), covering Google Cloud Storage, Pub/Sub, Dataflow, BigQuery, and Streamlit.

### Google Cloud Storage

- **Storage Cost**: \$0.026 per GB per month
- **Data Retrieval Cost**: \$0.01 per GB

Assuming 10 GB of storage and 1 GB of data retrieval:
- Storage: 10 GB * \$0.026 = \$0.26
- Data Retrieval: 1 GB * \$0.01 = \$0.01

Total Cloud Storage Cost: **\$0.27**

### Google Cloud Pub/Sub

- **Message Ingestion**: \$0.40 per million messages
- **Message Delivery**: \$0.25 per million messages

For 1 million messages:
- Ingestion: 1 million messages * \$0.40 per million = \$0.40
- Delivery: 1 million messages * \$0.25 per million = \$0.25

Total Pub/Sub Cost: **\$0.65**

### Google Cloud Dataflow

- **Dataflow Cost**: \$0.01 per vCPU hour and \$0.01 per GB hour

Assuming a simple job running for 1 hour on 4 vCPUs with 10 GB of worker memory:
- vCPU Hours: 4 vCPUs * 1 hour * \$0.01 = \$0.04
- GB Hours: 10 GB * 1 hour * \$0.01 = \$0.10

Total Dataflow Cost: **\$0.14**

### Google Cloud BigQuery

- **Storage Cost**: \$0.02 per GB per month
- **Query Cost**: \$5.00 per TB processed

Assuming 10 GB of storage and 1 TB of query data:
- Storage: 10 GB * \$0.02 = \$0.20
- Query: 1 TB * \$5.00 = \$5.00

Total BigQuery Cost: **\$5.20**

### Streamlit

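- **Streamlit**: Open source and free to run; in this notebook the app runs inside the Colab runtime and is exposed through an ngrok tunnel.

Assuming no additional costs for Streamlit or for ngrok's free tier.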

### Total Cost Analysis

- **Google Cloud Storage**: \$0.27
- **Google Cloud Pub/Sub**: \$0.65
- **Google Cloud Dataflow**: \$0.14
- **Google Cloud BigQuery**: \$5.20
- **Streamlit**: Free

Total Estimated Cost for One Million Iterations: **\$6.26**
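The arithmetic above can be reproduced, and re-run for other workload sizes, with a small calculator. This is a minimal sketch: the unit prices are copied from the estimates in this section rather than fetched from the current price lists, and the parameter names are hypothetical.

```python
# Unit prices (USD) copied from the estimates above; check the linked pricing pages for current rates
PRICES = {
    "gcs_storage_per_gb_month": 0.026,
    "gcs_retrieval_per_gb": 0.01,
    "pubsub_ingest_per_million": 0.40,
    "pubsub_delivery_per_million": 0.25,
    "dataflow_per_vcpu_hour": 0.01,
    "dataflow_per_gb_hour": 0.01,
    "bq_storage_per_gb_month": 0.02,
    "bq_query_per_tb": 5.00,
}

def estimate_costs(storage_gb=10, retrieval_gb=1, messages=1_000_000, vcpus=4,
                   memory_gb=10, hours=1, bq_storage_gb=10, query_tb=1, prices=PRICES):
    """Return per-service and total monthly cost estimates, rounded to cents."""
    p = prices
    millions = messages / 1_000_000
    costs = {
        "Cloud Storage": storage_gb * p["gcs_storage_per_gb_month"] + retrieval_gb * p["gcs_retrieval_per_gb"],
        "Pub/Sub": millions * (p["pubsub_ingest_per_million"] + p["pubsub_delivery_per_million"]),
        "Dataflow": hours * (vcpus * p["dataflow_per_vcpu_hour"] + memory_gb * p["dataflow_per_gb_hour"]),
        "BigQuery": bq_storage_gb * p["bq_storage_per_gb_month"] + query_tb * p["bq_query_per_tb"],
        "Streamlit": 0.0,
    }
    costs = {name: round(value, 2) for name, value in costs.items()}
    costs["Total"] = round(sum(costs.values()), 2)
    return costs

print(estimate_costs())
print(estimate_costs(messages=10_000_000))
```

With the default arguments it reproduces the \$6.26 total; note that only the Pub/Sub line scales with the message count, so BigQuery query volume dominates this estimate.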

### Sources

- [Google Cloud Storage Pricing](https://cloud.google.com/storage/pricing)
- [Google Cloud Pub/Sub Pricing](https://cloud.google.com/pubsub/pricing)
- [Google Cloud Dataflow Pricing](https://cloud.google.com/dataflow/pricing)
- [Google Cloud BigQuery Pricing](https://cloud.google.com/bigquery/pricing)

This cost analysis is based on estimates and actual costs may vary depending on usage patterns and other factors.


## Sanitization

In [None]:
# Clean up GCP environment script

from google.cloud import storage, bigquery, pubsub_v1
from googleapiclient.discovery import build
import subprocess

def list_buckets():
    storage_client = storage.Client()
    buckets = list(storage_client.list_buckets())
    return buckets

def stop_dataflow_jobs(project_id):
    dataflow = build('dataflow', 'v1b3')
    jobs = dataflow.projects().locations().jobs().list(
        projectId=project_id, location='us-central1').execute()

    if 'jobs' in jobs:
        for job in jobs['jobs']:
            if job['currentState'] == 'JOB_STATE_RUNNING':
                job_id = job['id']
                dataflow.projects().locations().jobs().update(
                    projectId=project_id,
                    location='us-central1',
                    jobId=job_id,
                    body={'requestedState': 'JOB_STATE_CANCELLED'}
                ).execute()
                print(f"Cancelled Dataflow job: {job_id}")
    else:
        print("No running Dataflow jobs found.")

def delete_bigquery_table(project_id):
    bigquery_client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.gtfs_dataset"
    table_id = f"{dataset_id}.gtfs_data"

    try:
        bigquery_client.delete_table(table_id)
        print(f"Deleted BigQuery table: {table_id}")
    except Exception as e:
        print(f"Error deleting BigQuery table: {e}")

def delete_files_in_bucket(bucket_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blobs = list(bucket.list_blobs())
    for blob in blobs:
        blob.delete()
        print(f"Deleted file: {blob.name}")

    print(f"All files deleted in bucket: {bucket_name}")

def delete_cloud_functions(project_id):
    functions = build('cloudfunctions', 'v1')
    result = functions.projects().locations().functions().list(
        parent=f"projects/{project_id}/locations/-").execute()

    if 'functions' in result:
        for function in result['functions']:
            function_name = function['name']
            functions.projects().locations().functions().delete(
                name=function_name).execute()
            print(f"Deleted Cloud Function: {function_name}")
    else:
        print("No Cloud Functions found.")

def delete_pubsub_resources(project_id, topic_name):
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    subscriptions = list(subscriber.list_subscriptions(request={"project": f"projects/{project_id}"}))
    for subscription in subscriptions:
        if subscription.topic == topic_path:
            subscriber.delete_subscription(request={"subscription": subscription.name})
            print(f"Deleted subscription: {subscription.name}")

    publisher.delete_topic(request={"topic": topic_path})
    print(f"Deleted Pub/Sub topic: {topic_path}")

def main():
    project_id = input("Enter your GCP project ID: ")

    stop_dataflow_jobs(project_id)
    delete_bigquery_table(project_id)

    # List buckets and prompt user to select one
    buckets = list_buckets()
    print("Select a bucket to delete data from:")
    for idx, bucket in enumerate(buckets):
        print(f"{idx + 1}. {bucket.name}")

    bucket_index = int(input("Enter the number of the bucket you want to delete data from: ")) - 1
    bucket_name = buckets[bucket_index].name

    delete_files_in_bucket(bucket_name)
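    # This notebook does not create Cloud Functions, and this step deletes ALL functions in the project, so confirm first
    if input("Delete ALL Cloud Functions in this project? (y/N): ").strip().lower() == 'y':
        delete_cloud_functions(project_id)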
    delete_pubsub_resources(project_id, 'iot-data-topic')

if __name__ == '__main__':
    main()
