## Feature Pipeline for the `exploded_swells` feature group

This feature pipeline can be run on a schedule using github actions (see github repo for the example file).

### Requirements

 * hopsworks

In [None]:
#!pip install -U hopsworks

In [1]:
import os
import urllib.request  
import re
from itertools import chain
import pandas as pd
import numpy as np
import hopsworks
from datetime import datetime, timedelta
import sys

### Not app.hopsworks.ai ?

If you are running your own Hopsworks cluster (not app.hopsworks.ai):

 * uncomment the cell below
 * fill in details for your cluster
 * run the cel

In [2]:
#key=""
#with open("api-key.txt", "r") as f:
#    key = f.read().rstrip()
#os.environ['HOPSWORKS_PROJECT']="cjsurf"
#os.environ['HOPSWORKS_HOST']="35.187.178.84"
#os.environ['HOPSWORKS_API_KEY']=key    

### Backfill the feature group 

If you set `BACKFILL` to `True` in the cell below, and continue running all the cells, you will insert swell predictions from the `swells-clean.csv` file into the feature group.

When `BACKFILL` is `False`, it will download the latest predictions from the NOA 62081 Buoy and insert them into the feature group.

In [4]:
BACKFILL=False
if os.environ.get('BACKFILL') == "False":
    BACKFILL=False
hours=119
version=1
backfill_url="https://repo.hops.works/master/hopsworks-tutorials/data/cjsurf/swells-clean.csv"
buoy="62081"

In [7]:
def get_latest_url(today):
    pred_date = today.strftime("%Y%m%d")

    # There are 4 predictions per day at hours: "00", "06", "12", "18",
    h=int(today.strftime("%H"))
    found = False
    test_url = ""
    attempted_date = pred_date
    while not found:
        pred_hour = "00"
        if h > 5:
            pred_hour = "06"
        if h > 11:
            pred_hour = "12" 
        if h > 17:
            pred_hour = "18"
        test_url = "https://ftpprd.ncep.noaa.gov/data/nccf/com/gfs/prod/gfs." + attempted_date + \
      "/" + pred_hour + "/wave/station/bulls.t" + pred_hour + "z/gfswave." + buoy + ".bull"
        try:
            urllib.request.urlopen(test_url)
            found = True
        except urllib.error.HTTPError as e: 
            # assume 404, URL not found. Try previous time.
            h = h - 6
            if h < 0:
                attempted_date = attempted_date - timedelta(days=days_to_subtract)
                if (pred_date - attempted_date > 1):
                    sys.exit("ERROR: Could not download url: " + test_url) 
    print(test_url)
    return test_url, pred_hour

### Understand the Features

We store 119*4=476 columns in the `swell_predictions` feature group. It is 119 different swell predictions, one for each hour from hour=0, hour=2, ..., hour=238.  Each prediction is made using the `height`, `period`, and `direction` features. The `hits_at` feature is used to estimate the time at which the swell arrives at Lahinch beach.

In [8]:
secondary_columns=[]
for i in range(1,hours):
    j=i*2
    secondary_columns.append("height" + str(j))
    secondary_columns.append("period" + str(j))
    secondary_columns.append("direction" + str(j))
    secondary_columns.append("hits_at" + str(j))

secondary_columns

