# Data Partitioning 
## Victoria Hall
## 7.23.2022


The purpose of this project is to demonstrate different creation and retrieval strategies for accessing airline data stored in Parquet file formats. For this dataset, the key for each route will be the three-letter source airport code concatenated with the three-letter destination airport code and the two-letter airline.

In [1]:
#importing libraries
import os
import json
from pathlib import Path
import gzip
import hashlib
import shutil
import pandas as pd
import pygeohash
import uuid
import math


#setting directories and creating output folder
current_dir = Path(os.getcwd()).absolute()
results_dir = current_dir.joinpath('results')



if results_dir.exists():
    shutil.rmtree(results_dir)
    results_dir.mkdir(parents=True, exist_ok=True)


#reading in data
def read_jsonl_data():
    with gzip.open('openflights/routes.jsonl.gz', 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]
        

    return records

records = read_jsonl_data()

In [2]:
records[1]

{'airline': {'airline_id': 410,
  'name': 'Aerocondor',
  'alias': 'ANA All Nippon Airways',
  'iata': '2B',
  'icao': 'ARD',
  'callsign': 'AEROCONDOR',
  'country': 'Portugal',
  'active': True},
 'src_airport': {'airport_id': 2966,
  'name': 'Astrakhan Airport',
  'city': 'Astrakhan',
  'country': 'Russia',
  'iata': 'ASF',
  'icao': 'URWA',
  'latitude': 46.2832984924,
  'longitude': 48.0063018799,
  'altitude': -65,
  'timezone': 4.0,
  'dst': 'N',
  'tz_id': 'Europe/Samara',
  'type': 'airport',
  'source': 'OurAirports'},
 'dst_airport': {'airport_id': 2990,
  'name': 'Kazan International Airport',
  'city': 'Kazan',
  'country': 'Russia',
  'iata': 'KZN',
  'icao': 'UWKD',
  'latitude': 55.606201171875,
  'longitude': 49.278701782227,
  'altitude': 411,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'codeshare': False,
 'equipment': ['CR2']}

### Partitioning data based on keys

In [3]:
#Creating dataset
#Flattening Records from JSON format
def flatten_record(record):
    flat_record = dict()
    for key, value in record.items():
        if key in ['airline', 'src_airport', 'dst_airport']:
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    flat_key = '{}_{}'.format(key, child_key)
                    flat_record[flat_key] = child_value
        else:
            flat_record[key] = value
    
    return flat_record


def create_flattened_dataset():
    parquet_path = results_dir.joinpath('results.parquet')
    return pd.DataFrame.from_records([flatten_record(record) for record in records])


In [4]:
#Creating dataframe and adding key value column
df = create_flattened_dataset()
df['key'] = df['src_airport_iata'].astype(str) + df['dst_airport_iata'].astype(str) + df['airline_iata'].astype(str)

In [5]:
df['key'] = df['key'].str.upper()

In [6]:
df.head(5)

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_longitude,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,43.081902,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,82.650703,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B


In [7]:
#defining Partitions 
partitions = (
        ('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
        ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
        ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
        ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z')
    )

#creating list of keys
kv_keys = []
for x, y in partitions:
    if x == y:
        kv_keys.append(x)
    else:
        kv = str(x + '-' + y)
        kv_keys.append(kv)
            
kv = []
for i in df['key']:
    for key in kv_keys:
        if i[0] in key:
            x = key
            kv.append(x)

df['kv_key'] = kv

In [8]:
df.head(5)

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B,A
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B,A
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B,A
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B,C-D
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B,C-D


In [9]:
kv = results_dir.joinpath('kv')
df.to_parquet(kv,partition_cols='kv_key')

### Partitioning with Hashvalues

In [10]:
import hashlib

def hash_key(key):
    m = hashlib.sha256()
    m.update(str(key).encode('utf-8'))
    return m.hexdigest()

In [11]:
df['hash'] = df['key'].apply(hash_key)

In [12]:
df['hash'] = df['hash'].str.upper()

In [13]:
df['hash_key'] = df['hash'].astype(str).str[0]

In [14]:
df.head(10)

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key,hash,hash_key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B,A,652CDEC02010381F175EFE499E070C8CBAAC1522BAC59A...,6
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B,A,9EEA5DD88177F8D835B2BB9CB27FB01268122B635B241A...,9
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B,A,161143856AF25BD4475F62C80C19F68936A139F653C1D3...,1
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B,C-D,39AA99E6AE2757341BEDE9584473906EF1089E30820C90...,3
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B,C-D,143B3389BCE68EEA3A13AC26A9C76C1FA583EC2BD26EA8...,1
5,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,4029.0,Domodedovo International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],DMEKZN2B,C-D,E4EC7B234CD26C4AFD736CD49D1D02E4EC5F294F14533A...,E
6,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,4029.0,Domodedovo International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],DMENBC2B,C-D,30114A9DC60716ADBADF6C54124A899A66EEA47335FDAE...,3
7,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,4029.0,Domodedovo International Airport,...,,,,,False,[CR2],DMENAN2B,C-D,48B4743FFE7182E7F5F06E3B5F824A8D33FFD82386B53E...,4
8,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,4029.0,Domodedovo International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],DMEUUA2B,C-D,F763F8F658C0FB8DCEDCF3B2CCBFF0AC4E860CB0B96EF9...,F
9,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,6156.0,Belgorod International Airport,...,N,Europe/Kaliningrad,airport,OurAirports,False,[CR2],EGOKGD2B,E-F,AC5C008DC056E24302F24C6B88B98F1CBE2044D8061699...,A


