# Assignment 3

Import libraries and define common helper functions

In [26]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError
from genson import SchemaBuilder

endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)

def build_schema(datastore):

    builder = SchemaBuilder()
    builder.add_object(datastore )

    with open("schemas/routes-schema.json", "w") as schema_file:
        json.dump(builder.to_schema(), schema_file)

def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        
    build_schema(records)
    
    return records

Load the records from https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz 

In [27]:
records = read_jsonl_data()

## 3.1

### 3.1.a JSON Schema

In [56]:
def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    validation_csv_path = results_dir.joinpath('validation-results.csv')
    with open(schema_path) as f:
        schema = json.load(f)
        
    with open(validation_csv_path, 'w') as f:    
        for i, record in enumerate(records):
            try:
                jsonschema.validate(instance=record, schema=schema)
            except ValidationError as e:
                f.write(str(e))
            

validate_jsonl_data(records)

### 3.1.b Avro

In [31]:
def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')
    
    # Read schema
    with open(schema_path) as fo:
        schema = json.loads(fo.read())
    
    parsed_schema = fastavro.parse_schema(schema)

    # Write dataset
    with open(data_path, 'wb') as out:
        fastavro.writer(out, parsed_schema, records)
       
create_avro_dataset(records)

In [33]:
#read in avro dataset to validate
# Reading
with open('results/routes.avro', 'rb') as fo:
    for record in fastavro.reader(fo):
        print(record)

