# Predict loads

In [0]:
%pip install python-dateutil

In [0]:
%sql CREATE DATABASE IF NOT EXISTS asset_life;
-- Results table holding one row of load-prediction statistics per turbine and month.
-- NOTE: savePredictions() further down writes into this table with insertInto(),
-- which resolves columns by position, so the column order below must stay in sync
-- with the select() list in that function.
CREATE TABLE IF NOT EXISTS asset_life.predictions (
  TURBINE_SK_FK INT,
  -- Filled from the TIME_STAMP column that savePredictions() renames to MONTH.
  MONTH TIMESTAMP,
  -- STD_* / MEAN_* pairs are produced by predictForTurbineMonth() via stddev()/mean()
  -- over every column of the prediction result.
  STD_MYTB_del_m4 FLOAT,
  MEAN_MYTB_del_m4 FLOAT,
  STD_MZTB_del_m4 FLOAT,
  MEAN_MZTB_del_m4 FLOAT,
  STD_MYTM_del_m4 FLOAT,
  MEAN_MYTM_del_m4 FLOAT,
  STD_MZTM_del_m4 FLOAT,
  MEAN_MZTM_del_m4 FLOAT,
  STD_MYTT_del_m4 FLOAT,
  MEAN_MYTT_del_m4 FLOAT,
  STD_MZTT_del_m4 FLOAT,
  MEAN_MZTT_del_m4 FLOAT
)

In [0]:
%sql CREATE
OR REPLACE VIEW asset_life_data AS
-- Combines the three 10-minute signal tables into one row per
-- (TIME_STAMP, TURBINE_SK_FK). Both joins are INNER, so a timestamp/turbine pair
-- that is missing from any of the three tables is dropped from the view.
-- NOTE(review): unlike the views defined later in this notebook this one is not
-- TEMPORARY; presumably so that notebooks started through dbutils.notebook.run
-- can read it as well -- confirm before changing.
SELECT
  t2.TIME_STAMP,
  t2.SITE_SK_FK,
  t2.TURBINE_SK_FK,
  t2.Amb_WindSpeed_Avg,
  t2.Amb_WindSpeed_Max,
  t2.Blds_BladeA_Load,
  t2.Blds_BladeA_MaxLoad,
  t2.Blds_BladeB_Load,
  t2.Blds_BladeB_MaxLoad,
  t2.Blds_BladeC_Load,
  t2.Blds_BladeC_MaxLoad,
  t2.Rtr_RPM_Avg,
  t4.Rtr_RPM_Max,
  t2.Gen_RPM_Avg,
  t3.Gen_RPM_Max,
  t2.Grd_Prod_Pwr_Avg,
  t3.Grd_Prod_Pwr_Max,
  t2.Blds_PitchAngle_Avg,
  t2.Blds_PitchAngle_Std,
  t4.TowerAcc_X_direction_StdXAcc,
  t4.TowerAcc_X_direction_MaxXAcc,
  t4.TowerAcc_Y_direction_StdYAcc,
  t4.TowerAcc_Y_direction_MaxYAcc
FROM
  vestas.t_vestas_10min_signals_2 t2
  INNER JOIN vestas.t_vestas_10min_signals_3 t3 ON (
    t2.TIME_STAMP = t3.TIME_STAMP
    AND t2.TURBINE_SK_FK = t3.TURBINE_SK_FK
  )
  INNER JOIN vestas.t_vestas_10min_signals_4 t4 ON (
    t2.TIME_STAMP = t4.TIME_STAMP
    AND t2.TURBINE_SK_FK = t4.TURBINE_SK_FK
  )

In [0]:
%sql CACHE TABLE asset_life_data;

