In [3]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError
from jsonschema import validate


# Remote storage endpoint URL.
endpoint_url = 'https://storage.budsc.midwest-datascience.com'

# Working locations, all anchored at the current working directory.
current_dir = Path.cwd().absolute()
schema_dir = current_dir / 'schemas'
results_dir = current_dir / 'results'

# Make sure the output directory exists; an already-existing one is left as is.
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data(src_data_path='routes.jsonl.gz'):
    """Load a gzip-compressed JSON Lines file into a list of parsed records.

    Parameters
    ----------
    src_data_path : str or path-like, optional
        Local path of the ``.jsonl.gz`` file. Defaults to ``'routes.jsonl.gz'``,
        which is resolved against the current working directory.

    Returns
    -------
    list
        One decoded JSON value per non-blank line, in file order.

    Raises
    ------
    OSError
        If the file cannot be opened or is not valid gzip data.
    json.JSONDecodeError
        If a non-blank line is not valid JSON.
    """
    # The data is read from local disk, so no S3 client is constructed here.
    with gzip.open(src_data_path, 'rb') as f:
        # Stream the file line by line and skip blank lines (for example a
        # trailing empty line), which json.loads would otherwise reject.
        return [json.loads(line) for line in f if line.strip()]

In [4]:
# Load every route record from the gzipped JSON Lines file into memory.
# NOTE(review): `records` is not referenced by the later cells shown here;
# create_flattened_dataset() calls read_jsonl_data() again on its own.
records = read_jsonl_data()

In [5]:
import hashlib
import shutil
import uuid
import math
# Re-establish the endpoint and the working paths for this cell.
endpoint_url = 'https://storage.budsc.midwest-datascience.com'
current_dir = Path.cwd().absolute()
results_dir = current_dir / 'results'

# Start from an empty results directory: remove any existing tree, then
# recreate the directory.
if results_dir.exists():
    shutil.rmtree(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)

def flatten_record(record):
    """Return a one-level copy of `record` with selected nested dicts expanded.

    The 'airline', 'src_airport' and 'dst_airport' entries are expanded into
    '<field>_<child key>' entries when their value is a dict; when it is not a
    dict, that entry is left out of the result. Every other entry is copied
    through unchanged. Insertion order follows the input.
    """
    nested_fields = ('airline', 'src_airport', 'dst_airport')
    flattened = {}

    for name, payload in record.items():
        # Ordinary fields are carried over as they are.
        if name not in nested_fields:
            flattened[name] = payload
            continue

        # A nested field that is not a dict (e.g. None) contributes nothing.
        if not isinstance(payload, dict):
            continue

        flattened.update(
            {'{}_{}'.format(name, sub_name): sub_value
             for sub_name, sub_value in payload.items()}
        )

    return flattened
def create_flattened_dataset():
    """Build a DataFrame of flattened route records.

    Reads every record via read_jsonl_data(), flattens each one with
    flatten_record(), and assembles the resulting dicts into a single frame.

    Returns
    -------
    pandas.DataFrame
        One row per record; the columns are the flattened keys.
    """
    # Nothing is written to disk here; callers decide where to persist the frame.
    return pd.DataFrame.from_records(
        [flatten_record(record) for record in read_jsonl_data()]
    )
# Load the flattened routes, then derive a composite string key per route:
# source airport IATA + destination airport IATA + airline IATA.
df = create_flattened_dataset()
key_parts = [
    df[column].astype(str)
    for column in ('src_airport_iata', 'dst_airport_iata', 'airline_iata')
]
df['key'] = key_parts[0] + key_parts[1] + key_parts[2]

In [8]:
# 7.1a
# Bucket each route by the upper-cased first character of its 'key'.
df['kv_key'] = df['key'].str[0].str.upper()

