In [12]:
def format_for_db(aircraft: dict) -> tuple:
    """
    Format aircraft data for database insertion.
    
    Parameters:
    - aircraft: A dictionary containing aircraft data.
    
    Returns:
    - A tuple formatted for database insertion, ensuring that the data types
      are consistent with the database schema.
    """
    try:
        altitude_baro = int(aircraft.get("alt_baro", 0))  # Try to convert 'alt_baro' to integer, default to 0
    except ValueError:
        altitude_baro = 0  # Default to 0 if conversion fails

    emergency_str = aircraft.get("emergency", "").lower()  # Get emergency status and convert to lowercase
    had_emergency = emergency_str in ['true', 't', '1', 'yes']  # Determine boolean value for emergency status

    # Prepare and return the tuple formatted for database insertion
    return (
        aircraft.get("hex", ""),  # ICAO aircraft address
        aircraft.get("r", ""),  # Registration
        aircraft.get("t", ""),  # Aircraft type
        altitude_baro,  # Barometric altitude
        float(aircraft.get("gs", 0.0)),  # Ground speed, default to 0.0
        had_emergency,  # Emergency status
        pd.to_datetime(aircraft.get("timestamp")),  # Timestamp converted to datetime object
        float(aircraft.get("lat", 0.0)),  # Latitude, default to 0.0
        float(aircraft.get("lon", 0.0)),  # Longitude, default to 0.0
        int(aircraft.get("messages", 0)),  # Number of ADS-B messages
        float(aircraft.get("now", 0.0))  # Current time, default to 0.0
    )

In [13]:
import psycopg2
from psycopg2.extras import execute_values
import boto3
import json
import pandas as pd
import logging
import gzip

# Database configuration variables
DB_HOST = 'localhost'
DB_PORT = 5433
DB_NAME = 'postgres'
DB_PASSWORD = 'postgres'
DB_USERNAME = 'postgres'

# Hardcoded AWS S3 credentials
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
AWS_SESSION_TOKEN = ''
S3_BUCKET_NAME = 'bdi-aircraft-rafaelbraga'

def download_and_prepare_data():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_session_token=AWS_SESSION_TOKEN
    )

    logger.info("Starting data download...")
    response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
    all_files = [obj['Key'] for obj in response['Contents']] if 'Contents' in response else []

    formatted_data_list = []

    for file in all_files:
        if file.endswith('.json.gz'):
            obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=file)
            with gzip.GzipFile(fileobj=obj['Body']) as gzipfile:
                file_content = gzipfile.read().decode('utf-8')
            data_dict = json.loads(file_content)
            formatted_data = [format_for_db(ac) for ac in data_dict.get('aircraft', [])]
            formatted_data_list.extend(formatted_data)

    logger.info("Data download and preparation completed successfully.")
    return formatted_data_list