In [0]:
%sql CREATE
OR REPLACE TEMPORARY VIEW turbine_timeframes AS
-- First and last signal timestamp available per turbine (grouped together with
-- its site), taken from asset_life_data. The Python cells below expand this range
-- into a list of monthly timestamps per turbine.
-- NOTE(review): the ORDER BY is part of the view definition; readers of the view
-- should not rely on it for ordering -- confirm whether it is needed at all.
SELECT
  TURBINE_SK_FK,
  SITE_SK_FK,
  MIN(TIME_STAMP) as FROM_TIMESTAMP,
  MAX(TIME_STAMP) as TO_TIMESTAMP
FROM
  asset_life_data
GROUP BY
  TURBINE_SK_FK,
  SITE_SK_FK
ORDER BY
  TURBINE_SK_FK

In [0]:
%sql CREATE
OR REPLACE TEMPORARY VIEW turbine_timeframes_oerken AS
-- Restricts the turbine time ranges to a single site.
-- NOTE(review): 272 is a hard-coded SITE_SK_FK; judging by the view name it is
-- presumably the Oerken site -- confirm, and consider a notebook parameter.
SELECT
 *
FROM
  turbine_timeframes
WHERE
  SITE_SK_FK = 272

In [0]:
# Per-turbine time ranges (FROM_TIMESTAMP / TO_TIMESTAMP) for the selected site.
df = spark.sql("select * from turbine_timeframes_oerken")

In [0]:
# Bring the rows to the driver: the following cells iterate over them in plain
# Python (see toMonths) rather than with Spark transformations.
turbines = df.collect()

In [0]:
display(turbines)

In [0]:
import datetime
import dateutil.relativedelta

def toMonths(turbine):
  """Expand a turbine's data range into a list of monthly timestamps.

  Args:
    turbine: object exposing ``TURBINE_SK_FK``, ``FROM_TIMESTAMP`` and
      ``TO_TIMESTAMP`` (the rows collected from ``turbine_timeframes_oerken``).

  Returns:
    Tuple ``(TURBINE_SK_FK, [timestamp, ...])`` with one timestamp per month,
    starting at ``FROM_TIMESTAMP`` and stopping before ``TO_TIMESTAMP``
    (exclusive). The list is empty when ``FROM_TIMESTAMP >= TO_TIMESTAMP``.
  """
  startDate = turbine.FROM_TIMESTAMP
  endDate = turbine.TO_TIMESTAMP

  monthlyTimestamps = []
  monthOffset = 0
  currentDate = startDate
  while currentDate < endDate:
    monthlyTimestamps.append(currentDate)
    monthOffset += 1
    # Always offset from the original start instead of accumulating "+1 month":
    # relativedelta clips the day to the end of short months, so repeated
    # addition would turn Jan 31 -> Feb 28 -> Mar 28 ... and never recover.
    currentDate = startDate + dateutil.relativedelta.relativedelta(months=monthOffset)

  return (turbine.TURBINE_SK_FK, monthlyTimestamps)

In [0]:
# One (TURBINE_SK_FK, [monthly timestamps]) tuple per collected turbine row.
# map() is lazy, so the list() call is what actually runs toMonths().
turbineMonthMap = map(toMonths, turbines)
turbineMonths = list(turbineMonthMap)
display(turbineMonths)

In [0]:
def needsPrediction(turbineId, monthTimestamp):
  """Tell whether a turbine/month combination still lacks a stored prediction.

  Looks up asset_life.predictions for rows of the given turbine whose MONTH
  falls in the same calendar year and month as ``monthTimestamp``.

  Returns:
    True when no such row exists, False otherwise.
  """
  existingRows = spark.sql(
    f"SELECT * FROM asset_life.predictions WHERE TURBINE_SK_FK={turbineId} AND YEAR(MONTH)={monthTimestamp.year} AND MONTH(MONTH)={monthTimestamp.month}"
  )

  # Runs one Spark query per call; the caller invokes this once per turbine/month.
  return existingRows.count() <= 0

In [0]:
# Keep, per turbine, only the months that have no row in asset_life.predictions yet.
turbineMonthsToPredict = [
  (turbineId, [month for month in months if needsPrediction(turbineId, month)])
  for turbineId, months in turbineMonths
]

