In [None]:
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp
import numpy as np
import pandas as pd
import pendulum
import requests
from sqlalchemy import (BigInteger, Boolean, Column, Float, Integer, String,
                        and_, create_engine, select, text)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base, sessionmaker
# from utils.models import SlSpots
from utils.sl_models import SlSpots, SlRatings
from utils.schemas import SlApiEndpoints, SlApiParams
from utils.sl_data import SurflineSpots, SpotForecast
from utils.utils import LOCAL_AIRFLOW_PG_URI, LOCAL_PG_URI

In [None]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
logging.basicConfig(level=logging.INFO)

In [None]:
Base = declarative_base()

In [None]:
engine = create_engine(LOCAL_AIRFLOW_PG_URI)
SessionLocal = sessionmaker(bind=engine)

## Note:
See `240220_sl_surf_spots.ipynb` for spot getter

In [None]:
response = requests.get("https://services.surfline.com/taxonomy?type=taxonomy&id=58f7ed51dadb30820bb3879c&maxDepth=0")

* You will not get Surfline forecast data without a valid Surfline premium login. Add your credentials to `.env.development`:
  ```
  SURFLINE_EMAIL=xxx
  SURFLINE_PASSWORD=yyy
  ```

##### Requests

`https://services.surfline.com/kbyg/spots/forecasts/{type}?{params}`


Type|Data
----|----
rating|array of human-readable and numeric (0-6) ratings
wave|array of min/max sizes & optimal scores
wind|array of wind directions/speeds & optimal scores
tides|array of types & heights
weather|array of sunrise/set times, array of temperatures/weather conditions

Param|Values|Effect
-----|------|------
spotId|string|Surfline spot id that you want data for. A typical Surfline URL is `https://www.surfline.com/surf-report/venice-breakwater/590927576a2e4300134fbed8` where `590927576a2e4300134fbed8` is the `spotId`
days|integer|Number of forecast days to get (Max 6 w/o access token, Max 17 w/ premium token)
intervalHours|integer|Minimum of 1 (hour)
maxHeights|boolean|`true` seems to remove min & optimal values from the wave data output
sds|boolean|If true, use the new LOTUS forecast engine
accesstoken|string|Auth token to get premium data access (optional)

Anywhere there is an `optimalScore` the value can be interpreted as follows:

Value|Meaning
-----|-------
0|Suboptimal
1|Good
2|Optimal


In [None]:
types = ["rating", "wave", "wind", "tides", "weather"]
params = ["spotId", "days", "intervalHours", "maxHeights", "sds", "accesstoken"]
base = "https://services.surfline.com/kbyg/spots/forecasts"

In [None]:
datapath = Path('./data')


In [None]:
df = pd.read_csv(datapath/'spot_list.csv')

In [None]:
df.head()

Get the spot `id` for 1st Street Jetty in Va Beach

In [None]:
jetty_id = df[df['names'].str.contains('1st Street Jetty', case=False, na=False)]['ids'].values[0]
jetty_id

In [None]:
ex_params = {params[0]: jetty_id}
ex_params

Surfline seems to change their spot IDs periodically. Check a spot on the website and pass the objectId from the url as a param to debug if this is the case. If they've changed you'll need to run the notebook `240220_sl_surf_spots.ipynb` as mentioned above to refresh the spots dataset

In [None]:
debug_params = {params[0]: "584204214e65fad6a7709ce7"}

In [None]:
res = requests.get(f"{base}/{types[0]}", params=ex_params)
res.status_code

In [None]:
type(res.json())

In [None]:
rating_json = res.json()

In [None]:
four_day_json = res.json()
if 'data' in four_day_json and 'rating' in four_day_json['data']:
    four_day_json['data']['rating'] = four_day_json['data']['rating'][:24]

In [None]:
def cull_extra_days(full_json):
    if 'data' in full_json and 'rating' in full_json['data']:
        full_json['data']['rating'] = full_json['data']['rating'][:24]

Drop extra days of forecast

In [None]:
cull_extra_days(four_day_json)

In [None]:
len(four_day_json['data']['rating'])

Convert a unix timestamp -> utc

In [None]:
pendulum.from_timestamp(rating_json['data']['rating'][0]['timestamp'], 'UTC')

In [None]:
pendulum.from_timestamp(rating_json['data']['rating'][int(72 / 3)-1]['timestamp'], 'UTC')

In [None]:
rating_json

The `utcOffset` field seems to be aware that I'm working in EST currently. Either that or it's the time coding for the spot itself.

Let's check a west coast spot to confirm how this is handled

In [None]:
df

