# ETL for NASA Satellite and Coast Guard GPS Data
John Bonfardeci<br/>
2021-01-31

## Data Security

The customer provided access to the raw data through a publicly available S3 bucket. In turn, the data we make available to the customer will be placed in a publicly available S3 bucket, allowing easy access to the unclassified dataset.

In secure environments, data at rest is encrypted using a key stored in the AWS key store. This key is made accessible via IAM Roles and Security Groups to all applications running in the secure environment that must utilize the data. This allows for storage and transfer of encrypted data within the working environment.

Code that is used to build containers is run through the CI/CD pipeline to be built, tested, and scanned. Jenkins has plug-ins for open-source scanning solutions such as Twistlock and Anchore Engine, which we integrate into production environments. For this scenario, we used ECR and its ability to use Clair, another open-source tool, to perform security scans of containers that are pushed from Jenkins.

Where possible, the team utilized approved containers from DoD IronBank as the base for the containers we produce. 

## 1. Pull Data - NiFi (Complete)
To pull the source data into our S3 environment, we used Apache NiFi. The nifi folder in the Data Management directory of the Team Leidos VAULT git repo contains the XML template for the dataflow, as well as instructions for deploying the NiFi server with Docker, importing the template, and making the configuration changes needed to direct the data into the appropriate bucket in your S3 instance. Below is a picture of the included template.

<img src="img/Nifi_Template.png" alt="NiFi Template"/>

## Import Dependencies

In [None]:
import re
import os
import numpy as np
from datetime import datetime, timedelta
import math
import multiprocessing
from multiprocessing import Process, Pool
import fiona
import geopandas as gpd
from zipfile import ZipFile
from io import BytesIO, StringIO
import gzip
import warnings
warnings.filterwarnings("ignore")

### AWS File System Clients and Credentials

In [None]:
from aws import AWSConfig
        
def get_boto_client():
    """
    Build a fresh AWSConfig and return the boto client it provides.
    @returns The object returned by AWSConfig.get_boto_client() (used for S3 put_object in this notebook).
    """
    return AWSConfig().get_boto_client()

def get_boto_resource():
    """
    Build a fresh AWSConfig and return the boto resource it provides.
    @returns The object returned by AWSConfig.get_boto_resource().
    """
    return AWSConfig().get_boto_resource()

def get_fs():
    """
    Build a fresh AWSConfig and return its S3 file-system handle.
    @returns The object returned by AWSConfig.get_s3fs() (used here for open/exists on S3 paths).
    """
    return AWSConfig().get_s3fs()

def list_s3_files(folder, filtr=None):
    """
    List the files stored under a folder (key prefix) of the configured S3 bucket.

    @param folder<str> - S3 folder/prefix to list, e.g. 'Raw/AIS/'.
    @param filtr<function> - Optional predicate called with each listed path; only paths for
        which it returns a truthy value are kept. Default=None (no filtering).
    @returns The paths returned by AWSConfig.list_s3_files (callers in this notebook treat them
        as strings); a list when `filtr` is given.
    """
    files = AWSConfig().list_s3_files(folder)
    if filtr is None:
        return files
    # Previously `filtr` was accepted but ignored, so callers received unfiltered listings.
    return [f for f in files if filtr(f)]

# test
#list_s3_files(folder='Raw/AIS/')

### Helper Functions
The following code should only be run if you are reproducing the entire ingest process, starting with NiFi.

In [None]:
def trim(s):
    """
    Strip leading and trailing whitespace from a string.
    @param s<str> - Text to trim. A falsy value (None, '') yields ''.
    @returns str
    """
    return re.sub(r'(^\s+|\s+$)', '', s) if s else ''

def julian_to_iso(julian):
    """
    Convert a TLE epoch ("YYDDD.DDDDDDDD": two-digit year followed by the day of the year
    with a fractional day) to an ISO 8601 timestamp string.

    The day of year is 1-based: 001.0 is January 1 at 00:00. Two-digit years follow the
    TLE convention: 57-99 -> 1957-1999, 00-56 -> 2000-2056.

    @param julian<str> - Epoch year with Julian day-of-year and decimal fraction, e.g. '08264.51782528'.
    @returns str - ISO timestamp (e.g. '2008-09-20T12:25:40.104192'), or `julian` unchanged when it
        is not a string of the form <at least 3 digits>.<digits> (surrounding whitespace ignored).
    """
    text = julian.strip() if isinstance(julian, str) else ''
    # Need at least the 2-digit year plus one day digit before the decimal point.
    if not re.search(r'^\d{3,}\.\d+$', text):
        return julian

    yr = int(text[:2])
    yr += 2000 if yr < 57 else 1900
    day_of_year = float(text[2:])
    # Day 1.0 is Jan 1 00:00, so the offset from Jan 1 is (day_of_year - 1) days.
    epoch = datetime(year=yr, month=1, day=1) + timedelta(days=day_of_year - 1)
    return epoch.isoformat()