In [0]:
display(turbineMonthsToPredict)

In [0]:
def withCorrectData(turbineId, monthTimestamp):
  """Run the "Sanity_checking_v1" notebook for one turbine and month.

  Args:
    turbineId: turbine key passed to the notebook as "Turbine".
    monthTimestamp: timestamp whose ``year`` / ``month`` are passed as
      "Year" / "Month".

  Returns:
    Whatever the notebook run returns, or ``False`` when the run raised.
    The caller uses the result only for its truthiness.
    NOTE(review): if the notebook exits with a non-empty string such as
    "False", that value is still truthy -- confirm what Sanity_checking_v1
    actually returns.
  """
  try:
    # Timeout 0 is passed through unchanged from the original call.
    return dbutils.notebook.run("Sanity_checking_v1", 0, {"Turbine": turbineId, "Year": monthTimestamp.year, "Month": monthTimestamp.month})
  except Exception as error:
    # Only catch real errors: a bare except would also swallow
    # KeyboardInterrupt/SystemExit and make the notebook impossible to cancel.
    # Report the reason so rejected months can be diagnosed.
    print(f"Sanity check failed for {turbineId} in {monthTimestamp.year}-{monthTimestamp.month:02d}: {error!r}")
    return False

In [0]:
# Drop every month whose sanity check fails; entries are replaced in place.
# `month` holds the turbine's whole list of candidate months at this point.
for i, (turbineId, month) in enumerate(turbineMonthsToPredict):
  turbineMonthsToPredict[i] = (
    turbineId,
    [candidate for candidate in month if withCorrectData(turbineId, candidate)],
  )

In [0]:
display(turbineMonthsToPredict)

In [0]:
import pyspark.sql.functions as f


def savePredictions(df):
    """Append one statistics DataFrame to the asset_life.predictions table.

    The columns are selected in a fixed order, TIME_STAMP is renamed to MONTH,
    and the result is appended (overwrite is False) via insertInto().
    """
    orderedColumns = [
        "TURBINE_SK_FK",
        "TIME_STAMP",
        "STD_MYTB_del_m4",
        "MEAN_MYTB_del_m4",
        "STD_MZTB_del_m4",
        "MEAN_MZTB_del_m4",
        "STD_MYTM_del_m4",
        "MEAN_MYTM_del_m4",
        "STD_MZTM_del_m4",
        "MEAN_MZTM_del_m4",
        "STD_MYTT_del_m4",
        "MEAN_MYTT_del_m4",
        "STD_MZTT_del_m4",
        "MEAN_MZTT_del_m4",
    ]
    monthlyStatistics = df.select(*orderedColumns).withColumnRenamed("TIME_STAMP", "MONTH")
    monthlyStatistics.write.insertInto('asset_life.predictions', overwrite=False)
    
    
def predict(turbineId, monthTimestamp):
  """Run the "process_data_and_predict" notebook for one turbine and month.

  Returns the value handed back by the notebook run; predictForTurbineMonth
  uses it as the name of a table in the global temp database.
  """
  notebookArguments = {
    "Turbine": turbineId,
    "Year": monthTimestamp.year,
    "Month": monthTimestamp.month,
  }
  return dbutils.notebook.run("process_data_and_predict", 0, notebookArguments)