In [None]:
la_jolla_id = df[df['names'].str.contains("La Jolla", case=False, na=False)]['ids'].values[0]
la_jolla_dict = {params[0]: la_jolla_id}

In [None]:
la_jolla_dict

In [None]:
pendulum.now("utc")

In [None]:
new_dict = {"spot_id": "test", "spot_name": "test_2", "date": pendulum.now("utc"), "forecast": four_day_json}

In [None]:
def fetch_from_sl_api(endpoint: SlApiEndpoints, param_type: SlApiParams, param: str):
    base_url = "https://services.surfline.com/kbyg/spots/forecasts"
    res = requests.get(f"{base_url}/{endpoint}", params={param_type: param})
    data = res.json()
    return data

In [None]:
test_res = fetch_from_sl_api(SlApiEndpoints.RATING.value, SlApiParams.SPOT_ID.value, param=jetty_id)

In [None]:
test_res

In [None]:
spot_ratings = []
for spot_id, spot_name in df[['ids', 'names']][:3].values:
    res = requests.get(f"{base}/rating", params={'spotId': spot_id})
    data = res.json()
    cull_extra_days(data)
    current_date = pendulum.now("utc")
    utc_date = current_date.strftime("%Y-%m-%d")
    data['spot_id'] = spot_id
    data['spot_name'] = spot_name
    data['utc_fetch_date'] = utc_date
    spot_ratings.append(data)
    # time.sleep()

In [None]:
# def fetch_spot_ratings(df, ):

In [None]:
pendulum.from_timestamp(rating_json['data']['rating'][0]['timestamp'], 'UTC')

In [None]:
ratings_df = pd.json_normalize(spot_ratings, record_path=['data', 'rating'], meta=['spot_id', 'spot_name', 'utc_fetch_date'] )

In [None]:
ratings_df

In [None]:
ratings_df['timestamp_utc'] = ratings_df['timestamp'].apply(lambda x: pendulum.from_timestamp(x).to_datetime_string())

Alright, so it looks like each spot's forecast starts at 12am *local time*, with the timestamp for that time in unix. To figure out the flat `UTC` time for each spot you can just apply the `utcOffset` that is included in response. 

In [None]:
ratings_df['timestamp_utc'] = pd.to_datetime(ratings_df['timestamp_utc'])

In [None]:
ratings_df

In [None]:
ratings_df['timestamp_utc'] = ratings_df.apply(lambda row: row['timestamp_utc'] + pd.Timedelta(hours=row['utcOffset']), axis=1)

In [None]:
ratings_df

In [None]:
ratings_df.dtypes

In [None]:
with SessionLocal() as db:
    matching_spots = []
    for spot in df['names']:
        stmt = text("""select * from spots where spot_name like :spot""")
        result = db.execute(stmt, {"spot": spot}).fetchall()
        if len(result) > 0:
            matching_spots.append(result)
    

In [None]:
len(matching_spots)

In [None]:
jetty_waves = fetch_from_sl_api(SlApiEndpoints.WAVE.value, SlApiParams.SPOT_ID.value, jetty_id)

In [None]:
jetty_waves['associated']

In [None]:
cull_extra_days(jetty_waves)

In [None]:
jetty_waves['data']['wave'] = jetty_waves['data']['wave'][:24]

In [None]:
len(jetty_waves['data']['wave'])

In [None]:
jetty_waves['associated']['spotId'] = jetty_id

In [None]:
jetty_waves['data']['spotId'] = jetty_id

In [None]:
jetty_meta_df = pd.json_normalize(jetty_waves)

In [None]:
jetty_meta_df.drop(['permissions.violations', 'permissions.data', 'data.wave', 'data.spotId'], inplace=True, axis=1)

In [None]:
jetty_meta_df

In [None]:
jetty_wave_df = pd.json_normalize(
    jetty_waves, record_path=["data", "wave"], meta=[["data", "spotId"]]
)
jetty_wave_df.drop("swells", inplace=True, axis=1)
jetty_wave_df.rename(columns={"power": "wave_power"}, inplace=True)

In [None]:
# jetty_wave_df['timestamp'] = jetty_wave_df['timestamp'].apply(lambda x: pendulum.from_timestamp(x).to_datetime_string())
# jetty_wave_df['timestamp'] = pd.to_datetime(jetty_wave_df['timestamp'])
# jetty_wave_df['timestamp_utc'] = jetty_wave_df.apply(lambda row: row['timestamp'] + pd.Timedelta(hours=row['utcOffset']), axis=1)

In [None]:
jetty_wave_df

