In [None]:
%%bigquery
-- Create the dataset that holds the flight data table; a no-op when it already exists.
CREATE SCHEMA IF NOT EXISTS `challenge_4`
OPTIONS (
  description = 'Dataset for Challenge 4 flight data',
  location = 'US'
);

Query is running:   0%|          |

In [None]:
%%bigquery
-- Destination table for the transponder messages streamed in by the loader cell.
-- Column order follows the comma-separated field order of the incoming messages;
-- the trailing comments give the feed field each abbreviated column maps to.
CREATE TABLE IF NOT EXISTS `challenge_4.flight_data` (
  MT   STRING,   -- message_type
  TT   INT64,    -- transponder_type
  SID  STRING,   -- session_id
  AID  STRING,   -- aircraft_id
  Hex  STRING,   -- hex_code
  FID  STRING,   -- flight_id
  DMG  DATE,     -- date_generated (loader rewrites YYYY/MM/DD to YYYY-MM-DD)
  TMG  TIME,     -- time_generated
  DML  DATE,     -- date_logged (loader rewrites YYYY/MM/DD to YYYY-MM-DD)
  TML  TIME,     -- time_logged
  CS   STRING,   -- callsign
  Alt  INT64,    -- altitude
  GS   INT64,    -- ground_speed
  Trk  INT64,    -- track
  Lat  FLOAT64,  -- latitude
  Lng  FLOAT64,  -- longitude
  VR   INT64,    -- vertical_rate
  Sq   STRING,   -- squawk
  Alrt INT64,    -- alert
  Emer INT64,    -- emergency
  SPI  INT64,    -- spi
  Gnd  INT64     -- on_ground
)

Query is running:   0%|          |

In [7]:
import json
from google.cloud import pubsub_v1
from google.cloud import bigquery

# Config
# Project used to build the topic path below.
project_id = "paul-leroy"
# Project in which the pull subscription is looked up. It differs from
# project_id, so it gets its own name instead of a literal buried in a call.
subscription_project_id = "qwiklabs-gcp-01-4b0fd1e23a0f"
topic_name = "flight-transponder"
subscription_name = "flight-transponder-sub"
dataset_id = "challenge_4"
table_id = "flight_data"

# Clients
subscriber = pubsub_v1.SubscriberClient()
bq_client = bigquery.Client()

# Paths
topic_path = f"projects/{project_id}/topics/{topic_name}"
subscription_path = subscriber.subscription_path(subscription_project_id, subscription_name)
# Client.dataset() is deprecated; build the reference explicitly. The dataset is
# still resolved against the BigQuery client's default project, as before.
table_ref = bigquery.DatasetReference(bq_client.project, dataset_id).table(table_id)

In [None]:
pip install google-cloud-pubsub

Collecting google-cloud-pubsub
  Downloading google_cloud_pubsub-2.31.1-py3-none-any.whl.metadata (10 kB)
