In [70]:
days_back = 2
days_forward = 7
hopsworks_api_key = None 
swedavia_api_key = None 

In [73]:
import pandas as pd
import requests
from datetime import datetime, timedelta
import sys
import hopsworks
import great_expectations as ge
import time
import json
sys.path.append('.')

In [72]:
Swedavia_API_Base = "https://api.swedavia.se/flightinfo/v2"

In [26]:
def route_type_classification(airport_code):
    domestic = ["GOT", "MMX", "BRO", "VBY", "LLA", "UME", "OSD", "RNB", "KRN"]
    nordic = ["CPH", "OSL", "HEL", "BLL", "TRD", "BGO", "AAL", "KEF"]

    if airport_code in domestic:
        return "domestic"
    elif airport_code in nordic:
        return "nordic"
    else:
        return "international"

In [37]:
def fetch_swedavia_arrivals_departures(date_str, arrival_or_departure):
    if arrival_or_departure == 'arrival':
        url = f"{Swedavia_API_Base}/{Arlanda}/arrivals/{date_str}"
        headers = {
            "Ocp-Apim-Subscription-Key": swedavia_api_key,
            "Accept": "application/json",
            "Cache-Control": "no-cache"
        }
    
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            data = response.json()
    
            flights = data.get('flights', [])
            return flights
    
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print("Rate Limit hit. Wait 1 minute")
                time.sleep(60)
                return []
            else:
                print(f"Error: {e}")
                return []
                
    elif arrival_or_departure == 'departure':
        url = f"{Swedavia_API_Base}/{Arlanda}/departures/{date_str}"
        headers = {
            "Ocp-Apim-Subscription-Key": swedavia_api_key,
            "Accept": "application/json",
            "Cache-Control": "no-cache"
        }
    
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            data = response.json()
    
            flights = data.get('flights', [])
            return flights
    
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print("Rate Limit hit. Wait 1 minute")
                time.sleep(60)
                return []
            else:
                print(f"Error: {e}")
                return []

In [49]:
def process_arrivals(raw_flights, date_str):
    processed = []

    for flight in raw_flights:
        try:
            flight_id = flight.get('flightId', 'Unknown')
            airline_iata = flight.get('airlineOperator', {}).get('iata', 'UNK')

            flight_leg = flight.get('flightLegIdentifier', {})
            origin_iata = flight_leg.get('departureAirportIata', 'Unknown')
            origin_name = flight.get('departureAirportEnglish', origin_iata)

            if origin_iata == 'Unknown' or flight_id == 'Unknown':
                continue

            arrival_time = flight.get('arrivalTime', {})
            scheduled_utc = arrival_time.get('scheduledUtc')
            actual_utc = arrival_time.get('actualUtc')
            estimated_utc = arrival_time.get('estimatedUtc')

            if not scheduled_utc:
                continue

            scheduled_time = datetime.fromisoformat(scheduled_utc.replace('Z', '+00:00'))
            actual_time = None

            if actual_utc:
                actual_time = datetime.fromisoformat(actual_utc.replace('Z', '+00:00'))
            elif estimated_utc:
                actual_time = datetime.fromisoformat(estimated_utc.replace('Z', '+00:00'))

            delay_minutes = 0
            is_delayed = False
            if actual_time and scheduled_time:
                delay_seconds = (actual_time - scheduled_time).total_seconds()
                delay_minutes = delay_seconds / 60
                is_delayed = delay_minutes > 15

            location_status = flight.get('locationAndStatus', {})
            status = location_status.get('flightLegStatus', 'UNKNOWN')
            terminal = location_status.get('terminal', '')
            gate = location_status.get('gate', '')

            route_type = route_type_classification(origin_iata)

            record = {
                
                'flight_id': f"{flight_id}_{date_str}_ARR",
                'flight_number': flight_id,
                'airline_code': airline_iata,
                'flight_direction': 'arrival',
                'origin_airport': origin_iata,
                'destination_airport': 'ARN',
                'arn_airport_role': 'destination',
                'route': f"{origin_iata}-ARN",
                'scheduled_time': scheduled_time.strftime('%Y-%m-%d %H:%M:%S'),
                'actual_time': actual_time.strftime('%Y-%m-%d %H:%M:%S') if actual_time else None,
                'delay_minutes': float(delay_minutes),
                'is_delayed': bool(is_delayed),
                'route_type': route_type,
                'flight_status': status,
                'terminal': terminal,
                'gate': gate,
                'di_indicator': flight.get('diIndicator', ''),
                'created_at': datetime.now()
            }
            
            processed.append(record)
        except Exception as e:
            print(f"Error: {e}")
            continue

    return processed