In [None]:
jetty_swell_df = pd.json_normalize(
    jetty_waves,
    record_path=['data', 'wave', 'swells'],
    meta=[['data', 'wave', 'timestamp'], ['data', 'spotId']]
)

jetty_swell_df['swells_idx'] = jetty_swell_df.groupby('data.wave.timestamp').cumcount()

In [None]:
jetty_swell_df.head()

In [None]:
jetty_swell_df.rename({"power": 'swell_power'}, inplace=True)

In [None]:
jetty_swell_df['data.wave.timestamp'].value_counts()

In [None]:
jetty_meta_df.head()

In [None]:
jetty_wave_df.head()

In [None]:
jetty_swell_df.head(n=10)

In [None]:
combined_waves_df = pd.merge(
    jetty_wave_df,
    jetty_swell_df,
    how="inner",
    left_on=["timestamp", "data.spotId"],
    right_on=["data.wave.timestamp", 'data.spotId'],
)

In [None]:
len(combined_waves_df)

In [None]:
ratings_df

In [None]:
combined_waves_df

In [None]:
ratings_df

In [None]:
combined_waves_ratings_df = pd.merge(combined_waves_df, ratings_df, how='left', left_on=['timestamp', 'data.spotId'], right_on=['timestamp', 'spot_id']) 

In [None]:
combined_waves_ratings_df

In [None]:
combined_waves_ratings_df['timestamp'].value_counts

In [None]:
ratings_df['rating.value'].value_counts

In [None]:
ratings_df.head()

In [None]:
combined_df

In [None]:
combined_df = pd.merge(jetty_meta_df, combined_waves_df, how='cross')

In [None]:
combined_df

In [None]:
engine = create_engine(LOCAL_PG_URI)
SessionLocal = sessionmaker(bind=engine)

In [None]:
with SessionLocal() as db:
    stmt = select(SlSpots.spot_id)
    spots = db.execute(stmt).scalars().all()

In [None]:
def transform_sl_wave_data(data: Dict) -> pd.DataFrame:
    if not data:
        raise ValueError("Data is empty")

    meta_df = pd.json_normalize(data)
    meta_df.drop(
        ["permissions.violations", "permissions.data", "data.wave", "data.spotId"],
        inplace=True,
        axis=1,
    )

    wave_df = pd.json_normalize(
        jetty_waves, record_path=["data", "wave"], meta=[["data", "spotId"]]
    )
    wave_df.drop("swells", inplace=True, axis=1)
    wave_df.rename(columns={"power": "wave_power"}, inplace=True)

    swell_df = pd.json_normalize(
        jetty_waves,
        record_path=["data", "wave", "swells"],
        meta=[["data", "wave", "timestamp"], ["data", "spotId"]],
    )

    swell_df.rename(columns={"power": "swell_power"}, inplace=True)
    swell_df["swells_idx"] = swell_df.groupby("data.wave.timestamp").cumcount()

    combined_waves_df = pd.merge(
        wave_df,
        swell_df,
        how="inner",
        left_on=["timestamp", "data.spotId"],
        right_on=["data.wave.timestamp", "data.spotId"],
    )

    combined_df = pd.merge(meta_df, combined_waves_df, how='cross')

    return combined_df

In [None]:
data = []
for spot in spots[:2]:
    result = fetch_from_sl_api(SlApiEndpoints.WAVE.value, SlApiParams.SPOT_ID.value, param=spot)
    if result.get("associated"):
        result['associated']['spotId'] = spot
        result['data']['spotId'] = spot
    data.append(result)


In [None]:
full_df = pd.concat([transform_sl_wave_data(entry) for entry in data])

In [None]:
full_df

In [None]:
full_df.columns

In [None]:
full_df.dtypes

In [None]:
# class SlRatings(Base):
#     __tablename__ = 'sl_ratings'