def predictForTurbineMonth(turbineId, monthTimestamp):
  """Predict loads for one turbine/month and condense them into statistics.

  Runs the prediction notebook, reads the table it names from the global temp
  database and aggregates every column into MEAN_<column> and STD_<column>.

  Returns:
    A DataFrame with the aggregated columns plus TURBINE_SK_FK and TIME_STAMP
    literal columns identifying the turbine and month.
  """
  resultTableName = predict(turbineId, monthTimestamp)
  globalTempDatabase = spark.conf.get("spark.sql.globalTempDatabase")
  predictedLoads = table(f"{globalTempDatabase}.{resultTableName}").select("*")

  meanColumns = [f.mean(name).alias(f'MEAN_{name}') for name in predictedLoads.columns]
  stdColumns = [f.stddev(name).alias(f'STD_{name}') for name in predictedLoads.columns]

  return (
    predictedLoads.select(*meanColumns, *stdColumns)
    .withColumn("TURBINE_SK_FK", f.lit(turbineId))
    .withColumn("TIME_STAMP", f.lit(monthTimestamp))
  )


def predictAndSave(turbineId, monthTimestamp):
  """Predict and store the load statistics for one turbine/month (best effort).

  Failures are reported on stdout together with their cause and do not
  propagate, so one failing month does not stop the remaining ones.

  Args:
    turbineId: turbine key to predict for.
    monthTimestamp: timestamp identifying the month; must support ``.date()``.
  """
  try:
    print(f"Predicting loads for {turbineId} in month {monthTimestamp.date()}")
    predictions = predictForTurbineMonth(turbineId, monthTimestamp)
    savePredictions(predictions)
  except Exception as error:
    # Catch Exception rather than using a bare except so KeyboardInterrupt and
    # SystemExit still stop the run, and include the cause: without it a failed
    # month cannot be diagnosed from the notebook output.
    print(f"Could not predict loads for {turbineId} in month {monthTimestamp.date()}: {error!r}")
  
  
def predictAndSaveIfNecessary(turbineId, monthTimestamp):
  """Predict and store a turbine/month only when needsPrediction() says so."""
  if not needsPrediction(turbineId, monthTimestamp):
    return
  predictAndSave(turbineId, monthTimestamp)
      
      
def predictDataForTurbine(currentTurbineId, turbineMonths):
  """Sequentially run predictAndSave() for every month of a single turbine.

  Args:
    currentTurbineId: turbine key handed to predictAndSave().
    turbineMonths: iterable of month timestamps for that turbine.
  """
  for monthTimestamp in turbineMonths:
    predictAndSave(currentTurbineId, monthTimestamp)

In [0]:
from concurrent.futures import ThreadPoolExecutor

def predictDataThreaded(turbineMonths, maxWorkers = 4):
  """Run predictAndSave() for every turbine/month on a thread pool.

  Args:
    turbineMonths: iterable of ``(turbineId, [month, ...])`` tuples.
    maxWorkers: number of predictions executed concurrently (default 4, the
      previously hard-coded value).

  Returns:
    None. The call blocks until every submitted prediction has finished.
  """
  with ThreadPoolExecutor(max_workers = maxWorkers) as executor:
    submitted = [
      (turbineId, month, executor.submit(predictAndSave, turbineId, month))
      for turbineId, months in turbineMonths
      for month in months
    ]

  # Leaving the with-block waits for all tasks. Inspect the futures afterwards so
  # an exception that escaped a task is reported instead of silently discarded;
  # it is deliberately not re-raised to keep the run best effort.
  for turbineId, month, future in submitted:
    error = future.exception()
    if error is not None:
      print(f"Prediction task for {turbineId} in month {month} failed: {error!r}")

predictDataThreaded(turbineMonthsToPredict)

# Calculate RUL

In [0]:
%sql OPTIMIZE asset_life.predictions

path,metrics
,"List(0, 0, List(0, 0, 0.0, 0, 0), List(0, 0, 0.0, 0, 0), 0, null, 0, 1, 1, true)"


In [0]:
# Distinct turbines that have at least one row in asset_life.predictions.
# NOTE: this rebinds `turbines`, which earlier in the notebook held the
# per-turbine time-range rows; from here on each row only has TURBINE_SK_FK.
turbinesDf = table("asset_life.predictions")
turbines = turbinesDf.select("TURBINE_SK_FK").distinct().collect()

In [0]:
display(turbines)

