# Wind Analysis Of Northern Yucatan Coast for Kiteboarding

## Intro

### Libraries

This is the list of libraries that are needed. Most of these libraries are available in Anaconda and Databricks with exception of calmap for calendar plots and astral for sunrise and sunset times. They have to be installed to run this notebook.

In [1]:
%matplotlib inline
# Dataframe:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame

# Daylight UDF:
import astral
from datetime import datetime
from pyspark.sql.types import TimestampType

# Plot related
import matplotlib.pyplot as plt
import calendar
import calmap

# Math
import math
import numpy as np

### Constants

In [2]:
FIVE_MINUTES_IN_SECS = (5*60) # Seconds
TWO_HOURS_IN_5MINS = (2*60)//5 # This iw hos many 5 min periods there is in 2 hours in 
MIN_SAMPLE_COUNT = 100
DUMP_CSV=True

### Transform

In [3]:
def transform(self, f):
    return f(self)

DataFrame.transform = transform

## The Data

### Data Source

The weather station (PWS) used for this analysis is CINVESTAV Telchac located in the Northern Yucantan coast. Coordinates of the station are latitude 21.341108 and longitude -89.305756. This is a single PWS project so spark will be configured in the timeszone of the project and there will be a location object for the PWS.

In [4]:
pwsID = 'IYUCATNT2'
pwsTz = 'America/Merida'
sensorLoc = astral.Location(("Telchac", "Mexico", 21.341108, -89.305756, pwsTz, 0))
spark.conf.set("spark.sql.session.timeZone", pwsTz)

The information was gathered from Weather Underground web site that provides a free API call to gather the information of a particular day. The weather station ID for this location is IYUCATNT2. The response comes almost in a CSV format except for the insertion of br html tags.

For instance the following URL will provide the data for January 1st, 2016:

https://www.wunderground.com/weatherstation/WXDailyHistory.asp?ID=IYUCATNT2&month=1&day=1&year=2016&format=1

A crawler script (with some wait between request) was created to dump the sensor information and do the simple task of converting it to a CSV (removing the inserted tags). The data is organized in subdirectories for the station ID, year and month for easy management.

In [5]:
# Databricks location:
#fileStorePath = "/FileStore/tables/test1"
    
fileStorePath = "./"

unformattedWindDF = spark.read \
    .format("csv") \
    .option("header","true") \
    .load("{}/dailyHistory/{}/*/*/*.csv".format(fileStorePath,pwsID))

#unformattedWindDF.show()

### Dataset

The sensor has historical data only from the middle of 2008 so the range for this analysis is from 2009 to 2018.

The table that Weather Underground returns has a lot of columns that represent measurements from the PWS like pressure, temperature, dew point, etc. but for this analysis we only focused on the following:

__Time__ : This is the timestamp at which the sample is taken in the local timezone of the PWS. There is another column that has the time in UTC but not used in this analysis. This column is kept as a string for comparison.

__WindDirectionDegrees__ : This is the wind direction in degrees. The cardinal directions map as follow: North is 0, East is 90, South is 180 and West is 270. This is casted to integer.

__WindSpeedMPH__ : Wind speed measured in miles per hour (mph). There is also a gust wind speed column but not used in this analysis. This is casted to a double.

The following columns are derived from the previous:

__TS__ : Conversion of the _Time_ string to a timestamp. Casting from string to timestamp was the best method posible found to handle timestamps in a the PWS timezone from the local timezone in spark.

__Date__ : Extraction of the date portion of the timestamp.

__TSEpochSec__ : Number of seconds since the UNIX epoch time of the current timestamp. Obtained by casting the timestamp to a long.


In [6]:
windFormattedDF = unformattedWindDF.selectExpr(
    "Time",
    "to_timestamp(Time,'yyyy-MM-dd HH:mm:ss') as TS",
    "cast(WindDirectionDegrees as integer)",
    "cast(WindSpeedMPH as double)") \
    .withColumn("Date",col("TS").cast("date")) \
    .withColumn("TSEpochSec",col("TS").cast("long"))

windFormattedDF.printSchema()

root
 |-- Time: string (nullable = true)
 |-- TS: timestamp (nullable = true)
 |-- WindDirectionDegrees: integer (nullable = true)
 |-- WindSpeedMPH: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- TSEpochSec: long (nullable = true)



