In [54]:
import google.auth
from google.cloud import bigquery
from google.api_core import exceptions

# Resolve the default credentials and the project ID that comes with them.
CREDENTIALS, PROJECT_ID = google.auth.default()
print(f"Detected Project ID: {PROJECT_ID}")

# BigQuery destination for the streamed records.
LOCATION = 'US'
DATASET_ID = 'bootcamp_challenge4'
TABLE_ID = 'streaming_data'

# BigQuery client shared by the cells below.
client = bigquery.Client(project=PROJECT_ID)

Detected Project ID: qwiklabs-gcp-00-c2e92c8fc9eb


In [55]:
# Table schema, one (column, BigQuery type, description) entry per CSV field,
# in the same order as the fields appear in a message. Every column is NULLABLE.
schema = [
    bigquery.SchemaField(column, bq_type, mode="NULLABLE", description=text)
    for column, bq_type, text in (
        ("MT", "STRING", "SEL ID AIR STA CLK MSG info http://woodair.net/sbs/Article/Barebones42_Socket_Data.htm"),
        ("TT", "INT64", "1 - 8"),
        ("SID", "STRING", "Database Session record number"),
        ("AID", "STRING", "Database Aircraft record number"),
        ("Hex", "STRING", "Aircraft Mode S hexadecimal code https://opensky-network.org/datasets/metadata/"),
        ("FID", "STRING", "Database Flight record number"),
        ("DMG", "DATE", "Date message generated"),
        ("TMG", "TIME", "Time message generated"),
        ("DML", "DATE", "Date message logged"),
        ("TML", "TIME", "Time message logged"),
        ("CS", "STRING", "Callsign (flight number or registration)"),
        ("Alt", "INT64", "Mode C altitude (Flight Level)"),
        ("GS", "INT64", "Ground Speed"),
        ("Trk", "INT64", "Track"),
        ("Lat", "FLOAT64", "Latitude (N/E positive, S/W negative)"),
        ("Lng", "FLOAT64", "Longitude (N/E positive, S/W negative)"),
        ("VR", "INT64", "Vertical Rate"),
        ("Sq", "STRING", "Assigned Mode A squawk code"),
        ("Alrt", "INT64", "Flag to indicate squawk has changed"),
        ("Emer", "INT64", "Flag to indicate emergency code has been set"),
        ("SPI", "INT64", "Flag to indicate transponder Ident has been activated"),
        ("Gnd", "INT64", "Flag to indicate ground squat switch is active"),
    )
]

In [56]:
## Create BigQuery Dataset

def setup_bigquery_resources():
    """Create the destination dataset and table, tolerating ones that already exist.

    Uses the notebook-level PROJECT_ID, DATASET_ID, TABLE_ID, LOCATION,
    schema and client. A Conflict raised by either create call is
    reported with a printed message instead of propagating.
    """
    dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
    table_ref = dataset_ref.table(TABLE_ID)

    # Dataset first: the table is created inside it.
    new_dataset = bigquery.Dataset(dataset_ref)
    new_dataset.location = LOCATION
    try:
        client.create_dataset(new_dataset, timeout=30)
    except exceptions.Conflict:
        print(f"Dataset {DATASET_ID} already exists.")
    else:
        print(f"Created dataset {PROJECT_ID}.{DATASET_ID}")

    # Then the table, using the schema defined in the previous cell.
    new_table = bigquery.Table(table_ref, schema=schema)
    try:
        client.create_table(new_table)
    except exceptions.Conflict:
        print(f"Table {TABLE_ID} already exists.")
    else:
        print(f"Created table {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")


setup_bigquery_resources()

Dataset bootcamp_challenge4 already exists.
Table streaming_data already exists.