{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id': 2965, 'name': 'Sochi International Airport', 'city': 'Sochi', 'iata': 'AER', 'icao': 'URSS', 'latitude': 43.449902, 'longitude': 39.9566, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 2990, 'name': 'Kazan International Airport', 'city': 'Kazan', 'iata': 'KZN', 'icao': 'UWKD', 'latitude': 55.606201171875, 'longitude': 49.278701782227, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['CR2']}
{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id'

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{'airline': {'airline_id': 2009, 'name': 'Delta Air Lines', 'alias': 'CSA Czech Airlines', 'iata': 'DL', 'icao': 'DAL', 'callsign': 'DELTA', 'country': 'United States', 'active': True}, 'src_airport': {'airport_id': 3830, 'name': "Chicago O'Hare International Airport", 'city': 'Chicago', 'iata': 'ORD', 'icao': 'KORD', 'latitude': 41.9786, 'longitude': -87.9048, 'timezone': -6.0, 'dst': 'A', 'tz_id': 'America/Chicago', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 507, 'name': 'London Heathrow Airport', 'city': 'London', 'iata': 'LHR', 'icao': 'EGLL', 'latitude': 51.4706, 'longitude': -0.461941, 'timezone': 0.0, 'dst': 'E', 'tz_id': 'Europe/London', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': True, 'stops': 0, 'equipment': ['343']}
{'airline': {'airline_id': 2009, 'name': 'Delta Air Lines', 'alias': 'CSA Czech Airlines', 'iata': 'DL', 'icao': 'DAL', 'callsign': 'DELTA', 'country': 'United States', 'active': True}, 'src_airport': {'airport_id': 

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{'airline': {'airline_id': 4091, 'name': 'Qatar Airways', 'alias': 'Qantas Airways', 'iata': 'QR', 'icao': 'QTR', 'callsign': 'QATARI', 'country': 'Qatar', 'active': True}, 'src_airport': {'airport_id': 2223, 'name': 'Benazir Bhutto International Airport', 'city': 'Islamabad', 'iata': 'RYK', 'icao': 'OPRN', 'latitude': 33.616699, 'longitude': 73.099197, 'timezone': 5.0, 'dst': 'N', 'tz_id': 'Asia/Karachi', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 11051, 'name': 'Hamad International Airport', 'city': 'Doha', 'iata': 'DOH', 'icao': 'OTHH', 'latitude': 25.273056, 'longitude': 51.608056, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['332', '333', '77L']}
{'airline': {'airline_id': 4091, 'name': 'Qatar Airways', 'alias': 'Qantas Airways', 'iata': 'QR', 'icao': 'QTR', 'callsign': 'QATARI', 'country': 'Qatar', 'active': True}, 'src_airport': {'airport_id': 1701

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{'airline': {'airline_id': 4611, 'name': 'Shenzhen Airlines', 'alias': 'Swiss European', 'iata': 'ZH', 'icao': 'CSZ', 'callsign': 'SHENZHEN AIR', 'country': 'China', 'active': True}, 'src_airport': {'airport_id': 3364, 'name': 'Beijing Capital International Airport', 'city': 'Beijing', 'iata': 'PEK', 'icao': 'ZBAA', 'latitude': 40.0801010131836, 'longitude': 116.58499908447266, 'timezone': 8.0, 'dst': 'U', 'tz_id': 'Asia/Shanghai', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 6394, 'name': 'Yiwu Airport', 'city': 'Yiwu', 'iata': 'YIW', 'icao': 'ZSYW', 'latitude': 29.344699859600002, 'longitude': 120.03199768100001, 'timezone': 8.0, 'dst': 'U', 'tz_id': 'Asia/Shanghai', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': True, 'stops': 0, 'equipment': ['737']}
{'airline': {'airline_id': 4611, 'name': 'Shenzhen Airlines', 'alias': 'Swiss European', 'iata': 'ZH', 'icao': 'CSZ', 'callsign': 'SHENZHEN AIR', 'country': 'China', 'active': True}, 'src_airpor

### 3.1.c Parquet

In [34]:
def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            ## TODO: Use Apache Arrow to create Parquet table and save the dataset
            table = read_json(f)
            
    pq.write_table(table, parquet_output_path)

create_parquet_dataset()

In [40]:
#read parquet table to validate
parquet_output_path = results_dir.joinpath('routes.parquet')

pq_file = pq.ParquetFile(parquet_output_path)
pq_file.metadata

<pyarrow._parquet.FileMetaData object at 0x7f7c6819da90>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 38
  num_rows: 67663
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 7569

In [41]:
pq_file.schema

<pyarrow._parquet.ParquetSchema object at 0x7f7c98091130>
required group field_id=0 schema {
  optional group field_id=1 airline {
    optional int64 field_id=2 airline_id;
    optional binary field_id=3 name (String);
    optional binary field_id=4 alias (String);
    optional binary field_id=5 iata (String);
    optional binary field_id=6 icao (String);
    optional binary field_id=7 callsign (String);
    optional binary field_id=8 country (String);
    optional boolean field_id=9 active;
  }
  optional group field_id=10 src_airport {
    optional int64 field_id=11 airport_id;
    optional binary field_id=12 name (String);
    optional binary field_id=13 city (String);
    optional binary field_id=14 country (String);
    optional binary field_id=15 iata (String);
    optional binary field_id=16 icao (String);
    optional double field_id=17 latitude;
    optional double field_id=18 longitude;
    optional int64 field_id=19 altitude;
    optional double field_id=20 timezone;
    opt

### 3.1.d Protocol Buffers

In [55]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    if airline is None:
        return None
    if airline.get('airline_id') is None:
        return None

    obj.airline_id = airline.get('airline_id')
    if airline.get('name'):
        obj.name = airline.get('name')
    if airline.get('alias'):
        obj.alias = airline.get('alias')
    if airline.get('iata'):
        obj.iata = airline.get('iata')
    if airline.get('icao'):
        obj.icao = airline.get('icao')
    if airline.get('callsign'):
        obj.callsign = airline.get('callsign')
    if airline.get('country'):
        obj.country = airline.get('country')
    if airline.get('active'):
        obj.active = airline.get('active')
    else:
        obj.active = False

    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)
            
        if record.get('codeshare'):
            route.codeshare = record.get('codeshare')
        else:
            route.codeshare = False
            
        if record.get('stops'):
            route.stops = record.get('stops')
        
        equipment = record.get('equipment')
        
        if len(equipment) > 1:
            for i, v in enumerate(equipment):
                route.equipment.append(v)
        else:
            equipment = record.get('equipment')

        routes.route.append(route)
        
    data_path = results_dir.joinpath('routes.pb')
    
    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

## 3.2

### 3.2.a Simple Geohash Index

In [None]:
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    ## TODO: Create hash index
            
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [None]:
def airport_search(latitude, longitude):
    ## TODO: Create simple search to return nearest airport
    pass
    
airport_search(41.1499988, -95.91779)