def execute_workers(jobs, process_chunk, num_processes=0):
    """
    Split a list of jobs into roughly equal chunks and run each chunk in its own process.

    @param jobs<List<any>> - Objects to distribute among the workers.
    @param process_chunk<function> - Called once per worker with that worker's sub-list of jobs.
        It is passed directly to multiprocessing.Process, so unless the 'fork' start method is in
        use it must be picklable (closures only work under 'fork').
    @param num_processes<Int> - Maximum number of worker processes. 0 (default), None, or any
        value < 1 means one worker per available CPU.
    @returns None
    """
    if not num_processes or num_processes < 1:
        num_processes = multiprocessing.cpu_count()

    print('%i worker processes available.\n' % (num_processes))

    list_len = len(jobs)
    if list_len == 0:
        # Nothing to do. This also avoids range(..., step=0) raising ValueError below.
        print('No jobs to process.\n')
        return

    n = math.ceil(list_len / num_processes)
    chunks = [jobs[i:i + n] for i in range(0, list_len, n)]
    print('%i files will be divided among %i workers.\n' % (list_len, len(chunks)))

    procs = []
    for chunk in chunks:
        p = Process(target=process_chunk, args=(chunk,))
        procs.append(p)
        p.start()

    for proc in procs:
        proc.join()
        # Otherwise a crashed worker (and the rest of its unprocessed chunk) goes unnoticed.
        if proc.exitcode != 0:
            print('Warning: worker %s exited with code %s.\n' % (proc.name, proc.exitcode))

def unzip_s3_files(folder='', out_folder='extracted', filtr=None, num_processes=0):
    """
    Unzip all .zip files in a given AWS S3 folder, writing their contents back to the bucket.
    Note: Requires AWS access keys stored in .aws directory of machine.

    Each archive member is uploaded to '{folder}/{out_folder}/{subfolder}/{file name}', where
    `subfolder` is the first path component of the member inside the zip. For a member stored
    at the zip root, that component is the file name itself. Members whose destination key
    already exists are skipped, so the function can be re-run after an interruption.

    @param folder<str> - The S3 folder (key prefix) to read from. Default is the root folder.
    @param out_folder<str> - Sub-folder of `folder` to write unzipped files to. Default='extracted'.
    @param filtr<function> - A filter function on the object key for granular file path
        pattern matching. Default=None.
    @param num_processes<int> - Number of worker processes. Default=0, i.e. all available cores.
    """
    aws = AWSConfig()
    bucket_name = aws.s3_bucket
    # List the prefix once. Both the zip list and the "already extracted" set come from it.
    keys = [str(obj.key) for obj in aws.get_bucket().objects.filter(Prefix=folder)]
    zip_keys = [k for k in keys if k.endswith('.zip')]

    if filtr:
        zip_keys = [k for k in zip_keys if filtr(k)]

    if not zip_keys:
        print('The zip file list is empty. Stopping.')
        return

    # A set gives O(1) membership tests while extracting.
    extracted_files = {k for k in keys if not k.endswith('.zip')}
    print('%i zip files were found. Extracting.' % (len(zip_keys)))

    def process_chunk(chunk):
        # boto3 resources should not be shared across processes, so each worker creates its own.
        s3r = AWSConfig().get_boto_resource()
        for item_key in chunk:
            print('Reading %s...' % (item_key))

            zip_obj = s3r.Object(bucket_name=bucket_name, key=item_key)
            buffer = BytesIO(zip_obj.get()["Body"].read())
            with ZipFile(buffer) as z:
                for file_path in z.namelist():
                    if file_path.endswith('/'):
                        # Directory entry: there is no file content to upload.
                        continue
                    fname = os.path.basename(file_path)
                    subfolder = file_path.split('/')[0]
                    dest = str.format('{0}/{1}/{2}/{3}', folder, out_folder, subfolder, fname)
                    if dest in extracted_files:
                        print('%s already exists. Skipping.' % (dest))
                        continue

                    print('Unzipping %s to %s...' % (file_path, dest))
                    with z.open(file_path) as member:
                        s3r.meta.client.upload_fileobj(
                            member,
                            Bucket=bucket_name,
                            Key=dest
                        )
                    print('Unzipped %s.' % (file_path))
            print('Closed %s.' % (item_key))

    execute_workers(jobs=zip_keys, process_chunk=process_chunk, num_processes=num_processes)

