# OpenSky Network - Flight Tracker

Fetch real-time flight data using the REST data source.

**Available Regions:**
- `EUROPE`, `NORTH_AMERICA`, `SOUTH_AMERICA`, `ASIA`, `AUSTRALIA`, `AFRICA`

**Note:** Make sure to install the `pyspark-data-sources` package before running this notebook.

In [None]:
# Setup
from pyspark.sql import SparkSession
from pyspark_datasources import rest_api_call_csv, parse_array_response, parse_array_response_streaming, RestDataSource
import json

# getOrCreate() hands back the session that already exists (as on a
# Databricks cluster) and only builds a new one when there is none.
spark = (
    SparkSession.builder
    .appName("OpenSky")
    .getOrCreate()
)

# Define regions (from opensky.py).
# Each bounding box uses the same keys as the OpenSky /states/all query
# parameters: lamin/lamax bound the latitude, lomin/lomax bound the longitude.
regions = {
    "EUROPE": {"lamin": 35.0, "lamax": 72.0, "lomin": -25.0, "lomax": 45.0},
    "NORTH_AMERICA": {"lamin": 7.0, "lamax": 72.0, "lomin": -168.0, "lomax": -60.0},
    "SOUTH_AMERICA": {"lamin": -56.0, "lamax": 15.0, "lomin": -90.0, "lomax": -30.0},
    "ASIA": {"lamin": -10.0, "lamax": 82.0, "lomin": 45.0, "lomax": 180.0},
    "AUSTRALIA": {"lamin": -50.0, "lamax": -10.0, "lomin": 110.0, "lomax": 180.0},
    "AFRICA": {"lamin": -35.0, "lamax": 37.0, "lomin": -20.0, "lomax": 52.0},
}

# Column names for OpenSky flight arrays.
# Each flight is a positional array, so the ORDER here is what assigns meaning
# to the values. Per the OpenSky REST API state-vector table:
#   index 7  = baro_altitude (barometric altitude)
#   index 13 = geo_altitude  (geometric altitude)
#   index 16 = position_source
# "category" is index 17 and is only returned when the request sets
# extended=1, which the URLs in this notebook do not.
column_names = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude",
    "squawk", "spi", "position_source",
]

print("✓ Ready!")

## Batch Example - One-Time Fetch

In [None]:
# Choose region
region = "NORTH_AMERICA"
bbox = regions[region]

# Create input DataFrame: one row holding the region name plus its
# lamin/lamax/lomin/lomax bounds, i.e. the same names as the URL placeholders.
input_df = spark.createDataFrame([{"region": region, **bbox}])
display(input_df)

# Fetch flights using CSV method (better for Databricks).
# The URL is a plain string on purpose: the {lamin}/{lamax}/{lomin}/{lomax}
# placeholders must be handed to rest_api_call_csv literally (presumably it
# fills them in from each input row -- confirm against the helper).
url = (
    "https://opensky-network.org/api/states/all"
    "?lamin={lamin}&lamax={lamax}&lomin={lomin}&lomax={lomax}"
)
response = rest_api_call_csv(input_df, url=url, method="GET", queryType="querystring", partitions="1")

# Parse to individual flights
flights = parse_array_response(response, array_path="states", column_names=column_names, timestamp_field="time")

# NOTE(review): count() and display() are two separate Spark actions. If the
# REST read is re-evaluated per action, this hits the API twice and the count
# may not match the displayed rows -- confirm, and cache `flights` if so.
print(f"\n✓ Found {flights.count()} flights")
display(flights.select("region", "time", "icao24", "callsign", "origin_country", "latitude", "longitude"))

## Alternative: Polling with Batch Calls

Since the OpenSky API returns array-based data (not object-based), we can use a simple loop with batch calls for continuous monitoring.

In [None]:
import time
from datetime import datetime

# Choose region
region = "NORTH_AMERICA"
bbox = regions[region]

