# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        

    return records

Load the records from https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz 

In [2]:
records = read_jsonl_data()

In [3]:
records[0]

{'airline': {'airline_id': 410,
  'name': 'Aerocondor',
  'alias': 'ANA All Nippon Airways',
  'iata': '2B',
  'icao': 'ARD',
  'callsign': 'AEROCONDOR',
  'country': 'Portugal',
  'active': True},
 'src_airport': {'airport_id': 2965,
  'name': 'Sochi International Airport',
  'city': 'Sochi',
  'country': 'Russia',
  'iata': 'AER',
  'icao': 'URSS',
  'latitude': 43.449902,
  'longitude': 39.9566,
  'altitude': 89,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'dst_airport': {'airport_id': 2990,
  'name': 'Kazan International Airport',
  'city': 'Kazan',
  'country': 'Russia',
  'iata': 'KZN',
  'icao': 'UWKD',
  'latitude': 55.606201171875,
  'longitude': 49.278701782227,
  'altitude': 411,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'codeshare': False,
 'equipment': ['CR2']}

## 3.1

### 3.1.a JSON Schema

In [5]:
from jsonschema import validate
def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path) as f:
        schema = json.load(f)
        
    validation_csv_path = 'results/validation-json.csv'
    with open(validation_csv_path, 'w') as f: 
        for i, record in enumerate(records):
            try:
                ## TODO: Validate record
                validate(instance=record, schema=schema)
                f.write(f"Record: {i} is valid\r\n")
                pass
            except ValidationError as e:
                ## Print message if invalid record
                f.write(f"Error: {e.message}; failed validating {e.validator} in schema {e.schema_path}\r\n")
                print(e)
                pass
            

validate_jsonl_data(records)

### 3.1.b Avro

In [27]:
from fastavro import writer, reader, parse_schema
from fastavro.schema import load_schema
def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')
    ## TODO: Use fastavro to create Avro dataset
    #parse the schema
    parsed_schema = load_schema(schema_path)
    parsed_schema = parse_schema(parsed_schema)
    #open the avro file for writing
    with open(data_path,'wb') as out:
        #write the records to the output .avro file
        writer(out, parsed_schema,records,codec='deflate')
        
create_avro_dataset(records)

In [30]:
#read the avro file
data_path = results_dir.joinpath('routes.avro')
with open(data_path,'rb') as fo:
    avro_reader = reader(fo)
    for record in avro_reader:
        print(record)

{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id': 2965, 'name': 'Sochi International Airport', 'city': 'Sochi', 'iata': 'AER', 'icao': 'URSS', 'latitude': 43.449902, 'longitude': 39.9566, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 2990, 'name': 'Kazan International Airport', 'city': 'Kazan', 'iata': 'KZN', 'icao': 'UWKD', 'latitude': 55.606201171875, 'longitude': 49.278701782227, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['CR2']}
{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id'

{'airline': {'airline_id': 17885, 'name': 'Interjet (ABC Aerolineas)', 'alias': 'nan', 'iata': '4O', 'icao': 'IBS', 'callsign': 'INTERJET', 'country': 'Mexico', 'active': True}, 'src_airport': {'airport_id': 1848, 'name': 'General Francisco Javier Mina International Airport', 'city': 'Tampico', 'iata': 'TAM', 'icao': 'MMTM', 'latitude': 22.2964000702, 'longitude': -97.86589813229999, 'timezone': -6.0, 'dst': 'S', 'tz_id': 'America/Mexico_City', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 1824, 'name': 'Licenciado Benito Juarez International Airport', 'city': 'Mexico City', 'iata': 'MEX', 'icao': 'MMMX', 'latitude': 19.4363, 'longitude': -99.072098, 'timezone': -6.0, 'dst': 'S', 'tz_id': 'America/Mexico_City', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['320']}
{'airline': {'airline_id': 17885, 'name': 'Interjet (ABC Aerolineas)', 'alias': 'nan', 'iata': '4O', 'icao': 'IBS', 'callsign': 'INTERJET', 'country': 

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{'airline': {'airline_id': 4296, 'name': 'Ryanair', 'alias': 'Qantas Airways', 'iata': 'FR', 'icao': 'RYR', 'callsign': 'RYANAIR', 'country': 'Ireland', 'active': True}, 'src_airport': {'airport_id': 304, 'name': 'Brussels South Charleroi Airport', 'city': 'Charleroi', 'iata': 'CRL', 'icao': 'EBCI', 'latitude': 50.459202000000005, 'longitude': 4.4538199999999994, 'timezone': 1.0, 'dst': 'E', 'tz_id': 'Europe/Brussels', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 1222, 'name': 'Girona Airport', 'city': 'Gerona', 'iata': 'GRO', 'icao': 'LEGE', 'latitude': 41.901000976999995, 'longitude': 2.7605500221, 'timezone': 1.0, 'dst': 'E', 'tz_id': 'Europe/Madrid', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['738']}
{'airline': {'airline_id': 4296, 'name': 'Ryanair', 'alias': 'Qantas Airways', 'iata': 'FR', 'icao': 'RYR', 'callsign': 'RYANAIR', 'country': 'Ireland', 'active': True}, 'src_airport': {'airport_id': 304, 'na

{'airline': {'airline_id': 4296, 'name': 'Ryanair', 'alias': 'Qantas Airways', 'iata': 'FR', 'icao': 'RYR', 'callsign': 'RYANAIR', 'country': 'Ireland', 'active': True}, 'src_airport': {'airport_id': 596, 'name': 'Cork Airport', 'city': 'Cork', 'iata': 'ORK', 'icao': 'EICK', 'latitude': 51.84130096435547, 'longitude': -8.491109848022461, 'timezone': 0.0, 'dst': 'E', 'tz_id': 'Europe/Dublin', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 548, 'name': 'London Stansted Airport', 'city': 'London', 'iata': 'STN', 'icao': 'EGSS', 'latitude': 51.8849983215, 'longitude': 0.23499999940400002, 'timezone': 0.0, 'dst': 'E', 'tz_id': 'Europe/London', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['738']}
{'airline': {'airline_id': 4296, 'name': 'Ryanair', 'alias': 'Qantas Airways', 'iata': 'FR', 'icao': 'RYR', 'callsign': 'RYANAIR', 'country': 'Ireland', 'active': True}, 'src_airport': {'airport_id': 596, 'name': 'Cork Airport

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{'airline': {'airline_id': 2297, 'name': 'easyJet', 'alias': 'EasyJet Airline', 'iata': 'U2', 'icao': 'EZY', 'callsign': 'EASY', 'country': 'United Kingdom', 'active': True}, 'src_airport': {'airport_id': 1529, 'name': 'Milano Linate Airport', 'city': 'Milan', 'iata': 'LIN', 'icao': 'LIML', 'latitude': 45.445099, 'longitude': 9.27674, 'timezone': 1.0, 'dst': 'E', 'tz_id': 'Europe/Rome', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 1555, 'name': 'Leonardo da Vinci–Fiumicino Airport', 'city': 'Rome', 'iata': 'FCO', 'icao': 'LIRF', 'latitude': 41.800277799999996, 'longitude': 12.238888900000001, 'timezone': 1.0, 'dst': 'E', 'tz_id': 'Europe/Rome', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['319']}
{'airline': {'airline_id': 2297, 'name': 'easyJet', 'alias': 'EasyJet Airline', 'iata': 'U2', 'icao': 'EZY', 'callsign': 'EASY', 'country': 'United Kingdom', 'active': True}, 'src_airport': {'airport_id': 1529, 'name':

{'airline': {'airline_id': 5209, 'name': 'United Airlines', 'alias': 'TWA', 'iata': 'UA', 'icao': 'UAL', 'callsign': 'UNITED', 'country': 'United States', 'active': True}, 'src_airport': {'airport_id': 3550, 'name': 'George Bush Intercontinental Houston Airport', 'city': 'Houston', 'iata': 'IAH', 'icao': 'KIAH', 'latitude': 29.98439979553223, 'longitude': -95.34140014648438, 'timezone': -6.0, 'dst': 'A', 'tz_id': 'America/Chicago', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 3576, 'name': 'Miami International Airport', 'city': 'Miami', 'iata': 'MIA', 'icao': 'KMIA', 'latitude': 25.79319953918457, 'longitude': -80.29060363769531, 'timezone': -5.0, 'dst': 'A', 'tz_id': 'America/New_York', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['319', '738', '73G', '320', '739']}
{'airline': {'airline_id': 5209, 'name': 'United Airlines', 'alias': 'TWA', 'iata': 'UA', 'icao': 'UAL', 'callsign': 'UNITED', 'country': 'United 

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



### 3.1.c Parquet

In [6]:
import pyarrow.parquet as pq
from pyarrow import json
def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            #pass
            ## TODO: Use Apache Arrow to create Parquet table and save the dataset
            #create table from json file
            table = read_json(f)
            #write to parquet file
            pq.write_table(table,parquet_output_path)

create_parquet_dataset()

### 3.1.d Protocol Buffers

In [39]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    ## TODO: Create an Airline obj using Protocol Buffers API
    if airline is None:
        return None
    if airline.get('airline_id') is None:
        return None
    
    #assign all fields from the airline schema
    obj.airline_id = airline.get('airline_id')
    if airline.get('name'):
        obj.name = airline.get('name')
    if airline.get('alias'):
        obj.alias = airline.get('alias')
    if airline.get('iata'):
        obj.iata = airline.get('iata')
    if airline.get('icao'):
        obj.icao = airline.get('icao')
    if airline.get('callsign'):
        obj.callsign = airline.get('callsign')
    if airline.get('country'):
        obj.country = airline.get('country')
    
    #assigning boolean value
    obj.active = airline.get('active')
        
    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        ## TODO: Implement the code to create the Protocol Buffers Dataset
        
        #copy 'airline' data
        airline = _airline_to_proto_obj(record.get('airline'))
        if airline:
            route.airline.CopyFrom(airline)
            
        #copy 'src_airport' data
        src_airport = _airport_to_proto_obj(record.get('src_airport'))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        
        #copy 'dst_airport'data
        dst_airport = _airport_to_proto_obj(record.get('dst_airport'))
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)
        
        #assign codeshare field
        route.codeshare = record.get('codeshare')
            
        #copy equipment data
        equipment = record.get('equipment')
        #use extend function since equipment objects are lists
        if equipment:
            route.equipment.extend(equipment)
            
        #append route of airline and airports to the routes dataset
        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')

    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

### 3.1.e Output Sizes

In [6]:
#gathering paths of output files
validation_csv_path = 'results/validation-json.csv'
avro_data_path = results_dir.joinpath('routes.avro')
parquet_output_path = results_dir.joinpath('routes.parquet')
uncompressed_buff_path = results_dir.joinpath('routes.pb')
compressed_buff_path = results_dir.joinpath('routes.pb.snappy')

#create artifacts
output_paths = [validation_csv_path,avro_data_path,parquet_output_path,uncompressed_buff_path,compressed_buff_path]
output_file_sizes = {}

#loop throughout output paths and calculate file sizes to add to dict
for file in output_paths:
    output_file_sizes[file] = os.path.getsize(file)

comparison_csv_path = 'results/comparison.csv'
with open(comparison_csv_path, 'w') as f:
    for k,v in output_file_sizes.items():
        #get file name from the output paths objects
        file_name = str(k).split("/").pop()
        f.write(f"File size of {file_name}: {v} bytes = {v/1000000} MB\n\r\n")

## 3.2

### 3.2.a Simple Geohash Index

In [53]:
import pygeohash as pgh
# Assignment 3.2.a
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:
                ## TODO: use pygeohash.encode() to assign geohashes to the records and complete the hashes list
                #use encode function to get geohash for each latitude and longitude pair
                hashes.append(pgh.encode(latitude,longitude))

    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}

    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)

    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))

#call function for records from json file
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [18]:
import pygeohash as pgh
from iteration_utilities import unique_everseen
def airport_search(latitude, longitude):
    #geohash for given airport coordinates
    geohash = pgh.encode(latitude,longitude)
    AirportDistances = []
    recout = []
    #traverse for all elements and get unique airports
    for record in records:
        for key, value in record.items():
            if key == 'src_airport' and value is not None:
                if value not in recout:
                    recout.append(value)
    
    #doing distance compare with other airports and adding dictionaries to list
    for record in recout:
        dstname = record['name']
        dstlat = record['latitude']
        dstlong = record['longitude']
        geohash_dst = pgh.encode(dstlat,dstlong)
        #conversion to km by diving by 1000
        distm_dstgeo1 = pgh.geohash_approximate_distance(geohash,geohash_dst)/1000
        airport_dist = {
            "Airport":dstname,
            "Geohash":geohash_dst,
            "Latitude":dstlat,
            "Longitude":dstlong,
            "Distance":distm_dstgeo1
        }
        #make sure same airport is not added to the AirportDistances list since it will be the closest to itself
        if dstlat == latitude and dstlong == longitude:
            continue
        #add other airports to list
        else:
            AirportDistances.append(airport_dist)
        
    AirportDistancesOut = list(unique_everseen(AirportDistances))
    
    #define minimum distance for comparison purposes in below for loop
    min_distance = AirportDistancesOut[0]['Distance']/1000
    min_airport_name = AirportDistancesOut[0]['Airport']
    
    #loop through airports and identify ones in close proximy to given airport
    for i in range(len(AirportDistancesOut)):
        #find closest airport with smallest distance in km to airport
        if (AirportDistancesOut[i]['Distance']/1000) < min_distance :
            min_distance = AirportDistancesOut[i]['Distance']/1000
            min_airport_name = AirportDistancesOut[i]['Airport']
            
    print(f"The closest airport is {min_airport_name} and it is {min_distance} km away.\r\n")
    
airport_search(41.1499988, -95.91779)

The closest airport is Eppley Airfield and it is 0.019545000000000003 km away.



In [19]:
#testing another airport --> Kazan International Airport
airport_search(55.606201171875,49.278701782227)

The closest airport is Begishevo Airport and it is 0.625441 km away.