# 2. Convert AIS Data
1. Read AIS files from S3.
2. For each AIS file, unzip and read GDB layers in memory stream via Fiona.
3. Convert GDB layers to CSV files via GeoPandas.
4. Get and store min/max BaseDateTime from each Broadcast layer.
5. Write CSV layers back to S3.

## 2a. Extract AIS CSV Files

In [None]:
%%time
ais_filter = lambda filename: re.search(r'\/AIS_\d{4}.*.zip$', filename)
unzip_s3_files(folder='Raw/AIS', filtr=ais_filter, num_processes=0)
print('AIS files extracted.')

## 2b. Extract and Convert AIS GDB Files
Extracting the layers from each GDB file and converting them to CSV takes the longest of all ETL tasks.<br/>
We believe GeoPandas is the bottleneck here. It may be possible to write our own converter from the Fiona object arrays to CSV files.<br/>This process can take 8 hours or more, but it only needs to run in full the first time, since it skips GDB layer files that have already been extracted.

In [None]:
%%time
def convert_gdb_file(path, layers, out_folder, sample=False):
    """
    Unzip and extract the requested layers of a zipped File Geodatabase (GDB) in S3 to CSV files.

    The zip at `path` is expected to contain '<zip base name>.gdb'. Each layer is written to
    '{out_folder}/<zip base name>.<layer>.csv'. A layer name containing '_' is shortened to its
    last '_'-separated part (e.g. 'Zone10_2013_01_Broadcast' -> 'Broadcast'). Layers whose CSV
    already exists are skipped. If every layer already exists, the zip is not downloaded at all.

    @param path<str> - File path of the zip in S3.
    @param layers<List[str]> - List of layer names to extract.
    @param out_folder<str> - The destination folder for the CSV files (an s3:// URL in this notebook).
    @param sample<Bool> - If True, writes only the first 10 features of each layer. Use for testing only. Default = False.
    """
    from itertools import islice

    fs = get_fs()

    base_name = os.path.basename(path).split('.')[0]
    gdb_fname = base_name + '.gdb'
    print('Reading: %s...\n' % (gdb_fname))

    # Decide which layers still need converting before downloading the (large) zip.
    pending = []
    for layer in layers:
        clean_layer_name = layer.split('_')[-1]
        csv_fname = str.format('{0}/{1}.{2}.csv', out_folder, base_name, clean_layer_name)
        if fs.exists(csv_fname):
            print('%s already exists. Skipping.\n' % (csv_fname))
        else:
            pending.append((layer, csv_fname))

    if not pending:
        print('All layers of %s already exist. Skipping download.\n' % (gdb_fname))
        return

    with fs.open(path, 'rb') as f:
        print('%s opened. Reading buffer into memory stream...\n' % (path))
        buffer = f.read()

    with fiona.io.ZipMemoryFile(buffer) as z:
        z.seek(0)
        for layer, csv_fname in pending:
            print('Reading layer %s > %s...\n' % (gdb_fname, layer))
            with z.open(gdb_fname, driver='FileGDB', layer=layer) as collection:
                print('Converting %s and layer %s...\n' % (gdb_fname, layer))
                # islice avoids relying on Fiona collection slicing (deprecated in recent Fiona).
                features = islice(collection, 10) if sample else collection
                df = gpd.GeoDataFrame.from_features(features)
                # Writes CSV directly to the S3 folder (pandas delegates s3:// paths to s3fs).
                df.to_csv(csv_fname, index=None)
            print('%s written.\n' % (csv_fname))

    print('%s closed.\n' % (gdb_fname))