## Data Cleanup

### Sample Set Cleanup

#### Bad Readings

It was noticed that readings where the WindSpeedMPH is 0.0 come with a very high negative WindDirectionDegrees. This could add some noise when resampling so they are converted to 0.

There are also some -999.0 values for WindSpeedMPH which seem to be the way the sensor reports some NA or lack of a good reading, they also come with a very high positive WindDirectionDegrees (more than 360). These readings are dropped.

Finally a few readings come with a very high WindDirectionDegrees. Again these are dropped.

In [7]:
windNoBadReadingsDF = windFormattedDF \
    .withColumn(
        "WindDirectionDegrees",
        when(col("WindDirectionDegrees")==-737280,0).otherwise(col("WindDirectionDegrees"))) \
    .filter(col("WindSpeedMPH")>=0.0) \
    .filter(col("WindDirectionDegrees").between(0,360))

#### Small Daily Sample Set

After getting rid of the bad readings the next step is to find the days that don’t have enough readings and then drop them. The threshold used for this notebook is 100 but it is parametrized in the constant section.

In [8]:
def enoughSamplesPerDayTrans(srcDF, minSampleCount) :
    return ( srcDF
        .groupBy("Date")
        .agg(count(lit(1)).alias("SampleCount"))
        .filter(col("SampleCount")>=lit(minSampleCount)) )
    
windDF = windNoBadReadingsDF.join(
    enoughSamplesPerDayTrans(windFormattedDF,MIN_SAMPLE_COUNT),
    ["Date"],
    "left_semi")

#windDF.count()

### Resampling

The minimum difference between readings of the same day is intended to be 5 minutes but there are situations when the reading for the next 5 min is not taken or the period shifts or in rare occasions is less than 5 min.

For our final analysis we would like to find periods of contiguous readings. For this it is easier if the readings are equally spaced in 5 min samples. This will be done by resampling and interpolating the unknown intermediate values from the known ones.

Spark currently doesn’t have the same support as R with regards of time series so this has to be done in different stages.

#### Combining Current and Next Reading with a Window

We need to apply a window function to get the next reading (lead) together with the current one in order to interpolate values between them. The last value of the window will have a null and will be dropped since it is not crucial for this analysis (most likely dropped later).

In [9]:
windowSpec = Window.partitionBy("Date").orderBy("TS")

windPairDF = windDF \
    .withColumn("nextWindDirectionDegrees",lead("WindDirectionDegrees").over(windowSpec)) \
    .withColumn("nextWindSpeedMPH",lead("WindSpeedMPH").over(windowSpec)) \
    .withColumn("nextTSEpochSec",lead("TSEpochSec").over(windowSpec)) \
    .dropna(subset=["nextTSEpochSec"])

#### Next Wind Direction With Shortest Angle

In case of boundary conditions between 0 and 360 degrees representing the north we will have a problem when doing interpolation. For instance interpolating values between 5 degrees and 355 degrees may take values between these two numbers which will be incorrect. To overcome this problem we have to calculate what is the shortest angle separation between those 2 readings and then calculate the new next direction based on this.

The one liner for the shortest angle was obtained from a discussion in stack overflow:

$$( ( (\theta-\phi) \% 2\pi ) + 3\pi ) \% 2\pi ) - \pi$$

Sources:
https://stackoverflow.com/questions/2708476/rotation-interpolation/14498790#14498790
https://math.stackexchange.com/questions/2144234/interpolating-between-2-angles


In [10]:
def shortestAngleExpr(fromAngle, toAngle) :
    return ( ( ( ( (toAngle-fromAngle) % lit(360) ) + lit(540) ) % lit(360) ) - lit(180) )

def shortestNextAngleExpr(fromAngle, toAngle) :
    return fromAngle + shortestAngleExpr(fromAngle, toAngle)

windPairFixDirDF = windPairDF \
    .withColumn("oldNextWindDeg",
                col("nextWindDirectionDegrees")) \
    .withColumn("nextWindDirectionDegrees",
                shortestNextAngleExpr(col("WindDirectionDegrees"), col("nextWindDirectionDegrees")))