TURBINE_SK_FK
4709
4704
4705
4708
4706
4710
4707


In [0]:
def calculateRul(turbineId):
  """Run the "calc_damage_multiple_sections" notebook for one turbine.

  Returns the value handed back by the notebook run; the cells below use it as
  the name of a view in the global temp database.
  """
  notebookArguments = {"Turbine": turbineId}
  return dbutils.notebook.run("calc_damage_multiple_sections", 0, notebookArguments)

In [0]:
from pyspark.sql.functions import lit

def toTurbineIdWithRulView(turbine):
  """Pair a turbine row with the result of calculateRul() for its TURBINE_SK_FK.

  Note that the first tuple element is the row itself, not the bare id.
  """
  rulViewName = calculateRul(turbine.TURBINE_SK_FK)
  return (turbine, rulViewName)

def toTurbineIdWithDataframe(turbineViewAndId):
  """Swap the view name in a (turbine, viewName) pair for the view's DataFrame.

  The view is looked up in the database named by the
  "spark.sql.globalTempDatabase" setting.
  """
  turbineRow, rulViewName = turbineViewAndId
  globalTempDatabase = spark.conf.get("spark.sql.globalTempDatabase")
  rulDataframe = table(f"{globalTempDatabase}.{rulViewName}")
  return (turbineRow, rulDataframe)

def toTurbineRulDataframe(turbineDataframeAndId):
  """Tag a turbine's RUL DataFrame with a literal "turbine_sk_fk" column.

  Args:
    turbineDataframeAndId: ``(turbineRow, rulDataframe)`` pair; the id is read
      from ``turbineRow.TURBINE_SK_FK``.
  """
  turbineRow, rulDataframe = turbineDataframeAndId
  turbineIdColumn = lit(turbineRow.TURBINE_SK_FK)
  return rulDataframe.withColumn("turbine_sk_fk", turbineIdColumn)

# Per turbine: run the RUL notebook, load its result view and tag it with the
# turbine id. Turbines are processed one after another, in list order.
turbineRuls = [
  toTurbineRulDataframe(toTurbineIdWithDataframe(toTurbineIdWithRulView(turbine)))
  for turbine in turbines
]

In [0]:
from functools import reduce
from pyspark.sql import DataFrame

# reduce() without an initial value fails with an opaque TypeError on an empty
# list, which is what happens when no turbine has predictions yet. Fail with a
# message that names the actual cause instead.
if not turbineRuls:
  raise ValueError("No RUL results to combine: turbineRuls is empty (no turbines found in asset_life.predictions)")

# Stack the per-turbine results into a single DataFrame.
turbineRulsDf = reduce(DataFrame.union, turbineRuls)

In [0]:
display(turbineRulsDf)

