# Pipeline

1. Maak SQL scripts voor schema's: RAW, ARCHIVED, CLEANSED - finished
2. Importeer source data in RAW - finished
3. Data cleaning => RAW naar ARCHIVED en CLEANSED - finished
4. Maak SQL scripts voor Data Warehouse / sterschema
5. Import van CLEANSED naar DWH - finished
6. Prep Data lake: export tabellen naar Parquet files - finished
7. Upload Parquet files naar S3 (eerst bucket aanmaken) - finished
8. Maak Athena tables
9. Gebruik Athena in BI tool naar keuze

In [211]:
# Install the notebook's third-party dependencies quietly (-q).
%pip install -q pandas sqlalchemy psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


## Stap 1: SQL scripts

In [212]:
import psycopg2

# Connection settings for the PostgreSQL instance used by this pipeline.
# NOTE(review): credentials are hard-coded; consider loading them from
# environment variables or an uncommitted config file.
host = "192.168.56.1"
dbname = "postgres"
user = "postgres"
password = "Newpassword"
port = "5433"  # Non-default port (PostgreSQL's default is 5432)

# Open the connection and a cursor. Neither is closed in this cell
# (presumably so later cells can reuse them - confirm before closing).
conn = psycopg2.connect(
    host=host,
    dbname=dbname,
    user=user,
    password=password,
    port=port,
)
cur = conn.cursor()

# DDL scripts for the RAW, ARCHIVED and CLEANSED schemas, executed in this
# order within one transaction that is committed at the end.
schema_scripts = [
    './sql_scripts/raw.sql',
    './sql_scripts/archived.sql',
    './sql_scripts/cleansed.sql',
]
try:
    for script_path in schema_scripts:
        with open(script_path, 'r', encoding='utf-8') as file:
            sql_script = file.read()
        cur.execute(sql_script)
except Exception:
    # A failed statement leaves the transaction aborted and every later
    # statement on `conn` would fail; roll back, then surface the error.
    conn.rollback()
    raise

# Persist the changes of all three scripts at once.
conn.commit()



## Stap 2: raw importeren

In [213]:
from sqlalchemy import create_engine, types as sqlalchemytypes
import pandas as pd

# SQLAlchemy engine for the same PostgreSQL host/port/credentials as the
# psycopg2 connection in Stap 1.
# NOTE(review): credentials are hard-coded in the connection URL.
engine = create_engine(
    'postgresql://postgres:Newpassword@192.168.56.1:5433/postgres'
)

# RAW table name -> path of its source export (insertion order is the import order).
tables = dict(
    aankomst='./source_data/export_aankomst.txt',
    banen='./source_data/export_banen.csv',
    klant='./source_data/export_klant.csv',
    luchthavens='./source_data/export_luchthavens.txt',
    maatschappijen='./source_data/export_maatschappijen.txt',
    planning='./source_data/export_planning.txt',
    vertrek='./source_data/export_vertrek.txt',
    vliegtuig='./source_data/export_vliegtuig.txt',
    vliegtuigtype='./source_data/export_vliegtuigtype.csv',
    vlucht='./source_data/export_vlucht.txt',
    weer='./source_data/export_weer.txt',
)

