### Matthew Collins
#### DSC650: Week 3
#### Assignment 3

In [1]:
# Import Libraries

import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError

# Create directories

endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)

# Create function to get and load the data

def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        

    return records

In [2]:
# Load the file and view it.  I used this information to create the initial json schema

records = read_jsonl_data()
print(json.dumps(records[1], sort_keys=False, indent = 4))

{
    "airline": {
        "airline_id": 410,
        "name": "Aerocondor",
        "alias": "ANA All Nippon Airways",
        "iata": "2B",
        "icao": "ARD",
        "callsign": "AEROCONDOR",
        "country": "Portugal",
        "active": true
    },
    "src_airport": {
        "airport_id": 2966,
        "name": "Astrakhan Airport",
        "city": "Astrakhan",
        "country": "Russia",
        "iata": "ASF",
        "icao": "URWA",
        "latitude": 46.2832984924,
        "longitude": 48.0063018799,
        "altitude": -65,
        "timezone": 4.0,
        "dst": "N",
        "tz_id": "Europe/Samara",
        "type": "airport",
        "source": "OurAirports"
    },
    "dst_airport": {
        "airport_id": 2990,
        "name": "Kazan International Airport",
        "city": "Kazan",
        "country": "Russia",
        "iata": "KZN",
        "icao": "UWKD",
        "latitude": 55.606201171875,
        "longitude": 49.278701782227,
        "altitude": 411,
        "tim

### 3.1

#### 3.1a JSON Scehma

In [9]:
# Create a function to develop the JSON schema

def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path) as f:
        schema = json.load(f)
          
    # create variable to write csv file
    validation_csv_path = results_dir.joinpath('validation-results.csv') 
    with open(validation_csv_path, 'w') as f:    
        fieldnames = ['row_num', 'is_valid', 'msg'] #'airline', 'src_airport', 'dst_airport', 'codeshare', 'equipment'] # create column names           
        csv_writer = csv.writer(f)             # write to file 
        csv_writer.writerow(fieldnames)        # add column names to the file
        
        
        for i, record in enumerate(records):

            try:                
                ## TODO validate the csv path
                jsonschema.validate(instance = record, schema = schema)
                result = dict(
                    row_num = i,
                    is_valid = True,
                    msg = record
                )

                
                ## Added code above
                #pass
            
            except ValidationError as e:
                ## Print message if invalid record
                #print('JSON is invalid')
                #pass
                result = dict(
                    row_num = i,
                    is_valid = False,
                    msg = e
                )
     
            finally:                
                csv_writer.writerow(result.values())
    
validate_jsonl_data(records)



In [10]:
# Provide the outcome information for the validation-results.csv counts to ensure schema fits data

pd.read_csv('results/validation-results.csv')['is_valid'].value_counts()

True    67663
Name: is_valid, dtype: int64

In [8]:
df = pd.read_csv('results/validation-results.csv')
df = df.loc[df['is_valid'] == False]
df.to_csv('results/df.csv')

#### 3.1b Avro

In [8]:
# Create a function to develop Avro schema

#import additional elements from fastavro
from fastavro import writer, reader, parse_schema

def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')
    ## TODO: Use fastavro to create Avro dataset
    
    with open(schema_path, 'r') as f:
        schema = json.load(f)
    
    parsed_schema = parse_schema(schema)
    
    with open(data_path, 'wb') as out:
        writer(out, parsed_schema, records)
    
    #with open(data_path, 'rb') as fo:
        #for record in reader(fo):
            #print(record)      
    
    ## Added code above    
      
create_avro_dataset(records)

#### 3.1c Parquet

In [4]:
# Create a function develop Parquet schema

def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            #records = [json.loads(line) for line in f.readlines()]
                        
            #pass
            ## TODO: Use Apache Arrow to create Parquet table and save the dataset
            file = read_json(f)
            pq.write_table(file, parquet_output_path)
            
            ## Added code above

create_parquet_dataset()


In [18]:
# Read the Parquet dataset to ensure it exist (I cannot read binary)

pq_output = results_dir.joinpath('routes.parquet')
table2 = pq.read_table(pq_output)
table2.to_pandas()
table2

Unnamed: 0,airline,src_airport,dst_airport,codeshare,equipment
0,"{'airline_id': 410, 'name': 'Aerocondor', 'ali...","{'airport_id': 2965.0, 'name': 'Sochi Internat...","{'airport_id': 2990.0, 'name': 'Kazan Internat...",False,[CR2]
1,"{'airline_id': 410, 'name': 'Aerocondor', 'ali...","{'airport_id': 2966.0, 'name': 'Astrakhan Airp...","{'airport_id': 2990.0, 'name': 'Kazan Internat...",False,[CR2]
2,"{'airline_id': 410, 'name': 'Aerocondor', 'ali...","{'airport_id': 2966.0, 'name': 'Astrakhan Airp...","{'airport_id': 2962.0, 'name': 'Mineralnyye Vo...",False,[CR2]
3,"{'airline_id': 410, 'name': 'Aerocondor', 'ali...","{'airport_id': 2968.0, 'name': 'Chelyabinsk Ba...","{'airport_id': 2990.0, 'name': 'Kazan Internat...",False,[CR2]
4,"{'airline_id': 410, 'name': 'Aerocondor', 'ali...","{'airport_id': 2968.0, 'name': 'Chelyabinsk Ba...","{'airport_id': 4078.0, 'name': 'Tolmachevo Air...",False,[CR2]
...,...,...,...,...,...
67658,"{'airline_id': 4178, 'name': 'Regional Express...","{'airport_id': 6334.0, 'name': 'Whyalla Airpor...","{'airport_id': 3341.0, 'name': 'Adelaide Inter...",False,[SF3]
67659,"{'airline_id': 19016, 'name': 'Apache Air', 'a...","{'airport_id': 4029.0, 'name': 'Domodedovo Int...","{'airport_id': 2912.0, 'name': 'Manas Internat...",False,[734]
67660,"{'airline_id': 19016, 'name': 'Apache Air', 'a...","{'airport_id': 2912.0, 'name': 'Manas Internat...","{'airport_id': 4029.0, 'name': 'Domodedovo Int...",False,[734]
67661,"{'airline_id': 19016, 'name': 'Apache Air', 'a...","{'airport_id': 2912.0, 'name': 'Manas Internat...","{'airport_id': 2913.0, 'name': 'Osh Airport', ...",False,[734]


#### 3.1d Protocol Buffers

In [17]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:        
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    ## TODO: Create an Airline obj using Protocol Buffers API
    #if airline is None:
        #return obj
    obj.airline_id = airline.get('airline_id')  # required
    obj.name = airline.get('name')   # required    
    
    #alias optional
    if airline.get('alias'):
        obj.alias = airline.get('alias')        
    if not airline.get('alias'):
        return None
    
    # iata optional
    if airline.get('iata'):
        obj.iata = airline.get('iata')        
    if not airline.get('iata'):
        return None
    
    # icao optional
    if airline.get('icao'):
        obj.iata = airline.get('iata')        
    if not airline.get('icao'):
        return None
    
    # callsign optional
    if airline.get('callsign'):
        obj.iata = airline.get('callsign')        
    if not airline.get('callsign'):
        return None

    # country optional
    if airline.get('country'):
        obj.iata = airline.get('country')        
    if not airline.get('country'):
        return None    
    
    # active required
    obj.active = airline.get('active') # required    
    ## Added code above
    return obj



def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        
        ## Create code to grab the Protocol Buffers Dataset        
        airline = _airline_to_proto_obj(record.get('airline', {}))        
        if airline:
            route.airline.CopyFrom(airline)
            
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))        
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))        
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)
        
        codeshare = record.get('codeshare')
        route.codeshare = record['codeshare']
            
        equipment = record.get('equipment')
        route.equipment.extend(equipment)  # repeated value can only be appended or extended
        
        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')

    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