section_height,month,mean_y,lwr_y,upr_y,mean_z,lwr_z,upr_z,slope,estimated_EOL,turbine_sk_fk
137.6,2013-02-01T00:10:00.000+0000,19.999999022478864,19.999998963676056,19.99999907826245,19.999897303953325,19.999891049123995,19.99990323341609,1.1717888418474007e-05,2034-01-27,4709
137.6,2013-03-01T00:10:00.000+0000,19.99999800568406,19.999997902556213,19.999998101701717,19.99982145266282,19.999813512029768,19.999828929759644,2.037271971238055e-05,2034-01-27,4709
137.6,2013-04-01T00:10:00.000+0000,19.99999679837393,19.99999666175426,19.999996926160023,19.999757132083545,19.999747519765524,19.999766137722176,2.771186659665652e-05,2034-01-27,4709
137.6,2013-05-01T00:10:00.000+0000,19.99999526234184,19.999995069393563,19.99999544140079,19.99954630381581,19.999533029095307,19.99955901952108,5.17679251966708e-05,2034-01-27,4709
137.6,2013-06-01T00:10:00.000+0000,19.99999376989262,19.99999355622865,19.999993969810504,19.999332954221053,19.99931518634958,19.99935012286426,7.611167382598785e-05,2034-01-27,4709
137.6,2013-07-01T00:10:00.000+0000,19.99999189507613,19.999991656913267,19.99999212485417,19.999077175357943,19.999054557769945,19.99909893496709,0.00010529671331661552,2034-01-27,4709
137.6,2013-08-01T00:10:00.000+0000,19.999990327470645,19.999990078097483,19.999990571796356,19.99880171132468,19.99877741833897,19.998825190186075,0.00013672788237938823,2034-01-27,4709
137.6,2013-09-01T00:10:00.000+0000,19.99998870096392,19.99998844360004,19.99998895014334,19.998564913603275,19.99854065225456,19.99858850962006,0.0001637471237912832,2034-01-27,4709
137.6,2013-10-01T00:10:00.000+0000,19.999987131641923,19.99998682425053,19.99998743498139,19.99849736308135,19.99847276112789,19.99852156990173,0.0001714548156077997,2034-01-27,4709
137.6,2013-11-01T00:10:00.000+0000,19.999986017718516,19.999985693840213,19.99998633128368,19.99844638609957,19.998420989208007,19.99847143952437,0.0001772714230019526,2034-01-27,4709


In [0]:
# Reorder the columns so that the identifying ones (turbine, month, section) come first.
orderedTurbineRulsDf = turbineRulsDf.select(
  "turbine_sk_fk",
  "month",
  "section_height",
  "lwr_y",
  "mean_y",
  "upr_y",
  "lwr_z",
  "mean_z",
  "upr_z",
  "slope",
  "estimated_EOL",
)

In [0]:
display(orderedTurbineRulsDf)

turbine_sk_fk,month,section_height,lwr_y,mean_y,upr_y,lwr_z,mean_z,upr_z,slope,estimated_EOL
4709,2013-02-01T00:10:00.000+0000,137.6,19.999998963676056,19.999999022478864,19.99999907826245,19.999891049123995,19.999897303953325,19.99990323341609,1.1717888418474007e-05,2034-01-27
4709,2013-03-01T00:10:00.000+0000,137.6,19.999997902556213,19.99999800568406,19.999998101701717,19.999813512029768,19.99982145266282,19.999828929759644,2.037271971238055e-05,2034-01-27
4709,2013-04-01T00:10:00.000+0000,137.6,19.99999666175426,19.99999679837393,19.999996926160023,19.999747519765524,19.999757132083545,19.999766137722176,2.771186659665652e-05,2034-01-27
4709,2013-05-01T00:10:00.000+0000,137.6,19.999995069393563,19.99999526234184,19.99999544140079,19.999533029095307,19.99954630381581,19.99955901952108,5.17679251966708e-05,2034-01-27
4709,2013-06-01T00:10:00.000+0000,137.6,19.99999355622865,19.99999376989262,19.999993969810504,19.99931518634958,19.999332954221053,19.99935012286426,7.611167382598785e-05,2034-01-27
4709,2013-07-01T00:10:00.000+0000,137.6,19.999991656913267,19.99999189507613,19.99999212485417,19.999054557769945,19.999077175357943,19.99909893496709,0.00010529671331661552,2034-01-27
4709,2013-08-01T00:10:00.000+0000,137.6,19.999990078097483,19.999990327470645,19.999990571796356,19.99877741833897,19.99880171132468,19.998825190186075,0.00013672788237938823,2034-01-27
4709,2013-09-01T00:10:00.000+0000,137.6,19.99998844360004,19.99998870096392,19.99998895014334,19.99854065225456,19.998564913603275,19.99858850962006,0.0001637471237912832,2034-01-27
4709,2013-10-01T00:10:00.000+0000,137.6,19.99998682425053,19.999987131641923,19.99998743498139,19.99847276112789,19.99849736308135,19.99852156990173,0.0001714548156077997,2034-01-27
4709,2013-11-01T00:10:00.000+0000,137.6,19.999985693840213,19.999986017718516,19.99998633128368,19.998420989208007,19.99844638609957,19.99847143952437,0.0001772714230019526,2034-01-27


