In [2]:
import os
import pandas as pd
import pickle as pkl
import glob
from process_jtwc import *
from itertools import product
import pymongo
import requests, zipfile, io
import csv
import pdb
from datetime import datetime, timedelta

# JTWC

From https://www.metoc.navy.mil/jtwc/jtwc.html?best-tracks

# Download files for certain years and regions.

In [2]:
#download JTWC data for certain years

years = [ x for x in range(2000, 2001+1)]
regions = ['bio', 'bsh', 'bwp']
def get_tc_by_year_region(year=2018, region='bio'):
    url = make_url(year, region)
    resp = requests.get(url)
    if not resp.status_code // 100 == 2:
        return "Error: Unexpected response {}".format(resp)
    z = zipfile.ZipFile(io.BytesIO(resp.content))
    file_path = make_file_path(year, region)
    z.extractall(file_path)

def make_file_path(year, region, base='/storage/hurricane'):
    filePath = os.path.join(base, region, str(year))
    if not os.path.exists(filePath):
        os.makedirs(filePath)
    return filePath

def make_url(year=2018, region='bio'):
    return f'https://www.metoc.navy.mil/jtwc/products/best-tracks/{year}/{year}s-{region}/{region}{year}.zip'

def download_jtwc_files(years, regions, base='/storage/hurricane'):
    for year, region in product(years, regions):
        make_file_path(year, region, base)
        get_tc_by_year_region(year, region)

In [3]:
download_jtwc_files(years, regions)

## Convert files into a dataframe

In [3]:
df_lst = []
dr = glob.glob('/storage/hurricane/*/*/*.dat') # location of files taken from website.
ulist = []
for fn in dr:
#     print(fn)
    raw = pd.read_table(fn, header=None, delimiter=',', usecols=range(11))
    df_raw = convert_df(raw)
    ulist = ulist + df_raw.ID.unique().tolist()
    df_lst = df_lst + df_raw.to_dict(orient='records')
df = pd.DataFrame(df_lst)
df['source'] = 'JTWC'
df = df.applymap(lambda x: x.replace(' ', '') if isinstance(x, str) else x)

In [4]:
df = df.rename({"ID":'_id', "LONG": 'lon', 'press': 'pres', 'SEASON':'year'}, axis=1)
df.columns = [col.lower() for col in df.columns]
df.year = df.year.astype(np.int64)
df.time = df.time.astype(np.int64)
df.lat = df.lat.astype(np.float64)
df.lon = df.lon.astype(np.float64)
df['geoLocation'] = [ {"type": "point", "coordinates": [lng, lat]} for lng, lat in df[['lon', 'lat']].values]

In [5]:
def is_empty(x):
    if isinstance(x, str):
        return x == ''
    elif isinstance(x, float):
        return np.isnan(x)
    else:
        return False

clean_dict_keys = lambda my_dict: list(filter(lambda k: not is_empty(my_dict[k]), my_dict))
def clean_dict(my_dict):
    new_dict = dict()
    keys = clean_dict_keys(my_dict)
    for key in keys:
        new_dict[key] = my_dict[key]
    return new_dict

def make_docs(df):
    docs = []
    keys = ['_id', 'name', 'num', 'source']
    key_types = {'_id': str, 'name':str , 'num': int, 'source': str}
    cols = [col for col in df.columns if col not in keys]
    for _id, df_id in df.groupby(['_id']):
        df_id.shape
        doc = {}
        for key in keys:
            tpe = key_types[key]
            assert len(df_id[key].unique()) == 1, 'nondistinct id'
            doc[key] = df_id[key].astype(tpe).iloc[0]
            if key == 'num':
                doc[key] = int(doc[key])
        traj_data = df_id[cols].to_dict(orient='records')
        traj_data = [clean_dict(x) for x in traj_data]
        year = int(df_id.year.iloc[0])
        doc['year'] = year
        doc['startDate'] = df_id.timestamp.min()
        doc['endDate'] = df_id.timestamp.max()
        doc['traj_data'] = df_id[cols].to_dict(orient='records')
        doc['_id'] = doc['_id'] + '_' + doc['source']
        if doc['name'] == 'UNNAMED':
            del doc['name']
        docs.append(doc)
    docs = [clean_dict(x) for x in docs]
    return docs

In [6]:
jtwc_docs = make_docs(df)

# HURDAT2
from https://www.nhc.noaa.gov/data/#hurdat

In [8]:
pacific_filename = '/storage/hurricane/hurdat2/pacific.csv'
atlantic_filename = '/storage/hurricane/hurdat2/atlantic.csv'
stormStartCh = ['EP', 'CP', 'AL']

