# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


endpoint_url = 'https://storage.budsc.midwest-datascience.com'

# Every local path below is resolved against the kernel's working directory.
current_dir = Path.cwd()
schema_dir = current_dir / 'schemas'
results_dir = current_dir / 'results'
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    """Fetch routes.jsonl.gz from the course S3 endpoint and decode it.

    Connects anonymously to ``endpoint_url``, streams the gzip-compressed
    object and parses every line with ``json.loads``.

    Returns:
        list: one decoded JSON value per line, in file order.
    """
    remote_path = 'data/processed/openflights/routes.jsonl.gz'
    fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': endpoint_url})
    with fs.open(remote_path, 'rb') as compressed, gzip.open(compressed, 'rb') as lines:
        parsed = [json.loads(raw_line) for raw_line in lines]
    return parsed

Load the records from https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz 

In [2]:
# Download and decode the routes once; later cells pass this list to the
# validation / Avro / Protobuf / geohash builders and to airport_search.
records = read_jsonl_data()

## 3.1

### 3.1.a JSON Schema

In [3]:
def validate_jsonl_data(records):
    """Validate every record against schemas/routes-schema.json and log the result.

    Writes one CSV row per record to results/validation-results.csv with the
    columns ``row_num``, ``is_valid`` and ``msg``. Invalid rows are also
    reported on stdout.

    Args:
        records: sequence of decoded JSON route records.
    """
    from jsonschema.exceptions import best_match
    from jsonschema.validators import validator_for

    schema_path = schema_dir.joinpath('routes-schema.json')
    validation_csv_path = results_dir.joinpath('validation-results.csv')
    with open(schema_path) as f:
        schema = json.load(f)

    # Check the schema and build the validator once; jsonschema.validate()
    # repeats both steps on every call, which is wasteful across all records.
    validator_cls = validator_for(schema)
    validator_cls.check_schema(schema)
    validator = validator_cls(schema)

    # newline='' is required by the csv module for files it writes.
    with open(validation_csv_path, 'w', newline='') as f:
        fieldnames = ['row_num', 'is_valid', 'msg']
        csv_writer = csv.DictWriter(f, fieldnames=fieldnames)
        csv_writer.writeheader()

        for i, record in enumerate(records):
            # best_match picks the same single error jsonschema.validate() would raise.
            error = best_match(validator.iter_errors(record))
            if error is None:
                result = dict(row_num=i, is_valid=True, msg="Record is valid.")
            else:
                print("Invalid record found at row: ", i)
                # .message is the one-line description; str(error) would dump
                # the whole schema and instance into the CSV cell.
                result = dict(row_num=i, is_valid=False, msg=error.message)
            csv_writer.writerow(result)


validate_jsonl_data(records)

### 3.1.b Avro

In [4]:
def create_avro_dataset(records):
    """Serialize ``records`` to results/routes.avro with fastavro.

    The writer schema is loaded from schemas/routes.avsc and parsed with
    ``fastavro.parse_schema`` before any records are written.

    Args:
        records: iterable of route dicts matching the Avro schema.
    """
    output_path = results_dir.joinpath('routes.avro')
    raw_schema = json.loads(schema_dir.joinpath('routes.avsc').read_text())
    avro_schema = fastavro.parse_schema(raw_schema)

    with open(output_path, 'wb') as avro_file:
        fastavro.writer(avro_file, avro_schema, records)


create_avro_dataset(records)

### 3.1.c Parquet

In [5]:
def create_parquet_dataset():
    """Convert the gzipped routes JSONL on S3 into results/routes.parquet.

    The remote file is streamed through gzip and parsed directly by
    ``pyarrow.json.read_json``, which infers the Arrow schema from the JSON
    values; the resulting table is written with ``pyarrow.parquet.write_table``.
    """
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={'endpoint_url': endpoint_url},
    )

    with s3.open(src_data_path, 'rb') as f_gz, gzip.open(f_gz, 'rb') as f:
        table = read_json(f)
    pq.write_table(table, parquet_output_path)


create_parquet_dataset()

### 3.1.d Protocol Buffers

In [6]:
# Make the Protocol Buffers module importable. NOTE(review): this prepends
# <cwd>/routes_pb2 to sys.path, so it presumably expects the protoc output at
# routes_pb2/routes_pb2.py relative to the kernel's working directory — confirm.
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    """Convert an airport dict into a ``routes_pb2.Airport`` message.

    Args:
        airport: dict of airport attributes (as decoded from routes.jsonl), or None.

    Returns:
        routes_pb2.Airport, or None when ``airport`` is None/empty or has no
        ``airport_id``.
    """
    if not airport or airport.get('airport_id') is None:
        return None

    obj = routes_pb2.Airport()
    obj.airport_id = airport['airport_id']

    # Optional attributes are copied only when truthy, so empty strings and
    # zeros leave the message field at its default.
    for field in ('name', 'city', 'iata', 'icao', 'altitude', 'timezone',
                  'dst', 'tz_id', 'type', 'source'):
        value = airport.get(field)
        if value:
            setattr(obj, field, value)

    # Assigning None to a protobuf scalar field raises TypeError, so only copy
    # coordinates that are present (0.0 is kept: it is a valid coordinate).
    for field in ('latitude', 'longitude'):
        value = airport.get(field)
        if value is not None:
            setattr(obj, field, value)

    return obj


