## One Day Analysis

This is a sketch of how to do a single day analysis using pyspark mainly based on the scala source code

### Libraries

In [1]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.window import Window
#from pyspark.sql import Row

### PWS Information and Globals

This is the infromation of the sensor that performed the measurements. The ID is the one given by wunderground. In this case we use only cinvestav telchac.

In [2]:
#PWS info
pwsID = 'IYUCATNT2'
pwsTz = 'America/Merida'


### Reading the dataset

In [3]:
# Changing the default timezone for the analysis
spark.conf.set("spark.sql.session.timeZone", pwsTz)

# Inferr schema creates a problem when changing the session timezone
#    .option("inferSchema","true") \
    
staticDataFrame = spark.read \
    .format("csv") \
    .option("header","true") \
    .load("12/*.csv")
    #.load("IYUCATNT2-2016-04-18.csv")
    #.load("mini.csv")

staticDataFrame.printSchema()
staticDataFrame.show()

root
 |-- Time: string (nullable = true)
 |-- TemperatureF: string (nullable = true)
 |-- DewpointF: string (nullable = true)
 |-- PressureIn: string (nullable = true)
 |-- WindDirection: string (nullable = true)
 |-- WindDirectionDegrees: string (nullable = true)
 |-- WindSpeedMPH: string (nullable = true)
 |-- WindSpeedGustMPH: string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- HourlyPrecipIn: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Clouds: string (nullable = true)
 |-- dailyrainin: string (nullable = true)
 |-- SolarRadiationWatts/m^2: string (nullable = true)
 |-- SoftwareType: string (nullable = true)
 |-- DateUTC: string (nullable = true)

+-------------------+------------+---------+----------+-------------+--------------------+------------+----------------+--------+--------------+----------+------+-----------+-----------------------+-------------------+-------------------+
|               Time|TemperatureF|DewpointF|PressureIn|W

### Extract Timestamp and Wind

Extracting only wind and time information.

In [4]:
windDF = staticDataFrame.selectExpr(
    "Time",
    "to_timestamp(Time,'yyyy-MM-dd HH:mm:ss') as TS",
    "cast(WindDirectionDegrees as integer)",
    "cast(WindSpeedMPH as double)") \
    .withColumn("Date",col("TS").cast("date")) \
    .withColumn("TSEpochSec",col("TS").cast("long"))
    #"cast(WindSpeedGustMPH as double)")
# Creating SQL for future analisys
windDF.createOrReplaceTempView("windTable")
windDF.printSchema()
windDF.show()

root
 |-- Time: string (nullable = true)
 |-- TS: timestamp (nullable = true)
 |-- WindDirectionDegrees: integer (nullable = true)
 |-- WindSpeedMPH: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- TSEpochSec: long (nullable = true)

+-------------------+-------------------+--------------------+------------+----------+----------+
|               Time|                 TS|WindDirectionDegrees|WindSpeedMPH|      Date|TSEpochSec|
+-------------------+-------------------+--------------------+------------+----------+----------+
|2012-12-22 00:00:00|2012-12-22 00:00:00|                  51|        21.0|2012-12-22|1356156000|
|2012-12-22 00:05:00|2012-12-22 00:05:00|                  51|        21.0|2012-12-22|1356156300|
|2012-12-22 00:10:00|2012-12-22 00:10:00|                  56|        20.0|2012-12-22|1356156600|
|2012-12-22 00:20:00|2012-12-22 00:20:00|                  40|        21.0|2012-12-22|1356157200|
|2012-12-22 00:25:00|2012-12-22 00:25:00|                  55|  

### Resampling Using Interpolation

We need to a ply a window function to get the next reading together with the current one in order to interpolate values between them.

Drop the last row with a null lead.

In [5]:
windowSpec = Window.partitionBy("Date").orderBy("TS")

windPairDF = windDF \
    .withColumn("nextWindDirectionDegrees",lead("WindDirectionDegrees").over(windowSpec)) \
    .withColumn("nextWindSpeedMPH",lead("WindSpeedMPH").over(windowSpec)) \
    .withColumn("nextTSEpochSec",lead("TSEpochSec").over(windowSpec)) \
    .dropna(subset=["nextTSEpochSec"])