def make_trop_cyc_list(filename):
    startIdx = []
    tcs = []
    with open(filename) as csvfile:
        spamreader = csv.reader(csvfile, delimiter=',')
        for idx, row in enumerate(spamreader):
            if len(row) == 0:
                continue
            if row[0][0:2] in stormStartCh:
                startIdx.append(idx)
                _id = row[0]
                name= row[1]
                num = row[2]
                storm = [_id, name, num]
            else:
                tc = storm + row
                tcs.append(tc[0:11])
    return tcs

pacific_tcs = make_trop_cyc_list(pacific_filename)
atlantic_tcs = make_trop_cyc_list(atlantic_filename)

In [9]:
cols = ['_id', 'name', 'num', 'date', 'time', 'l', 'class', 'lat', 'lon', 'wind', 'pres']
def convert_lat_lon(strL, postiveDir='N'):
    L = float(strL[:-1].replace(' ', ''))
    if not postiveDir in strL:
        L *= -1
    return L

def convert_time(time):
    hour = (int(time)/100) %12 
    return hour

def make_cyc_df(tcs):
    df = pd.DataFrame(tcs, columns=cols)
    df['year'] = df.date.apply(lambda x: int(x[0:4]))
    df = df[df['year'] >= 2000]
    df = df.dropna(axis=0, how='any', subset=['lat', 'lon'])
    df = df.applymap(lambda x: x.replace(' ', '') if isinstance(x, str) else x)
    df.lon = df.lon.apply(lambda lon: convert_lat_lon(lon, 'E')).astype(np.float64)
    df.lat = df.lat.apply(lambda lat: convert_lat_lon(lat, 'N')).astype(np.float64)
    df.pres = df.pres.astype(np.int64)
    df.pres = df.pres.replace(-999, np.nan)
    df.wind = df.wind.astype(np.int64)
    df.num = df.num.astype(np.int64)
    df.date = df.date.apply(lambda x: x[0:4] + '-' + x[4:6] + '-' + x[6:8])
    datetimes = df.date.values + ' ' + df.time.values
    df['timestamp'] = pd.to_datetime(datetimes, format='%Y-%m-%d %H%M')
    df['source'] = 'HURDAT2'
    df['geoLocation'] = [ {"type": "point", "coordinates": [lng, lat]} for lng, lat in df[['lon', 'lat']].values]
    df.time = df.time.astype(np.int64)
    return df

df_pacific = make_cyc_df(pacific_tcs)
df_atlantic = make_cyc_df(atlantic_tcs)

In [10]:
hurdat2_docs = make_docs(df_pacific) + make_docs(df_atlantic)

In [11]:
df_lane = df_pacific[(df_pacific['name'] == 'LANE') & (df_pacific['year'] == 2018)]
df_lane.head()

Unnamed: 0,_id,name,num,date,time,l,class,lat,lon,wind,pres,year,timestamp,source,geoLocation
28126,EP142018,LANE,64,2018-08-13,1200,,LO,10.9,-114.7,20,1009.0,2018,2018-08-13 12:00:00,HURDAT2,"{'type': 'point', 'coordinates': [-114.7, 10.9]}"
28127,EP142018,LANE,64,2018-08-13,1800,,LO,11.0,-115.7,20,1009.0,2018,2018-08-13 18:00:00,HURDAT2,"{'type': 'point', 'coordinates': [-115.7, 11.0]}"
28128,EP142018,LANE,64,2018-08-14,0,,LO,11.1,-116.6,25,1009.0,2018,2018-08-14 00:00:00,HURDAT2,"{'type': 'point', 'coordinates': [-116.6, 11.1]}"
28129,EP142018,LANE,64,2018-08-14,600,,LO,11.1,-117.4,25,1009.0,2018,2018-08-14 06:00:00,HURDAT2,"{'type': 'point', 'coordinates': [-117.4, 11.1]}"
28130,EP142018,LANE,64,2018-08-14,1200,,LO,11.1,-118.4,25,1009.0,2018,2018-08-14 12:00:00,HURDAT2,"{'type': 'point', 'coordinates': [-118.4, 11.1]}"


In [12]:
test_doc = make_docs(df_lane)

# add docs to mongoDB database

In [13]:
def create_collection(dbName, collectionName, init_collection):
    dbUrl = 'mongodb://localhost:27017/'
    client = pymongo.MongoClient(dbUrl)
    db = client[dbName]
    coll = db[collectionName]
    coll = init_collection(coll)
    return coll

def init_tc_collection(coll):
    coll.create_index([('name', pymongo.DESCENDING)])
    coll.create_index([('startDate', pymongo.DESCENDING)])
    coll.create_index([('endDate', pymongo.DESCENDING)])
    coll.create_index([('startDate', pymongo.DESCENDING), ('endDate', pymongo.DESCENDING)])
    return coll

def init_tc_traj_collection(coll):
    return coll

In [20]:
#insert docs
dbName='argo'
collectionName = 'tc'
coll = create_collection(dbName, collectionName, init_tc_collection)
coll.drop()
coll.insert_many(jtwc_docs)
coll.insert_many(hurdat2_docs)