# Inclusive (start, end) letter ranges; each range becomes one partition folder.
partitions = (
    ('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
    ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
    ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
    ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z')
)

# Label every range as '<start>-<end>'.
# NOTE(review): single-letter ranges are labelled e.g. 'A-A'; confirm whether
# a plain 'A' is the intended folder name.
folder_mapping = {(start, end): f'{start}-{end}' for start, end in partitions}


def _folder_for(letter):
    """Return the label of the first range containing `letter`, or None."""
    for bounds in partitions:
        if bounds[0] <= letter <= bounds[1]:
            return folder_mapping[bounds]
    return None


# Replace the raw first letter with the label of the range it falls into.
df['kv_key'] = df['kv_key'].apply(_folder_for)

# Write the dataset under results/kv, partitioned by the 'kv_key' column.
df.to_parquet('results/kv', partition_cols=['kv_key'], index=False)

In [9]:
# 7.1b
# SHA-256 helper used to derive the hash partition of a key
def hash_key(key):
    """Return the SHA-256 digest of ``str(key)``, UTF-8 encoded, as a hex string."""
    encoded = str(key).encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()

# Hash every key and keep the full hex digest in the 'hashed' column.
hex_digests = df['key'].apply(hash_key)
df['hashed'] = hex_digests

# The partition value is the first hex character of the digest, upper-cased.
df['hash_partition'] = hex_digests.str[0].str.upper()

# Write the dataset under results/hash, partitioned by the 'hash_partition' column.
df.to_parquet('results/hash', partition_cols=['hash_partition'], index=False)

In [18]:
# 7.1c
from geopy.distance import geodesic

# Function to pick the data center nearest to a row's source coordinates
def calculate_distance(row, data_center_coords):
    """Return the key of the data center closest to the row's source coordinates.

    `row` is indexed with 'src_latitude' and 'src_longitude';
    `data_center_coords` maps each data-center key to the coordinate pair handed
    to `geodesic`. Candidates are compared by geodesic distance in miles, and on
    a tie the first key in mapping order is returned.
    """
    origin = (row['src_latitude'], row['src_longitude'])

    def miles_to(name):
        return geodesic(origin, data_center_coords[name]).miles

    # min() keeps the first minimal element, matching a first-index lookup.
    return min(data_center_coords, key=miles_to)

# Coordinates of each candidate data center, keyed by region name
data_center_coords = {
    'central': (41.1544433, -96.0422378),    # Papillion, NE
    'east': (39.08344, -77.6497145),        # Loudoun County, Virginia
    'west': (45.5945645, -121.1786823)      # The Dalles, Oregon
}

# Expose the source-airport coordinates under the column names that
# calculate_distance reads
for alias_column, airport_column in (
    ('src_latitude', 'src_airport_latitude'),
    ('src_longitude', 'src_airport_longitude'),
):
    df[alias_column] = df[airport_column]

# Rows missing either source coordinate are dropped from df in place
df.dropna(subset=['src_latitude', 'src_longitude'], inplace=True)

# Tag each remaining row with the key of its nearest data center
df['location'] = df.apply(calculate_distance, axis=1, args=(data_center_coords,))

# Write the dataset under results/geo, partitioned by the 'location' column
df.to_parquet('results/geo', partition_cols=['location'], index=False)

In [21]:
#7.1d
def balance_partitions(keys, num_partitions):
    """Split keys into `num_partitions` contiguous, sorted, near-equal partitions.

    The keys are sorted and cut into consecutive slices. When the number of
    keys is not divisible by `num_partitions`, the leading partitions each
    receive one extra key, so partition sizes differ by at most one.

    Parameters
    ----------
    keys : iterable
        Mutually comparable keys. Any iterable is accepted, including
        generators, because the length is taken after sorting.
    num_partitions : int
        Number of partitions to produce; must be at least 1.

    Returns
    -------
    list of list
        Exactly `num_partitions` lists in ascending key order. Some may be
        empty when there are fewer keys than partitions.

    Raises
    ------
    ValueError
        If `num_partitions` is less than 1.
    """
    if num_partitions < 1:
        raise ValueError('num_partitions must be at least 1')

    sorted_keys = sorted(keys)
    # Measure the sorted list rather than `keys`, which may have no len().
    base_size, extra = divmod(len(sorted_keys), num_partitions)

    partitions = []
    start = 0
    for index in range(num_partitions):
        # The first `extra` partitions absorb the remainder, one key each.
        size = base_size + (1 if index < extra else 0)
        partitions.append(sorted_keys[start:start + size])
        start += size

    return partitions

In [22]:
# Demo: split ten unsorted keys into three balanced partitions.
# NOTE(review): this rebinds the module-level name `partitions`, which cell
# 7.1a also uses for its letter-range tuple.
num_partitions = 3
keys = [
    10, 20, 5, 15, 25,
    30, 35, 40, 45, 50,
]

partitions = balance_partitions(keys, num_partitions)
print(partitions)

[[5, 10, 15, 20], [25, 30, 35], [40, 45, 50]]