#||      Date|TSEpochSec

#windPairDF.show(100)

#### Correction of the Next Wind Direction

In case of boundary conditions between 0 and 360 degrees representing the north we will have a problem when doing interpolation. For instance interpolating values between 5 degrees and 355 degrees may take values between these twon numbers which will be incorrect. To overcome this problem we have to calculate what is the shortest angle separation between those 2 readings and then calculate the new next direction based on this.

The one liner for the shortest angle was obtained from a discussion in stack overflow:

Source:
https://stackoverflow.com/questions/2708476/rotation-interpolation/14498790#14498790

In [6]:
windPairFixDirDF = windPairDF \
    .withColumn("shortestAngle",expr("((((nextWindDirectionDegrees-WindDirectionDegrees)%360)+540)%360)-180")) \
    .withColumn("neShortWindDirectionDegrees",expr("WindDirectionDegrees+shortestAngle"))

windPairFixDirDF.show()
# For reference steps:
#    .withColumn("diff",expr("nextWindDirectionDegrees-WindDirectionDegrees")) \
#    .withColumn("mod",expr("(nextWindDirectionDegrees-WindDirectionDegrees)%360")) \
#    .withColumn("p540",expr("((nextWindDirectionDegrees-WindDirectionDegrees)%360)+540")) \
#    .withColumn("mod2",expr("(((nextWindDirectionDegrees-WindDirectionDegrees)%360)+540)%360")) \

+-------------------+-------------------+--------------------+------------+----------+----------+------------------------+----------------+--------------+-------------+---------------------------+
|               Time|                 TS|WindDirectionDegrees|WindSpeedMPH|      Date|TSEpochSec|nextWindDirectionDegrees|nextWindSpeedMPH|nextTSEpochSec|shortestAngle|neShortWindDirectionDegrees|
+-------------------+-------------------+--------------------+------------+----------+----------+------------------------+----------------+--------------+-------------+---------------------------+
|2012-12-22 00:00:00|2012-12-22 00:00:00|                  51|        21.0|2012-12-22|1356156000|                      51|            21.0|    1356156300|            0|                         51|
|2012-12-22 00:05:00|2012-12-22 00:05:00|                  51|        21.0|2012-12-22|1356156300|                      56|            20.0|    1356156600|            5|                         56|
|2012-12-22 00:

#### Calculating Slopes and Intercept for Linear Equations

For linear interpolation we have to get a line:

f(t) = m*t + b

We need to calculate those values given the current and next (reading,timestamp) pair

"""
    .withColumn("m",
      expr("nextWindSpeedMPH-WindSpeedMPH")
        /(col("nextTSEpochSec").cast("long")-col("TSEpochSec").cast("long"))
    ) \
    .withColumn("b",
      (col("TSEpochSec").cast("long")*col("nextWindSpeedMPH")-col("nextTSEpochSec").cast("long")*col("WindSpeedMPH"))
        /(col("TSEpochSec").cast("long")-col("nextTSEpochSec").cast("long"))
    )
"""

In [None]:
# Custom Column functions to calculate slope and intercept
def slopeExpr(x1,y1,x2,y2) :
    return (
        (y2-y1) 
        /(x2-x1)
    )

def interceptExpr(x1,y1,x2,y2) :
    return (
      (x1*y2-x2*y1)
        /(x1-x2)
    )

windLinearDF = windPairFixDirDF \
    .withColumn("mSpeed",
                slopeExpr(col("TSEpochSec"),col("WindSpeedMPH"),col("nextTSEpochSec"),col("nextWindSpeedMPH"))) \
    .withColumn("bSpeed",
                interceptExpr(col("TSEpochSec"),col("WindSpeedMPH"),col("nextTSEpochSec"),col("nextWindSpeedMPH"))) \
    .withColumn("mDir",
                slopeExpr(col("TSEpochSec"),col("WindDirectionDegrees"),col("nextTSEpochSec"),col("neShortWindDirectionDegrees"))) \
    .withColumn("bDir",
                interceptExpr(col("TSEpochSec"),col("WindDirectionDegrees"),col("nextTSEpochSec"),col("neShortWindDirectionDegrees"))) \



