## Assignment 7 - Kayla Thompson

### 7.1a

In [77]:
import os
import json
from pathlib import Path
import gzip
import hashlib
import shutil
import pandas as pd
import pygeohash
import s3fs
endpoint_url='https://storage.budsc.midwest-datascience.com'
current_dir = Path(os.getcwd()).absolute()
results_dir = current_dir.joinpath('results')
if results_dir.exists():
    shutil.rmtree(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)
def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={'endpoint_url': endpoint_url}
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        
    return records
def flatten_record(record):
    flat_record = dict()
    for key, value in record.items():
        if key in ['airline', 'src_airport', 'dst_airport']:
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    flat_key = '{}_{}'.format(key, child_key)
                    flat_record[flat_key] = child_value
        else:
            flat_record[key] = value
    
    return flat_record
def create_flattened_dataset():
    records = read_jsonl_data()
    parquet_path = results_dir.joinpath('routes-flattened.parquet')
    return pd.DataFrame.from_records([flatten_record(record) for record in records])
df = create_flattened_dataset()
df['key'] = df['src_airport_iata'].astype(str) + df['dst_airport_iata'].astype(str) + df['airline_iata'].astype(str)

In [78]:
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_longitude,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,43.081902,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,82.650703,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B


In [79]:
partitions = (('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
        ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
        ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
        ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z'))

def get_partition(key):
    for a, v in partitions:
        if a == v:
            kv_key = a
        else:
            kv_key = a + '-' + v
        if key[0] >= a and key[0] <= v:
            return kv_key

In [80]:
df['kv_key'] = df['key'].apply(get_partition)

In [81]:
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B,A
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B,A
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B,A
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B,C-D
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B,C-D


In [82]:
df.to_parquet(os.getcwd() + '/results/kv.parquet', partition_cols=['kv_key'])

### 7.1b

In [83]:
def hash_key(key):
    m = hashlib.sha256()
    m.update(str(key).encode('utf-8'))
    return m.hexdigest()

In [84]:
df['hashed'] = df['key'].apply(lambda x: hash_key(x))
df['hashed_key'] = df['hashed'].str[0]

In [85]:
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key,hashed,hashed_key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B,A,652cdec02010381f175efe499e070c8cbaac1522bac59a...,6
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B,A,9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...,9
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B,A,161143856af25bd4475f62c80c19f68936a139f653c1d3...,1
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B,C-D,39aa99e6ae2757341bede9584473906ef1089e30820c90...,3
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B,C-D,143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...,1


In [86]:
df.to_parquet(os.getcwd() + '/results/hash.parquet', partition_cols=['hashed_key'])

### 7.1c

In [87]:
import pygeohash

In [88]:
! pip install haversine

Collecting haversine
  Downloading haversine-2.3.1-py2.py3-none-any.whl (5.5 kB)
Installing collected packages: haversine
Successfully installed haversine-2.3.1


In [89]:
import haversine as hs

In [90]:
df['airport_geo'] = df.apply(lambda row: pygeohash.encode(row.src_airport_latitude, row.src_airport_longitude),
                             axis=1)

In [91]:
def det_loc(airport_geo):
    loca_dic = dict(
        west = pygeohash.encode(45.5945645, -121.1786823), 
        central = pygeohash.encode(41.1544433, -96.0422378),
        east = pygeohash.encode(39.08344, -77.6497145))
    distances = []
    for location, geohash in loca_dic.items():
        hav = pygeohash.geohash_haversine_distance(airport_geo, geohash)
        distances.append(tuple((hav, location)))
    
    distances.sort()
    return distances[0][1]

df['location'] = df['airport_geo'].apply(det_loc)        

In [92]:
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key,hashed,hashed_key,airport_geo,location
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,airport,OurAirports,False,[CR2],AERKZN2B,A,652cdec02010381f175efe499e070c8cbaac1522bac59a...,6,szsrjjzd02b3,east
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,airport,OurAirports,False,[CR2],ASFKZN2B,A,9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...,9,v04pk3t5gbjj,east
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,airport,OurAirports,False,[CR2],ASFMRV2B,A,161143856af25bd4475f62c80c19f68936a139f653c1d3...,1,v04pk3t5gbjj,east
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,airport,OurAirports,False,[CR2],CEKKZN2B,C-D,39aa99e6ae2757341bede9584473906ef1089e30820c90...,3,v3gdxs17du83,west
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,airport,OurAirports,False,[CR2],CEKOVB2B,C-D,143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...,1,v3gdxs17du83,west


In [93]:
df.to_parquet(os.getcwd() + '/results/geo.parquet', partition_cols=['location'])

### 7.1d

In [94]:
import numpy as np

In [95]:
print(*np.array_split(range(20),3))

[0 1 2 3 4 5 6] [ 7  8  9 10 11 12 13] [14 15 16 17 18 19]


In [96]:
def balance_partitions(keys, num_partitions):
    partitions = [np.array_split(keys, num_partitions)]
    return partitions

In [97]:
keys=[1, 2, 3, 'cat1', 'cat2', 'moon', 'blue', 15, 0, 'bay']

In [98]:
print(balance_partitions(keys, 5))

[[array(['1', '2'], dtype='<U21'), array(['3', 'cat1'], dtype='<U21'), array(['cat2', 'moon'], dtype='<U21'), array(['blue', '15'], dtype='<U21'), array(['0', 'bay'], dtype='<U21')]]