['height2',
 'period2',
 'direction2',
 'hits_at2',
 'height4',
 'period4',
 'direction4',
 'hits_at4',
 'height6',
 'period6',
 'direction6',
 'hits_at6',
 'height8',
 'period8',
 'direction8',
 'hits_at8',
 'height10',
 'period10',
 'direction10',
 'hits_at10',
 'height12',
 'period12',
 'direction12',
 'hits_at12',
 'height14',
 'period14',
 'direction14',
 'hits_at14',
 'height16',
 'period16',
 'direction16',
 'hits_at16',
 'height18',
 'period18',
 'direction18',
 'hits_at18',
 'height20',
 'period20',
 'direction20',
 'hits_at20',
 'height22',
 'period22',
 'direction22',
 'hits_at22',
 'height24',
 'period24',
 'direction24',
 'hits_at24',
 'height26',
 'period26',
 'direction26',
 'hits_at26',
 'height28',
 'period28',
 'direction28',
 'hits_at28',
 'height30',
 'period30',
 'direction30',
 'hits_at30',
 'height32',
 'period32',
 'direction32',
 'hits_at32',
 'height34',
 'period34',
 'direction34',
 'hits_at34',
 'height36',
 'period36',
 'direction36',
 'hits_at36',
 'height

Parse the data in the URL managed by NOA containing the predictions for the Buoy:

https://ftpprd.ncep.noaa.gov/data/nccf/com/gfs/prod/gfs.DATE/HOUR/wave/station/bulls.tHOURz/gfswave.62081.bull 


In [9]:
def process_url(buoy_url):
    out = []
    for line in urllib.request.urlopen(buoy_url):
        l = line.decode('utf-8') #utf-8 or iso8859-1 or whatever the page encoding scheme is
        row=[]
        if "Cycle" in l:
            regex = re.findall(r'Cycle.*:\s+([0-9]+)\s+([0-9]+)\s+UTC.*', l)
            if len(regex):
                thedate=regex[0]
        else:
            res = re.match(r'.*[|]\s+([0-9]+)\s+([0-9]+)\s+[|].*', l)
            waves = re.findall(r'[|]\s+([0-9\.]+)\s+([0-9\.]+)\s+([0-9]+)\s+[|]', l)
            if res is not None:
                row.append(thedate)
                row.append(res.groups())
            if len(waves):
                if len(waves) > 3:
                    # print("found > 3 waves, reduce to 3")
                    waves = waves[:3]
                b = []
                list(b.extend(item) for item in waves)
                row.append(b)
                my = tuple(chain.from_iterable(row))
                out.append(my)
    return out, thedate

### Feature engineering - select the best swell for Lahinch

There are between zero and 6 different swells. Extract the swell that is gives the expected highest surf at Lahinch, based on the angle of the swell direction (Lahinch has a swell direction window of around 20 degrees to 120 degrees.

In [10]:
primary_columns=['pred_dtime', 'hour', 'pred_day', 'pred_hour', 'height1', 'period1', 'direction1', 'height2', 
         'period2', 'direction2', 'height3', 'period3', 'direction3'] 

def is_valid_swell_direction(direction):
    if int(direction) > 180 or int(direction) < 20:
        return False
    return True

def best_height(row):
    best_secondary=2
    # Check which is best secondary swell - swell 2 or swell 3?
    if row['direction3'] != None:
        if is_valid_swell_direction(row['direction3']):
            if is_valid_swell_direction(row['direction2']) == False :
                best_secondary=3    
    best_direction = "direction" + str(best_secondary)
    best=1
    # Check which is best of swell 1 and secondary swell ?
    if row[best_direction] != None and is_valid_swell_direction(row[best_direction]) == True:
        if is_valid_swell_direction(row['direction1']) == False:
            best=best_secondary
                
    height = row['height' + str(best)]
    period = row['period' + str(best)]
    direction = row['direction' + str(best)]
        
    return pd.Series([height, period, direction])

# feature engineering - estimate the time at which the swell arrives at Lahinch from buoy
def estimate_hits_at(row):
    # baseline estimate
    hits_at = row['pred_dtime'] + row['hour_offset'] + timedelta(hours=8) 
    
    if float(row['direction']) < 80 and float(row['direction']) > 66:
        hits_at = hits_at - timedelta(hours=1)
    if float(row['direction']) <= 66 and float(row['direction']) > 50:
        hits_at = hits_at - timedelta(hours=2)
    if float(row['direction']) <= 50 and float(row['direction']) > 20:
        hits_at = hits_at - timedelta(hours=3)
    if float(row['period']) > 12:
        hits_at = hits_at - timedelta(hours=1)
    
    return pd.Series([hits_at])
    

if BACKFILL == True:
    df = pd.read_csv(backfill_url, parse_dates=['hits_at', 'pred_dtime'])
    num_rows = df.shape[0]
    print("num_rows: " + str(num_rows))
    rows = []
    for i in range(1, num_rows):
        row=[]
        for j in range(0, len(secondary_columns)):
            row.append("")
        if i % 2 == 0:
            rows.append(row)
    df_secondary = pd.DataFrame(rows, columns=secondary_columns)
    df = pd.concat([df, df_secondary],axis=1, join="outer")    
    
else: # BACKFILL == False
    today = datetime.now()
    url, pred_hour = get_latest_url(today)
    print(url)
    res,thedate=process_url(url)
    df = pd.DataFrame(res, columns=primary_columns)
    df['pred_dtime'] = pd.to_datetime(df['pred_dtime'], format='%Y%m%d')
    df.insert(loc=0, column="hour_offset", value=(df.reset_index().index*2))
    df['hour_offset'] = df.hour_offset.astype('timedelta64[h]')
    df['pred_dtime'] = df['pred_dtime'] + df.hour.astype('timedelta64[h]')


https://ftpprd.ncep.noaa.gov/data/nccf/com/gfs/prod/gfs.20220901/06/wave/station/bulls.t06z/gfswave.62081.bull
https://ftpprd.ncep.noaa.gov/data/nccf/com/gfs/prod/gfs.20220901/06/wave/station/bulls.t06z/gfswave.62081.bull


In [11]:
if BACKFILL == False:
    df[['height','period','direction']]=df.apply(best_height, axis=1)
    df[['hits_at']]=df.apply(estimate_hits_at, axis=1)
    df['beach_id'] = 1
    df.drop(['height1', 'period1', 'direction1', 'height2', 'period2', 'direction2', 'hour_offset',
              'height3', 'period3', 'direction3','hour', 'pred_day', 'pred_hour'], axis=1, inplace=True) 

df['height'] = pd.to_numeric(df['height'] , errors='coerce').astype(np.float64)
df['period'] = pd.to_numeric(df['period'] , errors='coerce').astype(np.float64)
df['direction'] = pd.to_numeric(df['direction'] , errors='coerce').astype(np.int64)


In [12]:
matches = ["height", "period", "direction", "hits_at"]

if BACKFILL == False:
    entry = []
    data = []
    for index, row in df.iterrows():
        if (index==0):
            data.append(row['beach_id'])
            data.append(row['pred_dtime'])
        if (index < hours):
            for m in matches:
                data.append(row[m])

    entry.append(data)
    first_columns=['beach_id', 'pred_dtime', 'height', 'period', 'direction', 'hits_at']    
    all_columns = first_columns + secondary_columns
    df2 = pd.DataFrame(entry, columns=all_columns)
else:    
    df2=df

for i in range(1,hours):
    for j in matches:
      df2[j+str(i*2)] = pd.to_numeric(df2[j+str(i*2)]).astype(np.float64)
df2

Unnamed: 0,beach_id,pred_dtime,height,period,direction,hits_at,height2,period2,direction2,hits_at2,...,direction232,hits_at232,height234,period234,direction234,hits_at234,height236,period236,direction236,hits_at236
0,1,2022-09-01 06:00:00,0.47,8.6,92,2022-09-01 14:00:00,0.49,8.1,94.0,1.662048e+18,...,15.0,1.662876e+18,2.45,9.5,41.0,1.662872e+18,2.26,9.2,42.0,1.66288e+18


### Connect to your Hopsworks cluster

If you only set the HOPSWORKS_API_KEY, it will assume you are connecting to app.hopsworks.ai.
Set HOPSWORKS_HOST and HOPSWORKS_PROJECT environment variables to connect to a different Hopsworks cluster.

In [13]:
project = hopsworks.login()
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/398
Connected. Call `.close()` to terminate connection gracefully.


Write your features to the `swells_exploded` feature group.

In [14]:
swells_fg = fs.get_or_create_feature_group(name="swells_exploded",
                version=version,
                primary_key=["beach_id"],
                event_time="hits_at",
                description="Buoy surf height predictions",
                online_enabled=True,
                statistics_config={"enabled": True, "histograms": True, "correlations": True}
                )
swells_fg.insert(df2)
    

Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/398/jobs/named/swells_exploded_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7fe8bde54a00>, None)