#### 3.1e Output sizes

In [47]:
# Get size of files

comparison_csv_path = results_dir.joinpath('comparison.csv')

# Create function to compare file sizes
with open(comparison_csv_path, 'w', encoding = 'utf-8') as f:
    fieldnames = ['file', 'size_MB'] # column names
    
    csv_writer = csv.DictWriter(f, fieldnames = fieldnames, lineterminator = '\n') # writer object
    csv_writer.writeheader()  # write column heads
    
    for files in os.walk(results_dir):
        try:
            for file in files[2]:
                if file == 'comparison.csv':
                    next
                    
                else:
                    print(f'{file:>25}: {int(os.stat(results_dir.joinpath(file))[6]) / 1024 / 1024:2f}MB')
                    
                    file_size = dict(file = file, size_MB = int(os.stat(results_dir.joinpath(file))[6]) / 1024 / 1024)  # make a dict to write to csv file
                    csv_writer.writerow(file_size) # write to csv file
                    
        except FileNotFoundError:
            pass

                   df.csv: 4.100471MB
           routes.parquet: 1.883954MB
              routes.avro: 18.736102MB
   validation-results.csv: 60.772452MB


### 3.2

#### 3.2a Create a Simple Geohash Index

In [6]:

def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    ## TODO: Create hash index

    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:                               
                geohash = pygeohash.encode(latitude, longitude, precision = 5)
                hashes.append(geohash)
                record['geohash'] = geohash                
                ## Added code above
                      
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)
        
    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))
    