def convert_gdb_files(filelist, out_folder, num_processes=0, sample=False):
    """
    Unzip and extract all layers from a list of GDB files in an S3 bucket.

    The layers to extract for each file are looked up by the file's base name in the table
    below. Files without an entry are reported and skipped.

    @param filelist<List[str]> - List of file paths in S3.
    @param out_folder<str> - The destination subfolder to extract layers to.
    @param num_processes<int> - The number of worker processes to use. Default=0 which means all CPUs will be used.
    @param sample<Bool> - If True, writes only the first 10 rows of each layer. Use for testing only. Default = False.
    @returns None
    """
    print('Converting %i GDB files...\n' % (len(filelist)))
    if not filelist:
        # Nothing to distribute among workers.
        return

    # Layer names differ between yearly GDB releases.
    gdb_definitions = {
        'Zone10_2009_01': ['Broadcast', 'Vessel', 'Voyage']
        , 'Zone10_2010_01': ['Voyage', 'Vessel', 'BaseStations', 'Broadcast', 'AttributeUnits']
        , 'Zone10_2011_01': ['Broadcast', 'Vessel', 'Voyage']
        , 'Zone10_2012_01': ['Broadcast', 'Vessel', 'Voyage']
        , 'Zone10_2013_01': ['Zone10_2013_01_Broadcast', 'Zone10_2013_01_Vessel', 'Zone10_2013_01_Voyage']
        , 'Zone10_2014_01': ['Zone10_2014_01_Broadcast', 'Zone10_2014_01_Vessel', 'Zone10_2014_01_Voyage']
    }

    def process_chunk(chunk):
        for path in chunk:
            fname = os.path.basename(path).split('.')[0]
            layers = gdb_definitions.get(fname)
            if layers is None:
                # An unknown file must not abort the rest of this worker's chunk.
                print('No layer definition for %s. Skipping.\n' % (path))
                continue
            convert_gdb_file(path, layers, out_folder, sample)

    execute_workers(jobs=filelist, process_chunk=process_chunk, num_processes=num_processes)
        
if __name__ == "__main__":
    aws = AWSConfig()

    def filtr(filename):
        # Zone GDB archives, e.g. keys containing '/Zone<digits>_' and ending in '.zip'.
        return re.search(r'\/Zone\d+\_.*\.zip$', filename)

    # Keep only zip archives that are not the yearly AIS_* CSV bundles.
    filelist = [
        f for f in list_s3_files(folder='Raw/AIS', filtr=filtr)
        if f.endswith('.zip') and not re.search(r'AIS_', f)
    ]
    print(f'Found {len(filelist)} files to extract.')

    if not filelist:
        print('The AIS GDB file list is empty. Stopping.\n')
    else:
        print('Extracting GDB layers to CSV...\n')
        out_folder = f's3://{aws.s3_bucket}/Raw/AIS/extracted'
        convert_gdb_files(filelist=filelist, out_folder=out_folder, num_processes=0, sample=False)
        print('GDB layers were extracted to CSV.\n')

## Convert TLE Data
1. Read TLE files from S3.
2. For each TLE file, unzip in memory stream via S3FS.
3. Convert two-line data to single row, extracting the satellite ID and datetime to new columns.
    <br/>Schema: SatID | BaseDateTime | Line 1 | Line 2
4. Filter TLE rows to the January date ranges covered by the AIS data (the filter below keeps January of 2008–2018, plus December 31 and February 1 as one-day overlaps).
5. Write converted TLE files back to S3.

In [None]:
def clean_tle_line(line):
    """
    Convert one raw line of a TLE (two-line-element) file into its part of a pipe-delimited row.

    Line 1 (starts with "1" + whitespace) becomes 'SatId|ISO epoch|line 1|', so the matching
    line 2 can be appended directly after it. Any other line is returned trimmed. A newline is
    re-appended only to line 2 (starts with "2" + whitespace) when the raw line had one, which
    terminates the combined row.

    @param line<str> - Raw line of TLE text (may include its trailing newline).
    @returns str
    """
    nl = '\n' if '\n' in line and re.search(r'^2\s', line) else ''
    line = trim(line)

    # Raw string: '^1\s' in a plain string is an invalid escape (SyntaxWarning on Python 3.12+).
    if not re.search(r'^1\s', line):
        return line + nl

    line = line.replace('\\', '')

    # col 1 - SatId: columns 3-8 of line 1, digits only.
    sat_id = trim(re.sub(r'\D', '', line[2:8]))

    # col 2 - columns 18-32 hold the TLE epoch (YYDDD.DDDDDDDD); convert it to ISO.
    julian_date = trim(line[17:32])
    if julian_date:
        julian_date = julian_to_iso(julian_date)

    return '|'.join([sat_id, julian_date, line]) + '|' + nl