#     id = Column(Integer, primary_key=True, autoincrement=True)
#     associated_units_temperature = Column(String)
#     associated_units_tideHeight = Column(String)
#     associated_units_swellHeight = Column(String)
#     associated_units_waveHeight = Column(String)
#     associated_units_windSpeed = Column(String)
#     associated_units_pressure = Column(String)
#     associated_utcOffset = Column(Integer)
#     associated_location_lon = Column(Float)
#     associated_location_lat = Column(Float)
#     associated_forecastLocation_lon = Column(Float)
#     associated_forecastLocation_lat = Column(Float)
#     associated_offshoreLocation_lon = Column(Float)
#     associated_offshoreLocation_lat = Column(Float)
#     associated_runInitializationTimestamp = Column(BigInteger)
#     associated_spotId = Column(String)
#     timestamp = Column(String)
#     probability = Column(Float)
#     utcOffset = Column(Integer)
#     wave_power = Column(Float)
#     surf_min = Column(Integer)
#     surf_max = Column(Integer)
#     surf_plus = Column(Boolean)
#     surf_humanRelation = Column(String)
#     surf_raw_min = Column(Float)
#     surf_raw_max = Column(Float)
#     surf_optimalScore = Column(Integer)
#     data_spotId = Column(String)
#     height = Column(Float)
#     period = Column(Integer)
#     impact = Column(Float)
#     swell_power = Column(Float)
#     direction = Column(Float)
#     directionMin = Column(Float)
#     optimalScore = Column(Integer)
#     data_wave_timestamp = Column(String)
#     swells_idx = Column(Integer)


In [None]:
from utils.models import SlRatings, create_tables

In [None]:
create_tables()

In [None]:
# class SpotForecast:
#     def __init__(self, database_uri):
#         self.spots = []
#         self.engine = create_engine(database_uri)
#         self.SessionLocal = sessionmaker(bind=engine)


#     def get_session(self):
#         return self.SessionLocal()


#     def fetch_all_forecasts(self) -> List[Dict[Any, Any]]:
#         data = []
#         for spot in self.spots[:2]:
#             result = self.fetch_forecast(
#                 SlApiEndpoints.WAVE.value, SlApiParams.SPOT_ID.value, param=spot
#             )
#             if result.get("associated"):
#                 result["associated"]["spotId"] = spot
#                 result["data"]["spotId"] = spot
#             data.append(result)
#         return data


#     def fetch_forecast(self, endpoint: SlApiEndpoints, param_type: SlApiParams, param: str) -> Dict[Any, Any]:
#         base_url = "https://services.surfline.com/kbyg/spots/forecasts"
#         res = requests.get(f"{base_url}/{endpoint}", params={param_type: param})
#         data = res.json()
#         return data


#     def fetch_spots_from_db(self) -> None:
#         with self.get_session() as db:
#             stmt = select(SlSpots.spot_id)
#             self.spots = db.execute(stmt).scalars().all()


#     def transform_wave_data(self, data: Dict) -> List[Dict[Any, Any]]:
#         if not data:
#             raise ValueError("Data is empty")

#         meta_df = pd.json_normalize(data, sep="_")
#         meta_df.drop(
#             ["permissions_violations", "permissions_data", "data_wave", "data_spotId"],
#             inplace=True,
#             axis=1,
#         )

#         wave_df = pd.json_normalize(
#             jetty_waves, record_path=["data", "wave"], meta=[["data", "spotId"]], sep="_"
#         )
#         wave_df.drop("swells", inplace=True, axis=1)
#         wave_df.rename(columns={"power": "wave_power"}, inplace=True)

#         swell_df = pd.json_normalize(
#             jetty_waves,
#             record_path=["data", "wave", "swells"],
#             meta=[["data", "wave", "timestamp"], ["data", "spotId"]],
#             sep="_",
#         )

#         swell_df.rename(columns={"power": "swell_power"}, inplace=True)
#         swell_df["swells_idx"] = swell_df.groupby("data_wave_timestamp").cumcount()

#         combined_waves_df = pd.merge(
#             wave_df,
#             swell_df,
#             how="inner",
#             left_on=["timestamp", "data_spotId"],
#             right_on=["data_wave_timestamp", "data_spotId"],
#         )

#         combined_df = pd.merge(meta_df, combined_waves_df, how="cross")
#         dict_record = combined_df.to_dict("records")

#         return dict_record


#     def load_to_pg(self, dict_record: List[Dict[Any, Any]]) -> None:
#         with self.get_session() as db:
#             db.bulk_insert_mappings(SlRatings, dict_record)
#             db.commit()

#     def process_all_spot_ratings(self):
#         self.fetch_spots_from_db()
#         data = self.fetch_all_forecasts()
#         for spot in data:
#             record = self.transform_wave_data(spot)
#             self.load_to_pg(record)


    

In [None]:
spot_forecast = SpotForecast(LOCAL_PG_URI)

In [None]:
spot_forecast.process_all_spot_ratings()

In [None]:
test_insert = spot_forecast.transform_wave_data(jetty_waves)

In [None]:
test_insert

In [None]:
test_insert_dict = test_insert.to_dict('records')

In [None]:
type(test_insert_dict[0])

In [None]:
with SessionLocal() as db:
    db.bulk_insert_mappings(SlRatings, test_insert_dict)
    db.commit()