#### Calculating Slopes and y Intercept for Linear Equations

The general formula for a linear equation is:

$$y=mx+b$$

Given 2 points (x1,y1) and (x2,y2) we can calculate the slope (m) and the y intercept (b). 

The value used as x is TSEpochSec and the y’s are WindDirectionDegrees and nextWindSpeedMPH (We need 2 independent linear equations).


In [11]:
# Custom Column functions to calculate slope and intercept given 2 points:
def slopeExpr(x1,y1,x2,y2) :
    return (
        (y2-y1) 
        /(x2-x1)
    )

def interceptExpr(x1,y1,x2,y2) :
    return (
      (x1*y2-x2*y1)
        /(x1-x2)
    )

def linearParamTrans(srcDF, paramColName, indColName) :
    """Transformation to create linear parameters for a set of colums."""
    return srcDF \
        .withColumn("m"+paramColName,
                    slopeExpr(col(indColName),col(paramColName),
                              col("next"+indColName),col("next"+paramColName))) \
        .withColumn("b"+paramColName,
                    interceptExpr(col(indColName),col(paramColName),
                                  col("next"+indColName),col("next"+paramColName)))

windLinearDF = windPairFixDirDF \
    .transform(lambda df : linearParamTrans(df,"WindSpeedMPH","TSEpochSec")) \
    .transform(lambda df : linearParamTrans(df,"WindDirectionDegrees","TSEpochSec"))


#### Resampling Points Creation

The next step is to create the desired sample points given the range. For instance if the data set contains 2 sets of sample points one at 1:02 and another one at 1:12, the desired sample points after resampling between those two readings will be 1:05 and 1:10.

Starting in spark version 2.4.0 there is a sequence operation that can be applied to a 2 columns returning an array. For older versions it is required to create a UDF that provides the same functionality using the python range operator. The data set is then exploded on the column with the array of resampling values. For Sequence the end of the range has to be greater than the start for the python range is not required.

In [12]:
from distutils.version import StrictVersion
from pyspark.sql.types import ArrayType, LongType

# To be more consistent with spark 2.4 the new version of the UDF is just a wrapper for range in python
def rangeWrapper(start, end, step) :
    # In python the end is not included in the range, in spark/scala it is
    return list(range(start,end+1,step))

sequenceUdf = udf(rangeWrapper,ArrayType(LongType()))

#Finds the next multiple of 5 min to start the range
def rangeStartExpr(start,step) :
    return (((start+step-lit(1))/step).cast("long")*step)

def rangeEndExpr(end) :
    return (end-lit(1))

# Wrapper for spark version UDF vs native
def sequenceVer(start, end, step) :
    if StrictVersion(spark.version) >= StrictVersion('2.4.0') :
        return sequence(start, end, step)
    else :
        return sequenceUdf(start, end, step)

windResampleDF = windLinearDF \
    .withColumn("rangeStart",rangeStartExpr(col("TSEpochSec"),lit(FIVE_MINUTES_IN_SECS))) \
    .withColumn("rangeEnd",rangeEndExpr(col("nextTSEpochSec"))) \
    .filter(col("rangeEnd")>=col("rangeStart")) \
    .withColumn("range",
                sequenceVer(
                    col("rangeStart"),
                    col("rangeEnd"),
                    lit(FIVE_MINUTES_IN_SECS)))

windResampleExpDF = windResampleDF \
    .withColumn("interTSEpochSec",explode(col("range")))

windResampleExpDF.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- TS: timestamp (nullable = true)
 |-- WindDirectionDegrees: integer (nullable = true)
 |-- WindSpeedMPH: double (nullable = true)
 |-- TSEpochSec: long (nullable = true)
 |-- nextWindDirectionDegrees: integer (nullable = true)
 |-- nextWindSpeedMPH: double (nullable = true)
 |-- nextTSEpochSec: long (nullable = true)
 |-- oldNextWindDeg: integer (nullable = true)
 |-- mWindSpeedMPH: double (nullable = true)
 |-- bWindSpeedMPH: double (nullable = true)
 |-- mWindDirectionDegrees: double (nullable = true)
 |-- bWindDirectionDegrees: double (nullable = true)
 |-- rangeStart: long (nullable = true)
 |-- rangeEnd: long (nullable = true)
 |-- range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- interTSEpochSec: long (nullable = true)