# Create input DataFrame
input_df = spark.createDataFrame([{"region": region, **bbox}])

# Polling schedule: 30 polls with a 10 second pause between them, i.e. about
# 5 minutes in total (plus however long each fetch takes).
poll_count = 30
poll_interval_s = 10

print(f"Starting continuous monitoring of {region}...")
print("Press 'Stop' in Databricks to end the monitoring\n")

# Plain string, not an f-string: the {lamin}/{lamax}/{lomin}/{lomax}
# placeholders must be handed to rest_api_call_csv literally.
url = (
    "https://opensky-network.org/api/states/all"
    "?lamin={lamin}&lamax={lamax}&lomin={lomin}&lomax={lomax}"
)

try:
    for i in range(poll_count):
        # Fetch flights
        response = rest_api_call_csv(input_df, url=url, method="GET", queryType="querystring", partitions="1")
        flights = parse_array_response(response, array_path="states", column_names=column_names, timestamp_field="time")

        # Display results
        count = flights.count()
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Poll #{i+1}: Found {count} flights")

        if count > 0:
            display(flights.select("time", "icao24", "callsign", "origin_country", "latitude", "longitude").limit(10))

        # Wait before next poll (skip on last iteration so the cell ends
        # right after the final result instead of sleeping for nothing)
        if i < poll_count - 1:
            time.sleep(poll_interval_s)

except KeyboardInterrupt:
    # Reached when the running cell is interrupted (presumably what the
    # Databricks 'Stop' button triggers -- confirm).
    print("\nMonitoring stopped by user")

print("\n✓ Monitoring completed")

## Streaming Example - Generic REST Data Source

Uses the generic REST data source with streaming. The key is NOT to extract individual records in the streaming reader, but to return the whole response so that it can be parsed afterwards.

In [None]:
# Register REST data source
spark.dataSource.register(RestDataSource)

# Choose region
region = "NORTH_AMERICA"
bbox = regions[region]

# Build the input row once so the DataFrame and the JSON passed to the
# stream cannot drift apart.
input_row = {"region": region, **bbox}

# Create input DataFrame. The stream below is configured from input_json
# only; input_df is not used in this cell but is still defined so the
# notebook exposes the same variables as before.
input_df = spark.createDataFrame([input_row])
input_json = json.dumps([input_row])

print(f"Setting up streaming for region: {region}")
print(f"Bounding box: {bbox}\n")

# Seconds between polls; used for both the stream option and the message
# printed before the query starts.
stream_interval_s = 10

# Configure streaming
# IMPORTANT: Set dataField="" to return the whole response as JSON string
url = "https://opensky-network.org/api/states/all?lamin={lamin}&lamax={lamax}&lomin={lomin}&lomax={lomax}"

print("Creating streaming DataFrame...")
stream_df = spark.readStream.format("rest") \
    .option("url", url) \
    .option("method", "GET") \
    .option("streaming", "true") \
    .option("inputData", input_json) \
    .option("queryType", "querystring") \
    .option("streamingInterval", str(stream_interval_s)) \
    .option("offsetType", "timestamp") \
    .option("offsetField", "time") \
    .option("initialOffset", "0") \
    .option("dataField", "") \
    .load()

print("✓ Stream DataFrame created")
print(f"Schema: {stream_df.schema}\n")

# Parse array response using helper function
print("Parsing array response using parse_array_response_streaming...")
flights = parse_array_response_streaming(
    stream_df,
    array_path="states",
    column_names=column_names,
    timestamp_field="time",
)

print("✓ Flights DataFrame created")
print(f"Schema: {flights.schema}\n")

# Select columns to display
flights_display = flights.select(
    "region", "time", "icao24", "callsign", "origin_country",
    "longitude", "latitude", "geo_altitude", "velocity",
)

print("Starting streaming query...")
print(f"Polling every {stream_interval_s} seconds. Use 'Stop' button to stop.\n")

# Start streaming
display(flights_display, streamName="opensky_flights")