"""
    .withColumn("m",
      (col("nextWindSpeedMPH")-col("WindSpeedMPH"))
        /(col("nextTSEpochSec")-col("TSEpochSec"))
    ) \
    .withColumn("b",
      (col("TSEpochSec")*col("nextWindSpeedMPH")-col("nextTSEpochSec")*col("WindSpeedMPH"))
        /(col("TSEpochSec")-col("nextTSEpochSec"))
    )
"""

"""
    .withColumn("m", lit(0)) \
    .withColumn("b",col("WindSpeedMPH"))
"""

"""
    .withColumn("m",
      expr("nextWindSpeedMPH-WindSpeedMPH")
        /(col("nextTSEpochSec").cast("long")-col("TSEpochSec").cast("long"))
    ) \
    .withColumn("b",
      (col("TSEpochSec").cast("long")*col("nextWindSpeedMPH")-col("nextTSEpochSec").cast("long")*col("WindSpeedMPH"))
        /(col("TSEpochSec").cast("long")-col("nextTSEpochSec").cast("long"))
    )
"""

windLinearDF.show(100)

In [29]:
#(spark.version) >= '2.13.1'
from distutils.version import StrictVersion
StrictVersion(spark.version) >= StrictVersion('2.4.0')
#(spark.version) >= '2.13.1'

False

#### Range UDF to Create The Resampling

We will need help from a UDF and 

In [None]:
from pyspark.sql.types import ArrayType, LongType
#def inter5Range(tsX:Timestamp,tsY:Timestamp) : Seq[Timestamp] = {
  #  val FIVE_MINUTES_IN_MILLIS : Long = 5*60*1000;//millisecs
  #  val x = tsX.getTime()
  #  val y = tsY.getTime()
  #  (((x+FIVE_MINUTES_IN_MILLIS-1)/FIVE_MINUTES_IN_MILLIS)*FIVE_MINUTES_IN_MILLIS to (y-1) by FIVE_MINUTES_IN_MILLIS).map(new Timestamp(_))
  #}

def epoch5MinRange(epochSecStart, epochSecEnd) :
    FIVE_MINUTES_IN_SECS = 5*60 # Seconds
    # range doesn't include the end
    #[x for x in range( ((start+five-1)//five)*five ,end,five)]
    #return (epochSecEnd-epochSecStart)
    #return 100
    return list(range(
        ((epochSecStart+FIVE_MINUTES_IN_SECS-1)//FIVE_MINUTES_IN_SECS)*FIVE_MINUTES_IN_SECS,
        epochSecEnd,
        FIVE_MINUTES_IN_SECS))

epoch5MinRangeUdf = udf(epoch5MinRange,ArrayType(LongType()))
#epoch5MinRangeUdf = udf(epoch5MinRange)


#list(epoch5MinRange(1483164120,1483164720))

arrDF = windLinearDF.withColumn("range",epoch5MinRangeUdf(col("TSEpochSec"),col("nextTSEpochSec")))
#arrDF.printSchema()
#arrDF.show()
expDF = arrDF \
    .withColumn("interTSEpochSec",explode(col("range"))) \
    .withColumn("interWindSpeedMPH",expr("(interTSEpochSec * mSpeed) + bSpeed"))
#expDF.show()
#windLinearDF.withColumn("range",(col("TSEpochSec")-col("nextTSEpochSec"))).show()

#cleanDF = expDF.selectExpr("Time","TS","cast(TSEpochSec as timestamp)")
#cleanDF.printSchema()
#cleanDF.show(100)

#cleanDF = expDF.selectExpr("Time","TS","cast(TSEpochSec as timestamp)")

#windExtractDF = expDF.selectExpr("Date as LocalDate","cast(interTSEpochSec as timestamp) AS Time","interWindSpeedMPH","TS","WindSpeedMPH")

windExtractDF = expDF.selectExpr("Date as LocalDate","cast(interTSEpochSec as timestamp) AS Time","interWindSpeedMPH")
windExtractDF.show(100)

#windExtractDF.coalesce(1).write.option("header","true").csv("output")
windExtractDF.sort("Time").coalesce(1).write.option("header","true").csv("output")