# SQLAlchemy column types for the RAW tables, keyed by table name and then by
# source column name. Every RAW column is stored as String; typed columns
# only appear in the CLEANSED mapping (column_types_cleansed).
column_types_raw = {
    'aankomst': dict.fromkeys(
        ["Vluchtid", "Vliegtuigcode", "Terminal", "Gate", "Baan",
         "Bezetting", "Vracht", "Aankomsttijd"],
        sqlalchemytypes.String,
    ),
    'banen': dict.fromkeys(
        ["Baannummer", "Code", "Naam", "Lengte"],
        sqlalchemytypes.String,
    ),
    'klant': dict.fromkeys(
        ["Vluchtid", "Operatie", "Faciliteiten", "Shops"],
        sqlalchemytypes.String,
    ),
    'luchthavens': dict.fromkeys(
        ["Airport", "City", "Country", "IATA", "ICAO", "Lat", "Lon",
         "Alt", "TZ", "DST", "TzName"],
        sqlalchemytypes.String,
    ),
    'maatschappijen': dict.fromkeys(
        ["Name", "IATA", "ICAO"],
        sqlalchemytypes.String,
    ),
    'planning': dict.fromkeys(
        ["Vluchtnr", "Airlinecode", "Destcode", "Planterminal",
         "Plangate", "Plantijd"],
        sqlalchemytypes.String,
    ),
    'vertrek': dict.fromkeys(
        ["Vluchtid", "Vliegtuigcode", "Terminal", "Gate", "Baan",
         "Bezetting", "Vracht", "Vertrektijd"],
        sqlalchemytypes.String,
    ),
    'vliegtuig': dict.fromkeys(
        ["Airlinecode", "Vliegtuigcode", "Vliegtuigtype", "Bouwjaar"],
        sqlalchemytypes.String,
    ),
    'vliegtuigtype': dict.fromkeys(
        ["IATA", "ICAO", "Merk", "Type", "Wake", "Cat", "Capaciteit",
         "Vracht"],
        sqlalchemytypes.String,
    ),
    'vlucht': dict.fromkeys(
        ["Vluchtid", "Vluchtnr", "Airlinecode", "Destcode",
         "Vliegtuigcode", "Datum"],
        sqlalchemytypes.String,
    ),
    'weer': dict.fromkeys(
        ["Datum", "DDVEC", "FHVEC", "FG", "FHX", "FHXH", "FHN", "FHNH",
         "FXX", "FXXH", "TG", "TN", "TNH", "TX", "TXH", "T10N", "T10NH",
         "SQ", "SP", "Q", "DR", "RH", "RHX", "RHXH", "PG", "PX", "PXH",
         "PN", "PNH", "VVN", "VVNH", "VVX", "VVXH", "NG", "UG", "UX",
         "UXH", "UN", "UNH", "EV2"],
        sqlalchemytypes.String,
    ),
}

def _read_source_export(path, separator):
    """Read one source export with every column as ``str``.

    Decodes as UTF-8 first and re-reads the file as Latin-1 when that raises
    UnicodeDecodeError.
    """
    try:
        return pd.read_csv(path, sep=separator, dtype=str, encoding='utf-8')
    except UnicodeDecodeError:
        return pd.read_csv(path, sep=separator, dtype=str, encoding='latin1')


# Read every source export (semicolon-separated for banen/klant/vliegtuigtype,
# tab-separated otherwise) and append its rows to the matching RAW table.
for table, file_path in tables.items():
    separator = ';' if table in ['banen', 'klant', 'vliegtuigtype'] else '\t'
    df = _read_source_export(file_path, separator)
    df.to_sql(table, con=engine, schema='raw', if_exists='append', index=False, dtype=column_types_raw[table])

print("Data import complete.")


Data import complete.


# Data Cleaning

In [214]:
import re
from sqlalchemy import create_engine, inspect
from sqlalchemy import create_engine, types as sqlalchemytypes
import pandas as pd

# One distinct, empty placeholder frame per source table. Tables found in the
# raw schema overwrite their `<table>_df` placeholder when they are loaded.
(
    aankomst_df,
    banen_df,
    klant_df,
    luchthavens_df,
    maatschappijen_df,
    planning_df,
    vertrek_df,
    vliegtuig_df,
    vliegtuigtype_df,
    vlucht_df,
    weer_df,
) = (pd.DataFrame() for _ in range(11))

# Accumulators for the raw frames and their cleaned counterparts.
tables_raw, tables_clean = [], []

