## 3.1a

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv
from jsonschema import validate
import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError
from genson import SchemaBuilder
import genson
from fastavro import writer, reader, parse_schema
from pygeohash import distances



endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    src_data_path = 'routes.jsonl.gz'
    with gzip.open(src_data_path, 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]
    return records



In [2]:
records = read_jsonl_data()

In [5]:
schema_builder = SchemaBuilder()
for item in records:
    schema_builder.add_schema(item)
    
schema = schema_builder.to_schema()
with open('routes-schema.json', "r") as f:
    existing_data = json.load(f)
    
existing_data["schema"] = schema

with open('routes-schema.json',"w") as f:
    json.dump(existing_data, f)
    
f.close()

In [9]:
def validate_jsonl_data(records):
    with open('routes-schema.json') as f:
        schema = json.load(f)
    
    validation_csv_path = str(current_dir)+'/results/validations.csv'
        
    with open(validation_csv_path, 'w') as f:    
        for i, record in enumerate(records):
            try:
                validate(i, schema=schema)
                pass
            except ValidationError as e:
                print("Invalid JSON record")
                pass
            

validate_jsonl_data(records)

## 3.1b

In [11]:
schema_path = schema_dir.joinpath('routes.avsc')
data_path = results_dir.joinpath('routes.avro')

def create_avro_dataset(records):
    
    with open('routes.avsc', "r") as f:
        schema = json.load(f)
    
    parsed_schema = parse_schema(schema)
    
    with open(data_path, "wb") as f_out:
        writer(f_out,parsed_schema,records)
    
        
create_avro_dataset(records)

## 3.1c

In [13]:
def create_parquet_dataset():
    src_data_path = 'routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
#     s3 = s3fs.S3FileSystem(
#         anon=True,
#         client_kwargs={
#             'endpoint_url': endpoint_url
#         }
#     )
    
    with gzip.open(src_data_path, 'rb') as f:
        table = read_json(f)
        pq.write_table(table,parquet_output_path)

create_parquet_dataset()

In [15]:
# View the parquet data

table = pq.read_table('results/routes.parquet')
df = table.to_pandas()
print(df)

                                                 airline  \
0      {'airline_id': 410, 'name': 'Aerocondor', 'ali...   
1      {'airline_id': 410, 'name': 'Aerocondor', 'ali...   
2      {'airline_id': 410, 'name': 'Aerocondor', 'ali...   
3      {'airline_id': 410, 'name': 'Aerocondor', 'ali...   
4      {'airline_id': 410, 'name': 'Aerocondor', 'ali...   
...                                                  ...   
67658  {'airline_id': 4178, 'name': 'Regional Express...   
67659  {'airline_id': 19016, 'name': 'Apache Air', 'a...   
67660  {'airline_id': 19016, 'name': 'Apache Air', 'a...   
67661  {'airline_id': 19016, 'name': 'Apache Air', 'a...   
67662  {'airline_id': 19016, 'name': 'Apache Air', 'a...   

                                             src_airport  \
0      {'airport_id': 2965.0, 'name': 'Sochi Internat...   
1      {'airport_id': 2966.0, 'name': 'Astrakhan Airp...   
2      {'airport_id': 2966.0, 'name': 'Astrakhan Airp...   
3      {'airport_id': 2968.0, 'name': '

## 3.1d

In [12]:
#sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj

def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    if not airline.get('name'):
        return None
    if not airline.get('airline_id'):
        return None
    obj.airline_id = airline.get('airline_id')
    if airline.get('name'):
        obj.name = airline.get('name')
    if airline.get('alias'):
        obj.alias = airline.get('alias')
    if airline.get('iata'):
        obj.iata = airline.get('iata')
    if airline.get('icao'):
        obj.icao = airline.get('icao')
    if airline.get('callsign'):
        obj.callsign = airline.get('callsign')
    if airline.get('country'):
        obj.country = airline.get('country')
    if airline.get('active'):
        obj.active = airline.get('active')    

    return obj

def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)
        route.codeshare = record.get('codeshare', {})
        stops = _airport_to_proto_obj(record.get('stops', {}))
        if stops:
            route.stops.CopyFrom(stops)



def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj
    
    routes.route.append(route)
    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

## 3.1.e

In [13]:
# I couldn't figure this out in time, sorry

## 3.2.a

In [5]:
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:
                ## TODO: use pygeohash.encode() to assign geohashes to the records and complete the hashes list
                geo = pygeohash.encode(latitude, longitude)
                hashes.append(geo)
                record['geohash'] = geo
    
    
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)
    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))
            
create_hash_dirs(records)

## 3.2.b

In [73]:
def create_hash_index(records):
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latit = src_airport.get('latitude')
            longit = src_airport.get('longitude')
        if latit and longit:
            geo = pygeohash.encode(latit, longit)
            hashes.append(geo)
            record['geohash'] = geo
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)
    return hash_index

        
def encode_for_search(latitude, longitude):
    target_geohash = pygeohash.encode(latitude, longitude)
    return target_geohash

distance_hashes = {}

In [77]:
def airport_search(latitude, longitude):
    t_geohash = encode_for_search(latitude, longitude)
    records = read_jsonl_data()
    hashed_index = {}
    hashed_index = create_hash_index(records)
    for h in hashes:
        t_distance = pygeohash.geohash_approximate_distance(t_geohash,h)
        distance_hashes.setdefault(h,t_distance)
    sorted_distance_hashes = sorted(distance_hashes.items(),key=lambda x:x[1],reverse=False)
    closest_airport_geohash = sorted_distance_hashes[::len(sorted_distance_hashes)-1]
    closest_airport_geohash = closest_airport_geohash[0]
    closest_airport_geohash = closest_airport_geohash[0]
    value = hashed_index[closest_airport_geohash[:3]]
    src_airport = value[0].get('src_airport', {})
    name = src_airport.get('name')
#     print(closest_airport_geohash[0])
    print("The closest airport is",name)
    
    
airport_search(41.1499988, -95.91779)

The closest airport is Eppley Airfield