In [15]:
hash_ = results_dir.joinpath('hash')

df.to_parquet(hash_,partition_cols='hash_key')

### Simulating Geographically Distributed Data Centers. 

In [16]:
df.columns

Index(['airline_airline_id', 'airline_name', 'airline_alias', 'airline_iata',
       'airline_icao', 'airline_callsign', 'airline_country', 'airline_active',
       'src_airport_airport_id', 'src_airport_name', 'src_airport_city',
       'src_airport_country', 'src_airport_iata', 'src_airport_icao',
       'src_airport_latitude', 'src_airport_longitude', 'src_airport_altitude',
       'src_airport_timezone', 'src_airport_dst', 'src_airport_tz_id',
       'src_airport_type', 'src_airport_source', 'dst_airport_airport_id',
       'dst_airport_name', 'dst_airport_city', 'dst_airport_country',
       'dst_airport_iata', 'dst_airport_icao', 'dst_airport_latitude',
       'dst_airport_longitude', 'dst_airport_altitude', 'dst_airport_timezone',
       'dst_airport_dst', 'dst_airport_tz_id', 'dst_airport_type',
       'dst_airport_source', 'codeshare', 'equipment', 'key', 'kv_key', 'hash',
       'hash_key'],
      dtype='object')

In [17]:
df['src_airport_geohash'] = df.apply(
    lambda row: pygeohash.encode(row.src_airport_latitude, row.src_airport_longitude), axis=1
)
def determine_location(src_airport_geohash):
    locations = dict(
        central=pygeohash.encode(41.1544433, -96.0422378),
        west = pygeohash.encode(45.5945645, -121.1786823),
        east = pygeohash.encode(39.08344, -77.6497145)
        
    )
    
    #creating tuples of center name and distance
    central_dist = ('central',pygeohash.geohash_haversine_distance(src_airport_geohash,locations['central']))
    west_dist = ('west',pygeohash.geohash_haversine_distance(src_airport_geohash,locations['west']))
    east_dist = ('east',pygeohash.geohash_haversine_distance(src_airport_geohash,locations['east']))
    
    distances =[central_dist,west_dist,east_dist]
    
    
    #sorting list of distances based on value in each tuple
    distances.sort(key=lambda y: y[1])
    
    return(distances[0][0])
df['location'] = df['src_airport_geohash'].apply(determine_location)
df.to_parquet('results/geo', partition_cols=['location'])

In [18]:
df['location'].value_counts()

east       39761
west       23268
central     4634
Name: location, dtype: int64

### List of keys sorted by number of partitions

In [19]:
def balance_partitions(keys, num_partitions):
    keys.sort()
    partitions = []
    for i in range(0, len(keys), num_partitions):
        partitions.append(keys[i:i+num_partitions])
    return partitions
    

In [20]:
keys = ['B','D','A','C','Z']
balance_partitions(keys,2)

[['A', 'B'], ['C', 'D'], ['Z']]