In [50]:
def process_departures(raw_flights, date_str):
    processed = []

    for flight in raw_flights:
        try:
            flight_id = flight.get('flightId', 'Unknown')
            airline_iata = flight.get('airlineOperator', {}).get('iata', 'UNK')

            flight_leg = flight.get('flightLegIdentifier', {})
            dest_iata = flight_leg.get('arrivalAirportIata', 'Unknown')
            dest_name = flight.get('arrivalAirportEnglish', dest_iata)

            if dest_iata == 'Unknown' or flight_id == 'Unknown':
                continue

            departure_time = flight.get('departureTime', {})
            scheduled_utc = departure_time.get('scheduledUtc')
            actual_utc = departure_time.get('actualUtc')
            estimated_utc = departure_time.get('estimatedUtc')

            if not scheduled_utc:
                continue

            scheduled_time = datetime.fromisoformat(scheduled_utc.replace('Z', '+00:00'))
            actual_time = None

            if actual_utc:
                actual_time = datetime.fromisoformat(actual_utc.replace('Z', '+00:00'))
            elif estimated_utc:
                actual_time = datetime.fromisoformat(estimated_utc.replace('Z', '+00:00'))

            delay_minutes = 0
            is_delayed = False
            if actual_time and scheduled_time:
                delay_seconds = (actual_time - scheduled_time).total_seconds()
                delay_minutes = delay_seconds / 60
                is_delayed = delay_minutes > 15

            location_status = flight.get('locationAndStatus', {})
            status = location_status.get('flightLegStatus', 'UNKNOWN')
            terminal = location_status.get('terminal', '')
            gate = location_status.get('gate', '')

            route_type = route_type_classification(dest_iata)

            record = {
                
                'flight_id': f"{flight_id}_{date_str}_DEP",
                'flight_number': flight_id,
                'airline_code': airline_iata,
                'flight_direction': 'departure',
                'origin_airport': 'ARN',
                'destination_airport': dest_iata,
                'arn_airport_role': 'origin',
                'route': f"ARN-{dest_iata}",
                'scheduled_time': scheduled_time.strftime('%Y-%m-%d %H:%M:%S'),
                'actual_time': actual_time.strftime('%Y-%m-%d %H:%M:%S') if actual_time else None,
                'delay_minutes': float(delay_minutes),
                'is_delayed': bool(is_delayed),
                'route_type': route_type,
                'flight_status': status,
                'terminal': terminal,
                'gate': gate,
                'di_indicator': flight.get('diIndicator', ''),
                'created_at': datetime.now()
            }
            
            processed.append(record)
        except Exception as e:
            print(f"Error: {e}")
            continue

    return processed

In [51]:
def fetch_all_arn_flights(days_before, days_after):
    if not swedavia_api_key:
        raise ValueError(
            "API not found!"
        )

    all_flights = []

    for day in range(days_before):
        date = datetime.now() - timedelta(days=day)
        date_str = date.strftime("%Y-%m-%d")

        arr_raw = fetch_swedavia_arrivals_departures(date_str, 'arrival')
        arr_processed = process_arrivals(arr_raw, date_str)
        all_flights.extend(arr_processed)

        time.sleep(3)

        dep_raw = fetch_swedavia_arrivals_departures(date_str, 'departure')
        dep_processed = process_departures(dep_raw, date_str)
        all_flights.extend(dep_processed)

        if day < days_before - 1:
            time.sleep(3)

    for day in range(days_after):
        date = datetime.now() + timedelta(days=day)
        date_str = date.strftime("%Y-%m-%d")

        arr_raw = fetch_swedavia_arrivals_departures(date_str, 'arrival')
        arr_processed = process_arrivals(arr_raw, date_str)
        all_flights.extend(arr_processed)

        time.sleep(3)

        dep_raw = fetch_swedavia_arrivals_departures(date_str, 'departure')
        dep_processed = process_departures(dep_raw, date_str)
        all_flights.extend(dep_processed)

        if day < days_before - 1:
            time.sleep(3)
        

    return pd.DataFrame(all_flights)

In [59]:
df_flights = fetch_all_arn_flights(1, 90)

In [60]:
len(df_flights)

49666

## Uploading Created Dataframe on Hopsworks

In [61]:
project = hopsworks.login(api_key_value=hopsworks_api_key, host="eu-west.cloud.hopsworks.ai")

2026-01-02 00:32:22,114 INFO: Closing external client and cleaning up certificates.
2026-01-02 00:32:22,125 INFO: Connection closed.
2026-01-02 00:32:22,129 INFO: Initializing external client
2026-01-02 00:32:22,129 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-02 00:32:23,033 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/3207


In [62]:
fs = project.get_feature_store()

In [64]:
airline_dep_arr_fg = fs.get_or_create_feature_group(
    name='flight_schedules',
    description='Complete Flight Data for Arlanda Airport from 01 Jan to 01 Apr',
    version=1,
    primary_key=['flight_id']
)

In [65]:
airline_dep_arr_fg.insert(df_flights)

Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3207/fs/3151/fg/3278


Uploading Dataframe: 100.00% |█████████████████████████████████████████████████████████████████████████████████| Rows 49666/49666 | Elapsed Time: 00:06 | Remaining Time: 00:00


Launching job: flight_schedules_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3207/jobs/named/flight_schedules_1_offline_fg_materialization/executions


(Job('flight_schedules_1_offline_fg_materialization', 'SPARK'), None)

In [68]:
airline_dep_arr_fg.update_feature_description("di_indicator", "Indicates whether a flight is domestic or international")

<hsfs.feature_group.FeatureGroup at 0x306dd5720>