In [0]:
# Credentials come from the "asset-life" secret scope.
sqlUser = dbutils.secrets.get("asset-life", "rul-db-user")
sqlPassword = dbutils.secrets.get("asset-life", "rul-db-password")

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# The credentials are intentionally NOT embedded in the URL: they are already
# supplied through the "user"/"password" options below, and keeping them out of
# the connection string avoids exposing them wherever the URL is shown or
# logged, and avoids a broken URL when the password contains ';' or braces.
url = "jdbc:sqlserver://renewables-dev-datalakesql.database.windows.net:1433;database=ASSETLIFE;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

# mode="overwrite" replaces the existing contents of the "rul" table on every run.
orderedTurbineRulsDf.write.format("jdbc")\
  .option("dbtable", "rul")\
  .option("driver", driver)\
  .option("url", url)\
  .option("user", sqlUser)\
  .option("password", sqlPassword)\
  .save(mode="overwrite")

In [0]:
# Read the "rul" table back over the same JDBC connection to verify the write.
rulTable = (
  spark.read.format("jdbc")
  .option("dbtable", "rul")
  .option("driver", driver)
  .option("url", url)
  .option("user", sqlUser)
  .option("password", sqlPassword)
  .load()
)

display(rulTable)
print(rulTable.count())

turbine_sk_fk,month,section_height,lwr_y,mean_y,upr_y,lwr_z,mean_z,upr_z,slope,estimated_EOL
4709,2020-08-01T00:10:00.000+0000,54.08,19.9374772922094,19.938056801373897,19.938613392475094,13.861317889762828,13.922678481585956,13.982712721012714,0.6934383332362831,2034-01-27
4709,2020-09-01T00:10:00.000+0000,54.08,19.936718405462766,19.937295527461625,19.93785651855401,13.805856590068933,13.86756264048914,13.92759018433285,0.6997271953393144,2034-01-27
4709,2020-10-01T00:10:00.000+0000,54.08,19.935757695951004,19.93633423935072,19.93689859138201,13.709054702484815,13.770866470302792,13.83103732752037,0.7107604821057272,2034-01-27
4709,2020-11-01T00:10:00.000+0000,54.08,19.93500205947676,19.93557959104948,19.936145352906816,13.633834874317287,13.695352226488236,13.75580599841044,0.7193768554879321,2034-01-27
4709,2020-12-01T00:10:00.000+0000,54.08,19.934061469707995,19.93463937552861,19.935208528120658,13.58077920928008,13.64229185364418,13.70287122809672,0.7254311832694051,2034-01-27
4709,2021-01-01T00:10:00.000+0000,54.08,19.933351866291225,19.93393375644585,19.93450311157363,13.514729933740654,13.5769731937609,13.637928794307827,0.7328842137700045,2034-01-27
4709,2021-02-01T00:10:00.000+0000,54.08,19.932635918458107,19.93321881915799,19.933789656576497,13.430231026767416,13.492669265494207,13.553924957981629,0.742503513213912,2034-01-27
4709,2021-04-01T00:10:00.000+0000,54.08,19.9318317945356,19.932415787526136,19.932991040631723,13.325982670387752,13.391148109340632,13.455428734696731,0.7540873435408167,2034-01-27
4709,2021-05-01T00:10:00.000+0000,54.08,19.931098985210163,19.93168365914469,19.93225790889734,13.290963823054788,13.356285820647924,13.42061934997809,0.7580652221655735,2034-01-27
4709,2021-06-01T00:10:00.000+0000,54.08,19.930507284134627,19.931093915197952,19.931671261832747,13.261645678881202,13.326704677760894,13.391381729379004,0.7614405081952255,2034-01-27