In [57]:
## Create a subscription in this project on the flight-transponder topic
## (the topic itself lives in the paul-leroy project).
## {PROJECT_ID} is expanded by IPython from the value detected in the setup cell,
## so the command follows whichever lab project is active.
## Re-running once the subscription exists makes gcloud report an
## "already exists" error; the subscription itself is left as it was.
!gcloud pubsub subscriptions create flight-transponder-sub \
  --topic=projects/paul-leroy/topics/flight-transponder \
  --project={PROJECT_ID}

[1;31mERROR:[0m Failed to create subscription [projects/qwiklabs-gcp-00-c2e92c8fc9eb/subscriptions/flight-transponder-sub]: Resource already exists in the project (resource=flight-transponder-sub).
[1;31mERROR:[0m (gcloud.pubsub.subscriptions.create) Failed to create the following: [flight-transponder-sub].


In [58]:
from google.cloud import pubsub_v1

# PROJECT_ID is the value detected by google.auth.default() in the setup cell.
# It is intentionally not re-assigned to a literal here, so the notebook keeps
# working when it is run in a different project.
SUBSCRIPTION_ID = "flight-transponder-sub"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

# Peek at a few raw messages to check the payload format.
# Nothing is acknowledged here, so Pub/Sub will deliver these messages again.
response = subscriber.pull(
    request={
        "subscription": subscription_path,
        "max_messages": 10,
    }
)

for msg in response.received_messages:
    print(msg.message.data.decode("utf-8"))

MSG,8,1,1,485A32,1,2026/01/15,21:24:45.027,2026/01/15,21:24:45.056,,,,,,,,,,,,0
MSG,5,1,1,406FDA,1,2026/01/15,21:24:41.021,2026/01/15,21:24:41.068,,16300,,,,,,,0,,0,
MSG,5,1,1,407E7B,1,2026/01/15,21:24:45.029,2026/01/15,21:24:45.056,,32000,,,,,,,0,,0,
MSG,7,1,1,4071D7,1,2026/01/15,21:24:45.030,2026/01/15,21:24:45.056,,5825,,,,,,,,,,
MSG,8,1,1,801406,1,2026/01/15,21:24:45.031,2026/01/15,21:24:45.056,,,,,,,,,,,,0
MSG,8,1,1,4D2242,1,2026/01/15,21:24:45.034,2026/01/15,21:24:45.056,,,,,,,,,,,,0
MSG,8,1,1,4D2242,1,2026/01/15,21:24:45.035,2026/01/15,21:24:45.057,,,,,,,,,,,,0
MSG,8,1,1,40756E,1,2026/01/15,21:24:45.035,2026/01/15,21:24:45.057,,,,,,,,,,,,0
MSG,8,1,1,4D2242,1,2026/01/15,21:24:41.022,2026/01/15,21:24:41.068,,,,,,,,,,,,0
MSG,5,1,1,780D9D,1,2026/01/15,21:24:41.022,2026/01/15,21:24:41.068,,6425,,,,,,,0,,0,


In [59]:
## Load Data

def clean_date(date_str):
    """Return the date with every "/" replaced by "-", or None for an empty/None value."""
    if not date_str:
        return None
    return "-".join(date_str.split("/"))

def clean_time(time_str):
    """Return the text before the first ".", or None for an empty/None value.

    For a value such as "21:24:45.027" this discards the fractional seconds.
    """
    if not time_str:
        return None
    before_dot, _, _ = time_str.partition(".")
    return before_dot

def parse_csv_message(line: str) -> dict:
    """Turn one comma-separated message into a row dict keyed by column name.

    The line is stripped and split on ",", and fields 0-21 are mapped
    positionally onto the table columns. Empty numeric fields become None,
    as do empty "CS" and "Sq" values; the other text fields are passed
    through unchanged. Dates and times go through clean_date / clean_time.

    Raises:
        IndexError: if the line has fewer than 22 fields.
        ValueError: if a non-empty numeric field cannot be converted.
    """
    fields = line.strip().split(",")

    def as_int(raw):
        # Empty field -> NULL; anything else must parse as an integer.
        return int(raw) if raw else None

    def as_float(raw):
        # Empty field -> NULL; anything else must parse as a float.
        return float(raw) if raw else None

    def text_or_null(raw):
        # Empty text is stored as NULL rather than as an empty string.
        return raw or None

    return {
        "MT": fields[0],
        "TT": as_int(fields[1]),
        "SID": fields[2],
        "AID": fields[3],
        "Hex": fields[4],
        "FID": fields[5],
        "DMG": clean_date(fields[6]),
        "TMG": clean_time(fields[7]),
        "DML": clean_date(fields[8]),
        "TML": clean_time(fields[9]),
        "CS": text_or_null(fields[10]),
        "Alt": as_int(fields[11]),
        "GS": as_int(fields[12]),
        "Trk": as_int(fields[13]),
        "Lat": as_float(fields[14]),
        "Lng": as_float(fields[15]),
        "VR": as_int(fields[16]),
        "Sq": text_or_null(fields[17]),
        "Alrt": as_int(fields[18]),
        "Emer": as_int(fields[19]),
        "SPI": as_int(fields[20]),
        "Gnd": as_int(fields[21]),
    }

def pull_and_load():
    """Pull one batch of messages, stream them into BigQuery, then acknowledge them.

    Up to 500 messages are pulled from subscription_path, parsed with
    parse_csv_message and inserted into PROJECT_ID.DATASET_ID.TABLE_ID.
    The batch is acknowledged only when BigQuery reports no errors; on
    errors they are printed and nothing is acknowledged, so Pub/Sub will
    deliver those messages again.

    Because messages can be delivered more than once, each row is inserted
    with its Pub/Sub message ID as the BigQuery row ID, which lets BigQuery
    apply its best-effort de-duplication to redelivered messages instead of
    storing them as new rows.

    Returns:
        None. Returns early, without printing, when the pull is empty.
    """
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": 500,
        }
    )

    received = response.received_messages
    if not received:
        return

    rows = []
    ack_ids = []
    row_ids = []

    for received_msg in received:
        csv_line = received_msg.message.data.decode("utf-8")
        rows.append(parse_csv_message(csv_line))
        ack_ids.append(received_msg.ack_id)
        # Stable per-message identifier, reused as the streaming insert ID.
        row_ids.append(received_msg.message.message_id)

    errors = client.insert_rows_json(
        f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}",
        rows,
        row_ids=row_ids,
    )

    if errors:
        # Leave the batch unacknowledged so it is retried on a later pull.
        print("BigQuery errors:", errors)
        return

    subscriber.acknowledge(
        request={
            "subscription": subscription_path,
            "ack_ids": ack_ids,
        }
    )
    print(f"Inserted {len(rows)} rows.")



import time

start_time = time.time()
duration = 90  # seconds

# Run pull_and_load() repeatedly until `duration` seconds have passed.
# pull_and_load() uses a synchronous pull, so there is no background
# streaming future to cancel when the time is up; the loop simply stops.
while True:
    current_time = time.time()
    elapsed = current_time - start_time

    if elapsed >= duration:
        print(f"{duration} seconds elapsed. Shutting down...")
        break

    # Pause between batches so the loop does not spin when pulls return quickly.
    time.sleep(1)
    print(f"Time remaining: {int(duration - elapsed)}s", end="\r")

    pull_and_load()


Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 488 rows.
Inserted 430 rows.
Inserted 500 rows.
Inserted 500 rows.
Inserted 500 rows.
90 seconds elapsed. Shutting down...


In [60]:
## Count total records
sql_query = f"""
    SELECT
        COUNT(*) AS total_records
    FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
"""

# Use the BigQuery client created in the setup cell (`client`);
# no `bq_client` is defined in this notebook.
query_job = client.query(sql_query)
results = query_job.result()

# An aggregate without GROUP BY yields exactly one row.
print("--- Query Results ---")
for row in results:
    print(f"Total records in table: {row.total_records}")


--- Query Results ---
Total records in table: 105236