#### Interpolation of Resampling Points

At this point the resampling points are located in the column in interTSEpochSec. The next step is to apply the corresponding linear equation to each point to get new interpolated values in columns interWindSpeedMPH and interWindDirectionDegrees. The naming convention of the columns was carefully chosen to be able to replicate the same operations to different columns only using string operations.

In [17]:
def linearEquationExpr(x,m,b) :
    return (m*x + b)

def linearInterpolationTrans(srcDF, paramColName, indColName) :
    """Transformation to interpolate values."""
    return srcDF \
        .withColumn("inter"+paramColName,
                    linearEquationExpr(col(indColName), col("m"+paramColName), col("b"+paramColName)))

interDF = windResampleExpDF \
    .transform(lambda df : linearInterpolationTrans(df,"WindSpeedMPH","interTSEpochSec")) \
    .transform(lambda df : linearInterpolationTrans(df,"WindDirectionDegrees","interTSEpochSec"))


# TODO: Remove this cell it is just for backward compat
DF = expDF.selectExpr("Date as LocalDate","cast(interTSEpochSec as timestamp) AS Time","interWindSpeedMPH","TS","WindSpeedMPH")

windExtractDF = interDF.selectExpr("Date as LocalDate","cast(interTSEpochSec as timestamp) AS Time","interWindSpeedMPH")
#windExtractDF.filter(col("LocalDate")=='2012-03-23').show(545)
windExtractDF.show(100)

#windExtractDF.coalesce(1).write.option("header","true").csv("output")
windExtractDF.sort("Time").coalesce(1).write.option("header","true").csv("output/windExtractSortDF")

#### Clean Dataset

Now it is time to extract only the columns of interest. All the interpolated columns are renamed as if they were the original and intermediate calculations dropped. The interpolated angles could be negative so they have to be converted to a positive version of it.

In [19]:
windCleanDF = interDF.selectExpr(
    "Date",
    "cast(interTSEpochSec as timestamp) as TS",
    "interTSEpochSec as TSEpochSec",
    "interWindSpeedMPH as WindSpeedMPH",
    "pmod(interWindDirectionDegrees,360) as WindDirectionDegrees"
)
windCleanDF.show()

+----------+-------------------+----------+------------+--------------------+
|      Date|                 TS|TSEpochSec|WindSpeedMPH|WindDirectionDegrees|
+----------+-------------------+----------+------------+--------------------+
|2012-10-06|2012-10-06 00:05:00|1349499900|         3.0|               149.0|
|2012-10-06|2012-10-06 00:10:00|1349500200|         2.5|               149.5|
|2012-10-06|2012-10-06 00:15:00|1349500500|         2.0|               150.0|
|2012-10-06|2012-10-06 00:20:00|1349500800|         1.0|               150.0|
|2012-10-06|2012-10-06 00:25:00|1349501100|         1.0|               150.0|
|2012-10-06|2012-10-06 00:30:00|1349501400|         1.5|               149.5|
|2012-10-06|2012-10-06 00:35:00|1349501700|         2.0|               149.0|
|2012-10-06|2012-10-06 00:40:00|1349502000|         1.5|               149.5|
|2012-10-06|2012-10-06 00:45:00|1349502300|         1.0|               150.0|
|2012-10-06|2012-10-06 00:50:00|1349502600|         1.0|        

## Data Analysis

### Daylight Only

This analysis is finding the good days for kiteboarding which is a sport that can only be practice during the daylight. It is required to drop any measurements when there is no light to practice the sport.

A UDF is used in this case to take advantage of the astral module of python. The use of a UDF for this is not optimal but it is simple. Other approaches could be to use other datasets with daylight information or calculating it natively with formulas but these was more complicated.

Once this readings are excluded is a good place to persist the dataframe and create a view for SQL querries.

In [25]:
# Creating UDF for sunrise and sunset:
getSunriseUdf = udf(lambda date: sensorLoc.sunrise(date),TimestampType())
getSunsetUdf = udf(lambda date: sensorLoc.sunset(date),TimestampType())