BulkWriteError: batch op errors occurred

In [16]:
coll = create_collection('argo-express-test', 'tc', init_tc_collection)
coll.drop()
coll.insert_many(test_doc)

<pymongo.results.InsertManyResult at 0x7f8015eb4a08>

# add southern ocean storms

In [17]:
def make_so_cyc_df(df):
    df['year'] = df.timestamp.apply(lambda x: x.year)
    df['time'] = df.timestamp.apply(lambda x: x.hour*100)
    df['num'] = 0
    df['class'] = 'Southern Hemisphere Cylone'
    df['pres'] = 0
    df['wind'] = 0
    df.lon = df.lon.astype(np.float64)
    df.lat = df.lat.astype(np.float64)
    df.date = df.timestamp.apply(lambda x: datetime.strftime(x, '%y-%m-%d'))
    df['source'] = 'Priestley'
    df['geoLocation'] = [ {"type": "point", "coordinates": [lng, lat]} for lng, lat in df[['lon', 'lat']].values]
    return df

def parse_file(file):
    with open(file) as f:
        lines = f.readlines()
    sdxs = []
    storm = False
    for idx, line in enumerate(lines):
        if 'TRACK_ID' in line:
            sdx = idx
            sdxs.append(idx)
    linesIdx = [ (sdxs[idx], sdxs[idx+1]) for idx in range(len(sdxs)-1)]
    storms = [lines[start:end] for start, end in linesIdx]    
    return storms

def create_storm_df(storm, basedate, name):
    #create base_id
    trackNumber = storm.pop(0).split()[1]
    pointNumber = storm.pop(0).split()[1]
    _id = '_'.join((name, year, trackNumber, pointNumber))

    #parse thru storm lines for time, lat, lon
    rows = []
    for line in storm:
        rows.append([float(x) for x in line.split()])
    df = pd.DataFrame(rows, columns=('timestamp', 'lon', 'lat', 'intensity'))
    #convert timestamp
    df['timestamp'] = df['timestamp'].apply(lambda x: basedate + timedelta(hours=deltaT*x))
    df['_id'] = _id
    df['name'] = name
    return df

def parse_filename(file):
    year = file.split('/')[-1].split('_')[1][0:4]
    name = '_'.join(file.split('/')[4:6])
    basedate = datetime.strptime(year + '-01-01', '%Y-%m-%d')
    return year, name, basedate

def insert_one_by_one(docs, coll):
    for doc in docs:
        nDup = 0
        try:    
            coll.insert_one(doc)
        except pymongo.helpers.DuplicateKeyError:
            nDup += 1
            pass
        except Exception as err:
            pdb.set_trace()
            print(err)   
    print(f'{nDup} duplicates found')

def insert_many_tc_docs(docs, coll):
    try:
        coll.insert_many(docs)
    except Exception as err:
        print(err)
        print('trying to add one at a time')
        insert_one_by_one(docs, coll)

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'argo-express-test'), 'tc')

In [21]:
sonFiles = glob.glob('/storage/hurricane/priestley/SON/SH_FILT/ERA5_20*_TRACKS_FILTERED_neg')
djfFiles = glob.glob('/storage/hurricane/priestley/DJF/SH_FILT/ERA5_20*_TRACKS_FILTERED_neg')
deltaT = 6 #hours
files = djfFiles + sonFiles

# dbName='argo'
# collectionName = 'tc'
# coll = create_collection(dbName, collectionName, init_tc_collection)
print(f'{len(files)} files')
docs = []
totalDocs = 0
for file in files:
    storms = parse_file(file)
    #get base date
    year, name, basedate = parse_filename(file)
    for storm in storms:
        df = create_storm_df(storm, basedate, name)
        df = make_so_cyc_df(df)
        docs += make_docs(df)
    totalDocs += len(docs)
    print(f'inserting {len(docs)} docs')
    
    insert_many_tc_docs(docs, coll)
    docs = []
print(f'attempted {totalDocs} inserts')




38 files


  # Remove the CWD from sys.path while we load stuff.


inserting 393 docs
inserting 405 docs
inserting 383 docs
inserting 398 docs
inserting 404 docs
inserting 433 docs
inserting 426 docs
inserting 386 docs
inserting 446 docs
inserting 375 docs
inserting 379 docs
inserting 375 docs
inserting 377 docs
inserting 394 docs
inserting 408 docs
inserting 411 docs
inserting 406 docs
inserting 404 docs
inserting 436 docs
inserting 432 docs
inserting 445 docs
inserting 452 docs
inserting 472 docs
inserting 452 docs
inserting 463 docs
inserting 461 docs
inserting 457 docs
inserting 457 docs
inserting 456 docs
inserting 446 docs
inserting 441 docs
inserting 447 docs
inserting 465 docs
inserting 473 docs
inserting 459 docs
inserting 448 docs
inserting 453 docs
inserting 461 docs
attempted 16279 inserts