create_hash_dirs(records)

#### 3.2b Implement a simple search feature

In [112]:
import numpy as np

def airport_search(latitude, longitude):
    '''
    create a search function using geohash values
    latitutd and longitude     
    '''    
    # Create simple search to return nearest airports under 50 KMs
    # Convert lat and Long into geohash        
    search_hash = pygeohash.encode(latitude, longitude)
    
    # Use geohash to find airports within 50 Kilometers           
    retval = []
        
    for record in records:
        
        if record.get('geohash'):
            # get the geohash coordinate for the record
            match = record.get('geohash')            
            
            # calculate the distance difference
            x = (pygeohash.geohash_approximate_distance(match, search_hash[0:5]) / 1000)           
            
            # Get airports within 50 KM's
            if match:
                if x < 50:
                    if record.get('geohash') not in retval:
                        retval.append(record.get('geohash'))             
                        return (record.get('src_airport', {}), retval)
                        #print(retval)
                else: 
                    pass
            
                
        else:
            pass            
                            
    
airport_search(41.1499988, -95.91779)

({'airport_id': 3454,
  'name': 'Eppley Airfield',
  'city': 'Omaha',
  'country': 'United States',
  'iata': 'OMA',
  'icao': 'KOMA',
  'latitude': 41.3032,
  'longitude': -95.89409599999999,
  'altitude': 984,
  'timezone': -6.0,
  'dst': 'A',
  'tz_id': 'America/Chicago',
  'type': 'airport',
  'source': 'OurAirports'},
 ['9z7fc'])

#### 3.2c Evolve the Avro Schema

In [21]:
# Validate Schema by writing the data to it to ensure it works

def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routesv2.avsc')
    data_path = results_dir.joinpath('routesv2.avro')
    
    with open(schema_path, 'r') as f:
        schema = json.load(f)
    
    parsed_schema = fastavro.parse_schema(schema)
    
    with open('routesv2.avro', 'wb') as out:
        fastavro.writer(out, parsed_schema, records)
        
        
create_avro_dataset(records) 
    


In [32]:
# read first record to ensure geohash was in correct spots and data captured (probably better way but this was simple).

data_path = 'results/routesv2.avro'
with open(data_path, 'rb') as fo:
    for record in reader(fo):
        n = record                   # used n = record to stop after 1 record, if used print(record), would have gone on with 66,000 plus records

print(n)

{'airline': {'airline_id': 19016, 'name': 'Apache Air', 'alias': 'Apache', 'iata': 'ZM', 'icao': 'IWA', 'callsign': 'APACHE', 'country': 'United States', 'active': True}, 'src_airport': {'airport_id': 2913, 'name': 'Osh Airport', 'city': 'Osh', 'iata': 'OSS', 'icao': 'UAFO', 'latitude': 40.6090011597, 'longitude': 72.793296814, 'timezone': 6.0, 'dst': 'U', 'tz_id': 'Asia/Bishkek', 'type': 'airport', 'source': 'OurAirports', 'geohash': 'NONE'}, 'dst_airport': {'airport_id': 2912, 'name': 'Manas International Airport', 'city': 'Bishkek', 'iata': 'FRU', 'icao': 'UAFM', 'latitude': 43.0612983704, 'longitude': 74.4776000977, 'timezone': 6.0, 'dst': 'U', 'tz_id': 'Asia/Bishkek', 'type': 'airport', 'source': 'OurAirports', 'geohash': 'NONE'}, 'codeshare': False, 'stops': 0, 'equipment': ['734']}