# SQLAlchemy column types for the CLEANSED tables (lowercase column names),
# passed as `dtype` to to_sql for both the cleansed and the archived writes.
# Tables whose columns are all text are built with dict.fromkeys.
column_types_cleansed = {
    'aankomst': {
        "vluchtid": sqlalchemytypes.String,
        "vliegtuigcode": sqlalchemytypes.String,
        "terminal": sqlalchemytypes.String,
        "gate": sqlalchemytypes.String,
        "baan": sqlalchemytypes.String,
        "bezetting": sqlalchemytypes.SmallInteger,
        "vracht": sqlalchemytypes.String,
        "aankomsttijd": sqlalchemytypes.TIMESTAMP,
    },
    'banen': {
        "baannummer": sqlalchemytypes.String,
        "code": sqlalchemytypes.String,
        "naam": sqlalchemytypes.String,
        "lengte": sqlalchemytypes.SmallInteger,
    },
    'klant': {
        "vluchtid": sqlalchemytypes.String,
        **dict.fromkeys(["operatie", "faciliteiten", "shops"], sqlalchemytypes.Numeric),
    },
    'luchthavens': {
        **dict.fromkeys(["airport", "city", "country", "iata", "icao"], sqlalchemytypes.String),
        "lat": sqlalchemytypes.Float,
        "lon": sqlalchemytypes.Float,
        "alt": sqlalchemytypes.SmallInteger,
        **dict.fromkeys(["tz", "dst", "tzname"], sqlalchemytypes.String),
    },
    'maatschappijen': dict.fromkeys(
        ["name", "iata", "icao"],
        sqlalchemytypes.String,
    ),
    'planning': {
        **dict.fromkeys(
            ["vluchtnr", "airlinecode", "destcode", "planterminal", "plangate"],
            sqlalchemytypes.String,
        ),
        "plantijd": sqlalchemytypes.TIME,
    },
    'vertrek': {
        **dict.fromkeys(
            ["vluchtid", "vliegtuigcode", "terminal", "gate", "baan"],
            sqlalchemytypes.String,
        ),
        "bezetting": sqlalchemytypes.SmallInteger,
        "vracht": sqlalchemytypes.String,
        "vertrektijd": sqlalchemytypes.TIME,
    },
    'vliegtuig': dict.fromkeys(
        ["airlinecode", "vliegtuigcode", "vliegtuigtype", "bouwjaar"],
        sqlalchemytypes.String,
    ),
    'vliegtuigtype': dict.fromkeys(
        ["iata", "icao", "merk", "type", "wake", "cat", "capaciteit", "vracht"],
        sqlalchemytypes.String,
    ),
    'vlucht': dict.fromkeys(
        ["vluchtid", "vluchtnr", "airlinecode", "destcode", "vliegtuigcode", "datum"],
        sqlalchemytypes.String,
    ),
    'weer': dict.fromkeys(
        ["datum", "ddvec", "fhvec", "fg", "fhx", "fhxh", "fhn", "fhnh",
         "fxx", "fxxh", "tg", "tn", "tnh", "tx", "txh", "t10n", "t10nh",
         "sq", "sp", "q", "dr", "rh", "rhx", "rhxh", "pg", "px", "pxh",
         "pn", "pnh", "vvn", "vvnh", "vvx", "vvxh", "ng", "ug", "ux",
         "uxh", "un", "unh", "ev2"],
        sqlalchemytypes.String,
    ),
}

# Database connection details
# NOTE(review): credentials are hard-coded here as well; consider one shared
# configuration source (e.g. environment variables).
engine = create_engine('postgresql://postgres:Newpassword@192.168.56.1:5433/postgres')

# Create an inspector and list the tables that exist in the 'raw' schema
inspector = inspect(engine)
tables = inspector.get_table_names(schema='raw')

# Fixed order in which tables are cleansed and archived. Raw frames are looked
# up by name because get_table_names() does not guarantee any order; pairing
# raw and cleaned frames by list position could archive rows under the wrong
# table.
table_names = ['aankomst', 'banen', 'klant', 'luchthavens', 'maatschappijen', 'planning',
               'vertrek', 'vliegtuig', 'vliegtuigtype', 'vlucht', 'weer']

# Columns parsed with pd.to_datetime during cleansing
time_columns = ['aankomsttijd', 'vertrektijd']

# Load each raw table, lowercase its column names (to match
# column_types_cleansed) and also expose it as the global `<table>_df`.
raw_frames = {}
for table in tables:
    df = pd.read_sql_table(table, con=engine, schema='raw')
    df.columns = df.columns.str.lower()
    globals()[f'{table}_df'] = df
    raw_frames[table] = df

# Regex pattern matching any character that is not a letter, digit or whitespace
special_char_pattern = re.compile(r'[^a-zA-Z0-9\s]')