Downloading google_cloud_pubsub-2.31.1-py3-none-any.whl (319 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m319.2/319.2 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-cloud-pubsub
Successfully installed google-cloud-pubsub-2.31.1


In [None]:
pip install google-cloud-bigquery



In [19]:
from google.cloud import pubsub_v1
from google.cloud import bigquery

# NOTE(review): project_id is not used anywhere in this cell; it is kept in case
# other cells read it.
project_id = "paul-leroy"
# Project in which the pull subscription is looked up. It differs from
# project_id, so it gets its own name instead of a literal buried in a call.
subscription_project_id = "qwiklabs-gcp-01-4b0fd1e23a0f"
subscription_name = "flight-transponder-sub"
dataset_id = "challenge_4"
table_id = "flight_data"

subscriber = pubsub_v1.SubscriberClient()
bq_client = bigquery.Client()

subscription_path = subscriber.subscription_path(subscription_project_id, subscription_name)
# Client.dataset() is deprecated; build the reference explicitly. The dataset is
# still resolved against the BigQuery client's default project, as before.
table_ref = bigquery.DatasetReference(bq_client.project, dataset_id).table(table_id)

# One (feed field name, BigQuery column) pair per comma-separated position in an
# incoming message. Both lists below are derived from this single table so they
# cannot drift out of step with each other.
_FIELD_PAIRS = (
    ("message_type", "MT"),
    ("transponder_type", "TT"),
    ("session_id", "SID"),
    ("aircraft_id", "AID"),
    ("hex_code", "Hex"),
    ("flight_id", "FID"),
    ("date_generated", "DMG"),
    ("time_generated", "TMG"),
    ("date_logged", "DML"),
    ("time_logged", "TML"),
    ("callsign", "CS"),
    ("altitude", "Alt"),
    ("ground_speed", "GS"),
    ("track", "Trk"),
    ("latitude", "Lat"),
    ("longitude", "Lng"),
    ("vertical_rate", "VR"),
    ("squawk", "Sq"),
    ("alert", "Alrt"),
    ("emergency", "Emer"),
    ("spi", "SPI"),
    ("on_ground", "Gnd"),
)

# BigQuery column names, in message field order.
bq_fields = [column_name for source_name, column_name in _FIELD_PAIRS]

# Feed field names, in message field order.
pubsub_fields = [source_name for source_name, column_name in _FIELD_PAIRS]

# Synchronously pull one batch (at most 50 messages) from the subscription.
pull_request = {"subscription": subscription_path, "max_messages": 50}
response = subscriber.pull(request=pull_request)

# Column groups that need conversion before a value can be streamed to BigQuery.
# Defined once here instead of being rebuilt for every field of every message.
_INT_COLUMNS = {"TT", "Alt", "GS", "Trk", "VR", "Alrt", "Emer", "SPI", "Gnd"}
_FLOAT_COLUMNS = {"Lat", "Lng"}
# Dates arrive as YYYY/MM/DD and are rewritten to YYYY-MM-DD for the DATE columns.
_DATE_COLUMNS = {"DMG", "DML"}


def _convert_value(bq_field, val):
    """Convert one raw CSV value into the value stored in column ``bq_field``.

    Args:
        bq_field: Name of the destination BigQuery column.
        val: Raw string taken from the comma-separated message.

    Returns:
        ``None`` for an empty string, an ``int`` or ``float`` for numeric
        columns, a dash-separated date string for date columns, and the
        unchanged string for every other column.

    Raises:
        ValueError: If a numeric column holds text that is not a number.
    """
    if val == "":
        return None
    if bq_field in _INT_COLUMNS:
        return int(val)
    if bq_field in _FLOAT_COLUMNS:
        return float(val)
    if bq_field in _DATE_COLUMNS:
        return val.replace("/", "-")
    return val


# Parse each pulled message, stream it into BigQuery, and acknowledge it only
# once the insert has succeeded.
for received_message in response.received_messages:
    try:
        message_str = received_message.message.data.decode("utf-8")
        print(f"Raw message string: '{message_str}'")

        # NOTE(review): skipped messages are not acknowledged, so Pub/Sub will
        # deliver them again. Decide whether they should be acked or dead-lettered.
        if not message_str.strip():
            print("Skipping empty message.")
            continue

        values = message_str.split(",")
        if len(values) != len(pubsub_fields):
            print(f"Unexpected field count: {len(values)}. Skipping.")
            continue

        # bq_fields is ordered like the message fields, so pair them positionally.
        row = {
            bq_field: _convert_value(bq_field, val)
            for bq_field, val in zip(bq_fields, values)
        }

        print("Parsed row:", row)

        # The Pub/Sub message id doubles as the streaming insert id so that a
        # redelivered message is de-duplicated by BigQuery on a best-effort basis.
        errors = bq_client.insert_rows_json(
            table_ref,
            [row],
            row_ids=[received_message.message.message_id],
        )
        if errors:
            # Do not acknowledge: the row was not stored, so leave the message
            # on the subscription for a later retry instead of losing it.
            print(f"BigQuery insert errors: {errors}")
            continue

        print("Inserted message into BigQuery.")

        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": [received_message.ack_id]
            }
        )
    except Exception as e:
        # Best effort: report the failure and carry on with the next message.
        # The failed message stays unacknowledged.
        print(f"Error processing message: {e}")



Raw message string: 'MSG,5,1,1,4CADF9,1,2025/09/04,20:36:49.226,2025/09/04,20:36:49.243,,31725,,,,,,,0,,0,'
Parsed row: {'MT': 'MSG', 'TT': 5, 'SID': '1', 'AID': '1', 'Hex': '4CADF9', 'FID': '1', 'DMG': '2025-09-04', 'TMG': '20:36:49.226', 'DML': '2025-09-04', 'TML': '20:36:49.243', 'CS': None, 'Alt': 31725, 'GS': None, 'Trk': None, 'Lat': None, 'Lng': None, 'VR': None, 'Sq': None, 'Alrt': 0, 'Emer': None, 'SPI': 0, 'Gnd': None}
Inserted message into BigQuery.
Raw message string: 'MSG,5,1,1,4075FF,1,2025/09/04,20:39:27.820,2025/09/04,20:39:27.840,,14900,,,,,,,0,,0,'
Parsed row: {'MT': 'MSG', 'TT': 5, 'SID': '1', 'AID': '1', 'Hex': '4075FF', 'FID': '1', 'DMG': '2025-09-04', 'TMG': '20:39:27.820', 'DML': '2025-09-04', 'TML': '20:39:27.840', 'CS': None, 'Alt': 14900, 'GS': None, 'Trk': None, 'Lat': None, 'Lng': None, 'VR': None, 'Sq': None, 'Alrt': 0, 'Emer': None, 'SPI': 0, 'Gnd': None}
Inserted message into BigQuery.
Raw message string: 'MSG,8,1,1,3C5EE7,1,2025/09/04,20:39:27.820,2025/0

In [20]:
%%bigquery
-- Sanity check: how many rows have been streamed into the table so far.
-- The alias gives the result column a readable name instead of the auto-generated f0_.
SELECT COUNT(*) AS row_count
FROM `qwiklabs-gcp-01-4b0fd1e23a0f.challenge_4.flight_data`

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,f0_
0,60


In [21]:
%%bigquery
-- Build a GEOGRAPHY point for every row that carries a position.
-- ST_GEOGPOINT takes longitude first, then latitude.
SELECT
  ST_GEOGPOINT(Lng, Lat) AS LOCATION
FROM
  `qwiklabs-gcp-01-4b0fd1e23a0f.challenge_4.flight_data`
WHERE
  Lng IS NOT NULL
  AND Lat IS NOT NULL

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,LOCATION
0,POINT(-1.24491 52.07883)
1,POINT(1.05402 51.1778)
