# Airports ETL from Flight Samples → MongoDB

This notebook derives **airports** from raw flight CSV files (whose columns include `estdepartureairport` and `estarrivalairport`), cleans the ICAO codes, computes simple traffic stats, writes a JSONL file, and **upserts** the result into the validated `openair.airports` collection.

It is designed to work with files like `flight_sample_2022-09-01.csv`, and can be pointed at multiple days.


In [10]:
# 🔧 Parameters
# Daily flight-sample CSVs to ingest, one path per day (2022-09-01 .. 2022-09-10).
INPUT_FILES = [
    "../data_raw/flight_sample_2022-09-01.csv",
    "../data_raw/flight_sample_2022-09-02.csv",
    "../data_raw/flight_sample_2022-09-03.csv",
    "../data_raw/flight_sample_2022-09-04.csv",
    "../data_raw/flight_sample_2022-09-05.csv",
    "../data_raw/flight_sample_2022-09-06.csv",
    "../data_raw/flight_sample_2022-09-07.csv",
    "../data_raw/flight_sample_2022-09-08.csv",
    "../data_raw/flight_sample_2022-09-09.csv",
    "../data_raw/flight_sample_2022-09-10.csv",
]

# Where the derived airport documents are written, one JSON object per line.
OUTPUT_JSONL = "../data_enriched/airports_from_flights.jsonl"

# Mongo connection string and target database name.
MONGO_URI = "mongodb://localhost:27017"
DB_NAME = "openair"

# Echo the effective configuration as the cell output.
(INPUT_FILES, OUTPUT_JSONL, MONGO_URI, DB_NAME)

(['../data_raw/flight_sample_2022-09-01.csv',
  '../data_raw/flight_sample_2022-09-02.csv',
  '../data_raw/flight_sample_2022-09-03.csv',
  '../data_raw/flight_sample_2022-09-04.csv',
  '../data_raw/flight_sample_2022-09-05.csv',
  '../data_raw/flight_sample_2022-09-06.csv',
  '../data_raw/flight_sample_2022-09-07.csv',
  '../data_raw/flight_sample_2022-09-08.csv',
  '../data_raw/flight_sample_2022-09-09.csv',
  '../data_raw/flight_sample_2022-09-10.csv'],
 '../data_enriched/airports_from_flights.jsonl',
 'mongodb://localhost:27017',
 'openair')

In [11]:
import pandas as pd, numpy as np, re, json, os
from datetime import datetime, timezone
from pymongo import MongoClient, UpdateOne

print("Imports ready")

Imports ready


## Helpers

In [12]:
# Accepted airport identifier: exactly four upper-case ASCII letters or digits.
ICAO4 = re.compile(r"^[A-Z0-9]{4}$")

def clean_icao_airport(x):
    """Normalise a raw airport code.

    Returns the stripped, upper-cased text of ``x`` when it matches ``ICAO4``;
    returns None for None, float NaN, and anything that does not match.
    """
    is_missing = x is None or (isinstance(x, float) and np.isnan(x))
    if is_missing:
        return None
    candidate = str(x).strip().upper()
    if ICAO4.fullmatch(candidate) is None:
        return None
    return candidate