def _clean_table(df_raw, table):
    """Return a cleansed copy of one raw table; ``df_raw`` is not modified.

    Surviving rows keep their original index labels, so the removed rows can be
    recovered afterwards as ``df_raw.index.difference(result.index)``.

    Args:
        df_raw: raw table with lowercase column names.
        table: table name; selects the table-specific rules below.

    Returns:
        The cleansed DataFrame.
    """
    # Work on a copy. Cleaning the raw frame in place also removed the empty
    # and duplicate rows from it, so those rows were never archived.
    df = df_raw.copy()

    # Drop rows with completely missing values and remove duplicates
    df.dropna(how='all', inplace=True)
    df.drop_duplicates(inplace=True)

    # Parse time columns; rows whose time cannot be parsed are dropped
    for column in time_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column], errors='coerce')
            df = df[df[column].notnull()]

    # Fill missing gates based on terminal values
    if 'terminal' in df.columns and 'gate' in df.columns:
        df.loc[(df['terminal'] == 'F') & (df['gate'].isnull()), 'gate'] = 'F1'
        df.loc[(df['terminal'] == 'G') & (df['gate'].isnull()), 'gate'] = 'G1'

    # If only one of 'bezetting'/'vracht' is missing, set it to 0; drop rows
    # where both are missing
    if 'bezetting' in df.columns and 'vracht' in df.columns:
        df.loc[df['bezetting'].isnull() & df['vracht'].notnull(), 'bezetting'] = 0
        df.loc[df['vracht'].isnull() & df['bezetting'].notnull(), 'vracht'] = 0
        df = df.dropna(subset=['bezetting', 'vracht'], how='all')

    # In 'maatschappijen' and 'vliegtuig', remove rows where any column other
    # than 'name' contains a special character (column-wise, so an empty
    # frame yields an empty mask instead of a DataFrame)
    if table in ('maatschappijen', 'vliegtuig'):
        has_special_char = pd.Series(False, index=df.index)
        for column in df.columns:
            if column != 'name':
                has_special_char |= df[column].astype(str).str.contains(special_char_pattern)
        df = df[~has_special_char]

    # Remove rows where 'icao' is missing or equals '/N'
    # NOTE(review): null placeholders in such exports are commonly '\N'
    # (backslash); confirm that '/N' matches the source data.
    if 'icao' in df.columns:
        df = df[df['icao'].notnull() & (df['icao'] != '/N')]

    # Remove rows without 'iata' or without 'cat'
    if 'iata' in df.columns:
        df = df[df['iata'].notnull()]

    if 'cat' in df.columns:
        df = df[df['cat'].notnull()]

    # vliegtuigtype: keep only rows with at most one missing value
    if table == 'vliegtuigtype':
        df = df.dropna(thresh=df.shape[1] - 1)

    return df


# Cleanse every table and write the result to the 'cleansed' schema
cleaned_frames = {}
for table in table_names:
    # A table missing from the raw schema is processed as an empty frame,
    # like its pre-initialised `<table>_df` placeholder.
    df_raw = raw_frames.get(table, pd.DataFrame())
    df = _clean_table(df_raw, table)
    tables_raw.append(df_raw)
    tables_clean.append(df)
    cleaned_frames[table] = (df_raw, df)
    df.to_sql(table, con=engine, schema='cleansed', if_exists='append', index=False, dtype=column_types_cleansed[table])

# Move deleted records (raw rows absent from the cleansed result, including
# empty and duplicate rows) to the 'archived' schema
for table, (df_raw, df_clean) in cleaned_frames.items():
    # Parse the time columns on a copy of the full raw frame, as cleansing
    # does, so archived time values are timestamps (NaT when unparseable).
    # NOTE(review): this discards the original unparseable text.
    archive_source = df_raw.copy()
    for column in time_columns:
        if column in archive_source.columns:
            archive_source[column] = pd.to_datetime(archive_source[column], errors='coerce')

    deleted_indices = archive_source.index.difference(df_clean.index)
    deleted_records = archive_source.loc[deleted_indices]
    deleted_records_filtered = deleted_records[[col for col in deleted_records.columns if col in column_types_cleansed[table]]]

    deleted_records_filtered.to_sql(table, con=engine, schema='archived', if_exists='append', index=False, dtype=column_types_cleansed[table])

print("Data cleaning and import complete.")


Data cleaning and import complete.


# Warehouse script + sterschema