def clean_tle_file(path, s3_bucket, filtr=None):
    """
    Clean one TLE (two-line-element) file and write it to 'Raw/TLE/cleaned/<file name>'.

    Each data row of a TLE file is written on two lines. Every line is passed through
    clean_tle_line, which combines both lines into a delimited single row with new columns
    for satellite ID and timestamp. Nothing is done if the output already exists in `s3_bucket`.

    @param path<str> - Path of the raw TLE file, opened via s3fs.
    @param s3_bucket<str> - Bucket the cleaned file is checked for and written to.
    @param filtr<function> - Optional predicate applied to each cleaned line 1. If it returns a
        falsy value, that record (line 1 and the single line after it) is dropped. Default=None.
    @returns Bool - True if the output already existed or the upload returned HTTP 200, else False.
    """
    print('Reading %s...\n' % (path))

    fname = path.split('/')[-1]
    filename = 'Raw/TLE/cleaned/' + fname

    fs = get_fs()

    if fs.exists(str.format('s3://{0}/{1}', s3_bucket, filename)):
        print(filename, 'already exists. Skipping.\n')
        return True

    # Memory stream the cleaned lines are written to.
    with StringIO() as mem_stream:
        # Open the file directly from the AWS S3 bucket and read it line by line.
        with fs.open(path, mode='rb') as file:
            print('Opening %s...\n' % (path))

            is_valid = False
            for line in file:
                str_line = line.decode()  # Decode binary line to string.
                cleaned_line = clean_tle_line(str_line)

                if not filtr:
                    # No filter function present. Write every line.
                    mem_stream.write(cleaned_line)
                elif re.search(r'^1\s+', str_line):
                    # Line one of TLE data starts with "1 ...". The filter decides whether this
                    # record, and therefore the next line (line 2), is kept.
                    is_valid = bool(filtr(cleaned_line))
                    if is_valid:
                        mem_stream.write(cleaned_line)
                elif is_valid:
                    # Line 2 of an accepted record.
                    mem_stream.write(cleaned_line)
                    # Only the one line after an accepted line 1 belongs to the record.
                    # Otherwise stray/name lines would leak into the output.
                    is_valid = False

            print('Cleaned %s\n' % (filename))
        print('%s Closed.\n' % (path))

        contents = mem_stream.getvalue()

    s3 = get_boto_client()
    print('Writing %s...\n' % (filename))
    res = s3.put_object(Body=contents, Bucket=s3_bucket, Key=filename)
    print('%s written and stream closed.\n' % (filename))

    status = res['ResponseMetadata']['HTTPStatusCode']
    print('HTTP Status Code = %i.\n' % (status))
    succeeded = status == 200
    if succeeded:
        print('Writing %s succeeded!\n' % (filename))
    else:
        print('Writing %s failed!\n' % (filename))
    return succeeded
    
def clean_tle_files(filelist, s3_bucket, filtr=None, num_processes=0):
    """
    Clean TLE data over N worker processes. Save each file as a pipe-delimited dataset.
    @param filelist<List[str]> - List of files in S3 to clean.
    @param s3_bucket<str> - Bucket the cleaned files are written to.
    @param filtr<function> - Optional line-1 filter passed on to clean_tle_file. Default=None.
    @param num_processes<int> - The number of logical CPU cores to use. '0' = all cores.
    """
    if not filelist:
        # Nothing to distribute among workers.
        print('No TLE files to clean.\n')
        return

    def process_chunk(chunk):
        for path in chunk:
            clean_tle_file(path=path, s3_bucket=s3_bucket, filtr=filtr)

    execute_workers(jobs=filelist, process_chunk=process_chunk, num_processes=num_processes)
    
if __name__ == '__main__':
    aws = AWSConfig()
    s3_bucket = aws.s3_bucket

    def _filtr(filename):
        """
        Keep only TLE files whose name encodes a year from 2008 through 2018 (the AIS range).
        All digits of the name after the last '/' are concatenated and read as the year.
        """
        digits = re.sub(r'\D', '', filename.split('/')[-1])
        if not digits:
            return False
        return 2007 < int(digits) < 2019

    def _date_filter(line):
        """
        Keep cleaned TLE rows whose epoch falls in January of 2008 through 2018, padded by
        one day on each side (December 31 and February 1).
        Rows without an ISO date in the second pipe-delimited column are kept.
        """
        vals = line.split('|')
        # e.g. 2004-08-18T05:19:09.399648
        has_date = len(vals) > 2 and re.search(r'^\d{4}\-\d{2}\-\d{2}', vals[1])
        if not has_date:
            return True
        dt = datetime.fromisoformat(vals[1])
        in_years = 2008 <= dt.year <= 2018
        in_window = dt.month == 1 or (dt.month, dt.day) in ((12, 31), (2, 1))
        return in_years and in_window

    s3_folder = 'Raw/TLE'
    filelist = list_s3_files(folder=s3_folder, filtr=_filtr)

    if len(filelist) != 0:
        clean_tle_files(filelist=filelist, s3_bucket=s3_bucket, filtr=_date_filter, num_processes=0)
        print(f'{len(filelist)} TLE files were cleaned.')
    else:
        print('The TLE file list is empty. Stopping.')