def dayligthExpr(timestamp, date) :
     return (col("TS") > getSunriseUdf(col("Date"))) & (col("TS") < getSunsetUdf(col("Date")))
 
dayligthWindDF = windCleanDF.filter(dayligthExpr(col("TS"), col("Date")))

dayligthWindDF.persist()
dayligthWindDF.createOrReplaceTempView("dayligthWindTable")
#dayligthWindDF.show(1000)

### Monthly Wind Roses

The first analysis done after cleaning the date was to get a windrose diagram to see the wind speed and direction distribution for the years we are analyzing. It is divided by month since we are interested on finding the good season for kiting.

A windrose diagram is just a 2D histogram in polar coordinates. One of the axis will be a direction range in angles and the other will be the ranges of wind magnitude. This is the SQL query that will create the normalized 2D histogram (Converted to pandas dataframe for plotting):

In [39]:
normedWindRoseDF = spark.sql("""
  SELECT *
  FROM (
    SELECT
      month(Date) as Month,
      (CAST(((WindDirectionDegrees*16)/360) + 0.5 AS INT) ) % 16 AS WindDirectionGroup,
      CASE
        WHEN WindSpeedMPH < 15 THEN 'r_0_15'
        WHEN WindSpeedMPH < 20 THEN 'r15_20'
        WHEN WindSpeedMPH < 25 THEN 'r_20_25'
        ELSE 'r_25_inf'
      END AS WindSpeedGroup
    FROM dayligthWindTable
  )
  PIVOT (
    COUNT(1)
    FOR WindSpeedGroup IN ('r_0_15', 'r_15_20', 'r_20_25', 'r_25_inf')
  )
""")

"""
normedWindRoseDF = spark.sql(
SELECT
  Month,
  WindDirectionGroup,
  IFNULL(r_0_15,0)/Total AS `0 to 15`,
  IFNULL(r_15_20,0)/Total AS `15 to 20`,
  IFNULL(r_20_25,0)/Total AS `20 to 25`,
  IFNULL(r_25_inf,0)/Total AS `25 or over`
FROM (
  SELECT *
  FROM (
    SELECT
      month(Date) as Month,
      (CAST(((WindDirectionDegrees*16)/360) + 0.5 AS INT) ) % 16 AS WindDirectionGroup,
      CASE
        WHEN WindSpeedMPH < 15 THEN 'r_0_15'
        WHEN WindSpeedMPH < 20 THEN 'r_15_20'
        WHEN WindSpeedMPH < 25 THEN 'r_20_25'
        ELSE 'r_25_inf'
      END AS WindSpeedGroup
    FROM dayligthWindTable
  )
  PIVOT (
    COUNT(1)
    FOR WindSpeedGroup IN ('r_0_15', 'r_15_20', 'r_20_25', 'r_25_inf')
  )
) AS windRoseTable
JOIN (
  SELECT month(Date) AS MGroup, COUNT(1) as Total
  FROM dayligthWindTable
  GROUP BY month(Date)
) AS totalByMonthTable
ON windRoseTable.Month = totalByMonthTable.MGroup
ORDER BY Month DESC, WindDirectionGroup
)
"""

normedWindRoseDF.show()

#normedWindRosePD = normedWindRoseDF.toPandas()

#spark.catalog.dropTempView("windRoseTable")
#spark.catalog.dropTempView("dayligthWindTable")

ParseException: "\nmismatched input 'FROM' expecting <EOF>(line 3, pos 2)\n\n== SQL ==\n\n  SELECT *\n  FROM (\n--^^^\n    SELECT\n      month(Date) as Month,\n      (CAST(((WindDirectionDegrees*16)/360) + 0.5 AS INT) ) % 16 AS WindDirectionGroup,\n      CASE\n        WHEN WindSpeedMPH < 15 THEN 'r_0_15'\n        WHEN WindSpeedMPH < 20 THEN 'r_15_20'\n        WHEN WindSpeedMPH < 25 THEN 'r_20_25'\n        ELSE 'r_25_inf'\n      END AS WindSpeedGroup\n    FROM dayligthWindTable\n  )\n  PIVOT (\n    COUNT(1)\n    FOR WindSpeedGroup IN ('r_0_15', 'r_15_20', 'r_20_25', 'r_25_inf')\n  )\n"