def to_dt(ts):
    """Convert a Unix timestamp in seconds to a timezone-aware UTC datetime.

    Parameters
    ----------
    ts : value accepted by ``int()`` (int, float, numeric string), or None/NaN.
        Fractional seconds are truncated by ``int()``.

    Returns
    -------
    datetime with ``tzinfo=timezone.utc``, or None when ``ts`` is None, float
    NaN, not convertible to an integer, or outside the range the platform
    can represent.
    """
    if ts is None or (isinstance(ts, float) and np.isnan(ts)):
        return None
    try:
        return datetime.fromtimestamp(int(ts), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # Only the failures that bad *data* can cause are treated as "missing":
        # int() raises TypeError/ValueError/OverflowError (e.g. inf), and
        # fromtimestamp() raises OverflowError/OSError/ValueError for values
        # out of range. Anything else is a real bug and should surface.
        return None

def ensure_dirs(path):
    """Create the parent directory of ``path`` (and any ancestors) if missing.

    Does nothing when ``path`` has no directory component or when the parent
    already exists.
    """
    parent = os.path.dirname(path)
    if not parent or os.path.exists(parent):
        return
    os.makedirs(parent, exist_ok=True)

## Load flight CSVs

In [13]:
# Read every configured CSV. Header names are stripped because some files
# carry accidental leading whitespace (e.g. ' typecode', ' registration').
dfs = [pd.read_csv(csv_path).rename(columns=str.strip) for csv_path in INPUT_FILES]

# Stack all days into a single frame with a fresh 0..n-1 index.
raw = pd.concat(dfs, ignore_index=True)
print("Rows:", len(raw))
raw.head(5)

Rows: 1051765


Unnamed: 0,icao24,firstseen,lastseen,callsign,estdepartureairport,estarrivalairport,model,typecode,registration
0,44cdc6,1662054000.0,1662065618,BEL40V,,EBBR,A320 214,A320,OO-SNF
1,44cdc6,1662032000.0,1662047517,BEL7QJ,EBBR,,A320 214,A320,OO-SNF
2,44cdc6,1662022000.0,1662028405,BEL8DG,LIRF,EBBR,A320 214,A320,OO-SNF
3,44cdc6,1662011000.0,1662016737,BEL1YC,EBBR,LIRF,A320 214,A320,OO-SNF
4,4601f7,1662067000.0,1662068749,FIN7A,EFHK,EFTP,ATR 72 500,AT75,OH-ATJ


## Clean & Select Fields

In [14]:
# Work on a copy so the loaded `raw` frame stays untouched and this cell can be re-run.
df = raw.copy()

# Airports: validate ICAO codes. When a source column is absent the derived
# column is filled with None, so the steps below still run (and find nothing).
df["dep_icao"] = df["estdepartureairport"].apply(clean_icao_airport) if "estdepartureairport" in df else None
df["arr_icao"] = df["estarrivalairport"].apply(clean_icao_airport)   if "estarrivalairport" in df else None

# Times for optional traffic window bounds
df["dep_time"] = df["firstseen"].apply(to_dt)  if "firstseen"  in df else None
df["arr_time"] = df["lastseen"].apply(to_dt)   if "lastseen"   in df else None

# Drop rows where both airports are missing
df = df[ df["dep_icao"].notna() | df["arr_icao"].notna() ].copy()
print("Rows with at least one valid airport:", len(df))

# Build airport entries for dep and arr, stack.
# Each flight contributes up to two (icao, t) observations: one at its
# departure airport (t = dep_time) and one at its arrival airport (t = arr_time).
dep = df[["dep_icao","dep_time"]].dropna(subset=["dep_icao"]).rename(columns={"dep_icao":"icao","dep_time":"t"})
arr = df[["arr_icao","arr_time"]].dropna(subset=["arr_icao"]).rename(columns={"arr_icao":"icao","arr_time":"t"})
aa  = pd.concat([dep, arr], ignore_index=True)

# Aggregate traffic stats per airport.
# NOTE: "count" only counts observations with a non-null time, so traffic_rows
# can be smaller than dep_count + arr_count when some timestamps are missing.
grp = aa.groupby("icao").agg(
    first_seen=("t","min"),
    last_seen=("t","max"),
    traffic_rows=("t","count")
).reset_index()

# Split dep/arr counts separately
dep_counts = dep.groupby("icao").size().rename("dep_count")
arr_counts = arr.groupby("icao").size().rename("arr_count")

# Only the two count columns are filled with 0. A frame-wide fillna(0) would
# also turn a missing first_seen/last_seen into the integer 0, which the
# export step would then treat as a present timestamp.
stats = (
    grp.join(dep_counts, on="icao")
       .join(arr_counts, on="icao")
       .fillna({"dep_count": 0, "arr_count": 0})
       .astype({"dep_count": int, "arr_count": int})
)

print("Airports derived:", len(stats))
stats.head(10)

Rows with at least one valid airport: 1010688
Airports derived: 12367


Unnamed: 0,icao,first_seen,last_seen,traffic_rows,dep_count,arr_count
0,00AK,2022-09-01 18:32:59+00:00,2022-09-10 17:41:58+00:00,18,10,8
1,00AL,2022-09-01 16:06:39+00:00,2022-09-08 12:55:43+00:00,7,2,5
2,00AS,2022-09-08 18:21:59+00:00,2022-09-08 19:17:03+00:00,2,1,1
3,00AZ,2022-09-01 00:05:20+00:00,2022-09-10 16:57:44+00:00,67,27,40
4,00CL,2022-09-01 13:50:36+00:00,2022-09-10 17:59:33+00:00,12,3,9
5,00FA,2022-09-02 23:09:20+00:00,2022-09-06 15:22:12+00:00,3,1,2
6,00FL,2022-09-01 12:40:26+00:00,2022-09-10 14:32:28+00:00,6,2,4
7,00GA,2022-09-04 00:00:04+00:00,2022-09-08 20:43:11+00:00,3,2,1
8,00IL,2022-09-01 02:27:42+00:00,2022-09-10 17:33:57+00:00,6,3,3
9,00IS,2022-09-07 20:32:31+00:00,2022-09-08 23:07:46+00:00,2,1,1


## Export JSONL (schema-valid, with placeholder loc)

In [15]:
def _iso_or_none(ts):
    """Return ``ts.isoformat()``, or None when ``ts`` is missing (NaT/NaN/None)."""
    return None if pd.isna(ts) else ts.isoformat()


# Make sure the output directory exists before opening the file.
ensure_dirs(OUTPUT_JSONL)

with open(OUTPUT_JSONL, "w", encoding="utf-8") as f:
    for _, r in stats.iterrows():
        # placeholders; validator requires loc, other fields can be null
        placeholder_loc = { "type": "Point", "coordinates": [0.0, 0.0] }
        doc = {
            "icao": r["icao"],
            "name": None,
            "city": None,
            "country": None,
            "loc": placeholder_loc,
            # optional enrichment fields (validator allows extra fields)
            "first_seen": _iso_or_none(r["first_seen"]),
            "last_seen":  _iso_or_none(r["last_seen"]),
            "dep_count": int(r["dep_count"]),
            "arr_count": int(r["arr_count"])
        }
        # One JSON document per line.
        f.write(json.dumps(doc) + "\n")

print("Wrote:", OUTPUT_JSONL)

Wrote: ../data_enriched/airports_from_flights.jsonl


## Load to MongoDB (upsert)

In [16]:
# `client` and `db` are left open on purpose: later cells query `db`.
client = MongoClient(MONGO_URI)
db = client[DB_NAME]

# Build one upsert per airport, keyed on `icao`.
ops = []
with open(OUTPUT_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        a = json.loads(line)

        # Fields refreshed on every run.
        set_payload = {}
        # Fields written only when the document is first created, so values
        # added by a later enrichment step are never overwritten.
        insert_payload = {
            "icao": a["icao"],
            # loc is required by the schema; the placeholder must not clobber real coords
            "loc": a["loc"],
        }

        # Convert ISO strings to Date for first/last_seen if present
        for k in ("first_seen","last_seen"):
            if a.get(k):
                s = a[k].replace("Z","+00:00") if a[k].endswith("Z") else a[k]
                try:
                    set_payload[k] = datetime.fromisoformat(s)
                except ValueError:
                    # Best effort: skip the field, but say so instead of hiding it.
                    print("Skipping unparseable", k, "for", a["icao"], ":", a[k])

        # always set counts
        set_payload["dep_count"] = a.get("dep_count", 0)
        set_payload["arr_count"] = a.get("arr_count", 0)

        # optional null-able fields: a real value is refreshed via $set, while a
        # null placeholder is only written on insert. A field may appear in
        # $set or in $setOnInsert, never both (MongoDB rejects the conflict).
        for k in ("name","city","country"):
            if k not in a:
                continue
            if a[k] is not None:
                set_payload[k] = a[k]
            else:
                insert_payload[k] = None

        ops.append(UpdateOne(
            { "icao": a["icao"] },
            {
                "$set": set_payload,
                "$setOnInsert": insert_payload
            },
            upsert=True
        ))

if ops:
    # ordered=False: one failing document does not stop the remaining upserts.
    res = db.airports.bulk_write(ops, ordered=False)
    print("Upserts:", res.upserted_count, " Modified:", res.modified_count)
else:
    print("No ops")

Upserts: 0  Modified: 0


## Sanity Checks

In [17]:
# Total number of documents currently in the airports collection.
airport_total = db.airports.count_documents({})
print("Airports count:", airport_total)

# Show one document with the `_id` field excluded by the projection.
print("Sample airport:")
sample_airport = db.airports.find_one({}, {"_id":0})
print(sample_airport)

Airports count: 12367
Sample airport:
{'icao': '00AK', 'arr_count': 8, 'city': None, 'country': None, 'dep_count': 10, 'first_seen': datetime.datetime(2022, 9, 1, 18, 32, 59), 'last_seen': datetime.datetime(2022, 9, 10, 17, 41, 58), 'loc': {'type': 'Point', 'coordinates': [0.0, 0.0]}, 'name': None}