In [14]:
def initialize_database_and_store_data(formatted_data):
    """
    Initializes the database by creating a table for aircraft data if it doesn't already exist,
    and then stores the formatted aircraft data in the database.

    Parameters:
    - formatted_data: A list of tuples representing formatted aircraft data.
    """
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USERNAME,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )

    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS aircraft_data (
            icao VARCHAR(255) UNIQUE,
            registration VARCHAR(255),
            aircraft_type VARCHAR(255),
            altitude_baro INTEGER,
            ground_speed DOUBLE PRECISION,
            had_emergency BOOLEAN,
            timestamp TIMESTAMP,
            lat DOUBLE PRECISION,
            lon DOUBLE PRECISION,
            messages BIGINT,
            now DOUBLE PRECISION
        )
    ''')
    conn.commit()

    if formatted_data:
        query = """
        INSERT INTO aircraft_data (icao, registration, aircraft_type, altitude_baro, ground_speed, had_emergency, timestamp, lat, lon, messages, now)
        VALUES %s ON CONFLICT (icao) DO NOTHING;
        """
        execute_values(cursor, query, formatted_data, template=None, page_size=100)
        conn.commit()

    cursor.close()
    conn.close()
    print("Database initialization and data storage completed successfully.")

In [15]:
# Call the functions to download data and store it in the database
formatted_data = download_and_prepare_data()
initialize_database_and_store_data(formatted_data)

Database initialization and data storage completed successfully.


In [17]:
import psycopg2
import pandas as pd

# Your database connection parameters
DB_HOST = 'localhost'
DB_PORT = 5433
DB_NAME = 'postgres'
DB_PASSWORD = 'postgres'
DB_USERNAME = 'postgres'

# Connect to your database
conn = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

# Create a new cursor
cur = conn.cursor()

# Execute a SQL query to retrieve the first 10 rows from your table
sql_query = "SELECT * FROM aircraft_data LIMIT 10;"
cur.execute(sql_query)

# Fetch the results
rows = cur.fetchall()

# Optionally, convert to a pandas DataFrame for nicer formatting in Jupyter Notebook
df = pd.DataFrame(rows, columns=[desc[0] for desc in cur.description])

# Display the DataFrame
display(df)

# Close the cursor and the connection
cur.close()
conn.close()

Unnamed: 0,icao,registration,aircraft_type,altitude_baro,ground_speed,had_emergency,timestamp,lat,lon,messages,now
0,aaffa2,N808AN,B788,37975,407.0,False,,58.719498,-175.694336,47993703,0.0
1,a65800,N508DN,A359,39000,416.0,False,,57.828506,-173.738586,46887580,0.0
2,71c085,HL8085,B789,36000,422.9,False,,62.407232,-173.663635,41100097,0.0
3,c01754,C-FIVR,B77W,30000,428.0,False,,55.97731,-171.224503,64708413,0.0
4,a4dfa6,N413DX,A339,37000,383.0,False,,55.986607,-170.797745,58531834,0.0
5,ab3856,N822AN,B789,38000,407.0,False,,60.002529,-169.682755,57111970,0.0
6,869232,JA788A,B77W,32000,394.0,False,,60.08083,-169.276069,29588563,0.0
7,86e800,JA875A,B789,38000,414.0,False,,56.130314,-169.066544,42993801,0.0
8,86e778,JA871A,B789,37975,449.6,False,,63.682484,-168.199475,41631434,0.0
9,a51e24,N429MC,B744,35000,551.0,False,,59.46257,-167.749865,30523236,0.0


In [19]:
import sqlite3
import json

# Define your local database path
db_path = "/Users/vanessabragacosta/Documents/S8_Clone/big-data-infrastructure-exercises/airflow/dags/data/aircraft_type/aircraft_database.db"

# Define the path to your JSON files
directory_path = "/Users/vanessabragacosta/Documents/S8_Clone/big-data-infrastructure-exercises/airflow/dags/data/aircraft_type"

# Connect to SQLite database (will be created if it doesn't exist)
conn = sqlite3.connect(db_path)
c = conn.cursor()

# Create tables
c.execute('''CREATE TABLE IF NOT EXISTS aircraft_basic_info
             (icao TEXT PRIMARY KEY, reg TEXT, icaotype TEXT, year TEXT, 
             manufacturer TEXT, model TEXT, ownop TEXT, faa_pia BOOLEAN, faa_ladd BOOLEAN, 
             short_type TEXT, mil BOOLEAN)''')
c.execute('''CREATE TABLE IF NOT EXISTS fuel_consumption
             (icao TEXT PRIMARY KEY, name TEXT, galph INTEGER, category TEXT, source TEXT)''')

# Insert data into aircraft_basic_info
with open(f'{directory_path}/basic-ac-db.json') as f:
    for line in f:
        data = json.loads(line)
        c.execute('''INSERT OR IGNORE INTO aircraft_basic_info 
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', 
                  (data['icao'], data['reg'], data['icaotype'], data['year'], 
                   data['manufacturer'], data['model'], data['ownop'], 
                   data['faa_pia'], data['faa_ladd'], data['short_type'], data['mil']))

# Insert data into fuel_consumption
with open(f'{directory_path}/aircraft_type_fuel_consumption_rates.json') as f:
    fuel_data = json.load(f)
    for icao, info in fuel_data.items():
        c.execute('''INSERT OR IGNORE INTO fuel_consumption 
                     VALUES (?, ?, ?, ?, ?)''', 
                  (icao, info['name'], info['galph'], info['category'], info['source']))

# Commit and close
conn.commit()
conn.close()