def _airline_to_proto_obj(airline):
    """Convert an airline dict into a ``routes_pb2.Airline`` message.

    Args:
        airline: dict of airline attributes (as decoded from routes.jsonl), or None.

    Returns:
        routes_pb2.Airline, or None when ``airline`` is None/empty or has no
        ``airline_id``.
    """
    if not airline or airline.get('airline_id') is None:
        return None

    obj = routes_pb2.Airline()
    obj.airline_id = airline['airline_id']

    # Optional attributes are copied only when truthy, so empty strings leave
    # the message field at its default.
    for field in ('name', 'alias', 'iata', 'icao', 'callsign', 'country'):
        value = airline.get(field)
        if value:
            setattr(obj, field, value)

    # Assigning None to a protobuf scalar field raises TypeError; leave
    # `active` at its default when the source record has no value.
    active = airline.get('active')
    if active is not None:
        obj.active = active

    return obj


def create_protobuf_dataset(records):
    """Serialize ``records`` into a ``routes_pb2.Routes`` message.

    Writes the serialized bytes to results/routes.pb and a snappy-compressed
    copy of the same bytes to results/routes.pb.snappy.

    Args:
        records: iterable of route dicts with optional ``airline``,
            ``src_airport``, ``dst_airport``, ``codeshare``, ``stops`` and
            ``equipment`` keys.
    """
    routes = routes_pb2.Routes()
    for record in records:
        # add() creates the Route directly inside the repeated field instead of
        # building a separate message and copying it in with append().
        route = routes.route.add()

        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline is not None:
            route.airline.CopyFrom(airline)

        source_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if source_airport is not None:
            route.src_airport.CopyFrom(source_airport)

        dest_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dest_airport is not None:
            route.dst_airport.CopyFrom(dest_airport)

        codeshare = record.get('codeshare')
        route.codeshare = codeshare if codeshare else False

        stops = record.get('stops')
        if stops is not None:
            route.stops = stops

        equipment = record.get('equipment')
        if equipment:
            route.equipment.extend(equipment)

    # Serialize once and reuse the bytes for both output files.
    serialized = routes.SerializeToString()

    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(serialized)

    compressed_path = results_dir.joinpath('routes.pb.snappy')
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(serialized))


create_protobuf_dataset(records)

## 3.2

### 3.2.a Simple Geohash Index

In [7]:
def create_hash_dirs(records, precision=3):
    """Partition route records into a directory tree keyed by source-airport geohash.

    Each record whose source airport has both a latitude and a longitude is
    tagged with the geohash of that location (``precision`` characters) and
    written, with an added ``geohash`` key, to
    ``results/geoindex/<h[0]>/<h[0:2]>/<h>.jsonl.gz``. Records without a usable
    source location are skipped. The caller's record dicts are not modified.

    Args:
        records: iterable of route dicts.
        precision: geohash length used as the index key (default 3).
    """
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)

    index_hash = {}
    for record in records:
        # `or {}` also covers an explicit null src_airport. Coordinates are
        # re-read for every record, so a record without a location can never
        # inherit the previous record's latitude/longitude.
        source_airport = record.get('src_airport') or {}
        latitude = source_airport.get('latitude')
        longitude = source_airport.get('longitude')
        if latitude is None or longitude is None:
            continue
        location = pygeohash.encode(latitude, longitude, precision=precision)
        # Store a tagged copy rather than mutating the caller's record, which
        # would leak a 'geohash' key into every later cell using `records`.
        index_hash.setdefault(location, []).append({**record, 'geohash': location})

    for key in sorted(index_hash):
        output_dir = geoindex_dir.joinpath(key[0], key[0:2])
        output_dir.mkdir(exist_ok=True, parents=True)
        output = output_dir.joinpath('{}.jsonl.gz'.format(key))

        with gzip.open(output, 'wb') as f:
            json_output = '\n'.join(json.dumps(line) for line in index_hash[key])
            f.write(json_output.encode('utf-8'))


create_hash_dirs(records)


### 3.2.b Simple Search Feature

In [8]:
def airport_search(latitude, longitude, route_records=None):
    """Print the source airport closest to (latitude, longitude).

    Distance is pygeohash's approximate distance between precision-5 geohashes
    of the search point and each source airport, so it is coarse-grained. Ties
    keep the first airport encountered.

    Args:
        latitude: search latitude in decimal degrees.
        longitude: search longitude in decimal degrees.
        route_records: iterable of route dicts to search; defaults to the
            notebook-level ``records`` list.
    """
    if route_records is None:
        route_records = records

    target = pygeohash.encode(latitude, longitude, precision=5)
    # Sentinel: presumably just above pygeohash's largest approximate distance.
    distance = 20000001
    closest = "None"
    for record in route_records:
        airport = record.get('src_airport')
        if not airport:
            continue
        airport_lat = airport.get('latitude')
        airport_lon = airport.get('longitude')
        # 0.0 is a valid coordinate, so test for None rather than truthiness.
        if airport_lat is None or airport_lon is None:
            continue
        location = pygeohash.encode(airport_lat, airport_lon, precision=5)
        crow = pygeohash.geohash_approximate_distance(location, target)
        if crow < distance:
            distance = crow
            closest = airport.get('name')
    print("The closest airport is: ",closest,"\nIt is located ", distance/1000,"KM from your search location.")


airport_search(41.1499988, -95.91779)

#airport_search(27.918710, -80.629000)

The closest airport is:  Eppley Airfield 
It is located  19.545 KM from your search location.


625441

5003530