In [1]:
from pyspark.sql.functions import to_timestamp,hour,minute,when,col,current_timestamp,date_format,lit,unix_timestamp,expr,abs
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import TrainValidationSplit
import datetime

In [2]:
# S3 prefixes for the raw inputs, the processed outputs and the saved model.
# Each keeps its trailing slash because later cells append names directly.
rawdata_s3, processed_s3, model_s3 = (
    's3a://solarpowerbackend/rawdata/',
    's3a://solarpowerbackend/processed/',
    's3a://solarpowerbackend/model/',
)

In [4]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# Creating (or re-attaching to) the session can take a while on a local machine.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)
spark

In [8]:
# Full S3 locations of the four raw CSV files (generation and weather data
# for plants 1 and 2), all under the raw-data prefix.
p1_gen_file_location = rawdata_s3 + 'Plant_1_Generation_Data.csv'
p1_weather_file_location = rawdata_s3 + 'Plant_1_Weather_Sensor_Data.csv'
p2_gen_file_location = rawdata_s3 + 'Plant_2_Generation_Data.csv'
p2_weather_file_location = rawdata_s3 + 'Plant_2_Weather_Sensor_Data.csv'

In [9]:

# Source format handed to spark.read.format(...).
file_type = "csv"

# Reader options, kept as strings because they are passed straight to
# the reader's .option(...) calls in the load cells.
delimiter = ","
first_row_is_header = "true"
infer_schema = "true"



In [10]:

# Read the Plant 1 weather-sensor file with the shared reader settings.
# These options are CSV-specific; other file types ignore them.
p1_weather_df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(p1_weather_file_location)
)


In [11]:
# Read the Plant 1 generation file with the same reader settings as the
# weather file.
p1_gen_df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(p1_gen_file_location)
)

In [12]:
# Show the summary() frame for the generation data.
# NOTE(review): the recorded output is only the schema repr, so display()
# did not render rows in this environment; .show() would print the values.
display(p1_gen_df.summary())

DataFrame[summary: string, DATE_TIME: string, PLANT_ID: string, SOURCE_KEY: string, DC_POWER: string, AC_POWER: string, DAILY_YIELD: string, TOTAL_YIELD: string]

In [13]:
# Number of generation rows recorded for each SOURCE_KEY.
display(
    p1_gen_df
    .groupBy('SOURCE_KEY')
    .count()
)

DataFrame[SOURCE_KEY: string, count: bigint]

In [14]:
# Inspect the generation frame as loaded; DATE_TIME is still a string here.
display(p1_gen_df)

DataFrame[DATE_TIME: string, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double]

In [15]:
# Convert DATE_TIME to a timestamp in both frames, each with its own
# format pattern.
p1_gen_df = p1_gen_df.withColumn(
    'DATE_TIME',
    to_timestamp(col('DATE_TIME'), 'dd-MM-yyyy HH:mm'),
)
p1_weather_df = p1_weather_df.withColumn(
    'DATE_TIME',
    to_timestamp(col('DATE_TIME'), 'yyyy-MM-dd HH:mm:ss'),
)

In [16]:
# Check the weather frame after the DATE_TIME conversion.
display(p1_weather_df)

DataFrame[DATE_TIME: timestamp, PLANT_ID: int, SOURCE_KEY: string, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double]

In [17]:
# Attach the weather readings to every generation row with the same
# DATE_TIME. Only the measurement columns are taken from the weather frame,
# so its PLANT_ID / SOURCE_KEY do not clash with the generation columns.
# TIME holds the zero-padded time of day as a string, which lets the
# filters below use plain string comparison.
p1_raw_df = (
    p1_weather_df
    .select('DATE_TIME', 'AMBIENT_TEMPERATURE', 'MODULE_TEMPERATURE', 'IRRADIATION')
    .join(p1_gen_df, on='DATE_TIME', how='inner')
    .withColumn('TIME', date_format('DATE_TIME', 'HH:mm:ss'))
)

# Rows at or after 18:30:00, or at or before 05:45:00.
p1_plantoff_df = p1_raw_df.filter(
    (col('TIME') >= lit('18:30:00')) | (col('TIME') <= lit('05:45:00'))
)

# Rows from 06:00:00 to 18:15:00, both ends included.
# NOTE(review): a TIME strictly between the two windows (e.g. 18:20:00)
# lands in neither frame - confirm the data only has values that fit.
p1_df = p1_raw_df.filter(
    col('TIME').between('06:00:00', '18:15:00')
)

In [18]:
# Inspect the 06:00:00-18:15:00 frame produced by the join and time filter.
display(p1_df)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string]

In [19]:
# 80/20 train/test split with a fixed seed. Without a seed, randomSplit
# draws a fresh random one on every run, so the fitted model, its error
# metrics and the residual thresholds would all change between runs.
SPLIT_SEED = 42
p1_train_df, p1_test_df = p1_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [20]:
# p1_df itself is unchanged by the split; this repeats the earlier display.
display(p1_df)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string]

In [21]:
# Rows in the 06:00:00-18:15:00 frame where DC_POWER is exactly zero.
display(
    p1_df.filter(
        col('DC_POWER') == 0
    )
)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string]

In [22]:
# Same frame as displayed earlier; nothing has modified p1_df in between.
display(p1_df)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string]

In [23]:
# Pack the two model inputs into a single 'features' vector column.
va = VectorAssembler(
    inputCols=['IRRADIATION', 'AMBIENT_TEMPERATURE'],
    outputCol='features',
)
p1_train_df = va.transform(p1_train_df)

In [24]:
# AC_POWER is the regression target, so expose it under the 'label' name
# that the estimator is configured to read.
p1_train_df = p1_train_df.withColumnRenamed(
    'AC_POWER',
    'label',
)
display(p1_train_df)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, label: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string, features: vector]

In [25]:
# Linear regression from the assembled 'features' vector to 'label'.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
)

In [26]:
# Fit the regression on the training rows ('features' -> 'label').
lr_model = lr.fit(p1_train_df)

In [27]:
# Mean absolute error reported by the fitted model's summary object.
lr_model.summary.meanAbsoluteError

44.789351035597

In [29]:
# Disabled on purpose: later cells read the model's default 'prediction' column.
# lr_model.setPredictionCol('ACPower_Predicted')

In [46]:
# Persist the fitted model under the model prefix, replacing any earlier save.
(
    lr_model.write()
    .overwrite()
    .save(f'{model_s3}LR_FaultDetectionModel')
)

In [32]:
# Score the training rows here so the cell works on a fresh kernel. It
# previously displayed pred_p1_train_df before any cell had defined it,
# which only worked because cells had been run out of order.
pred_p1_train_df = lr_model.transform(p1_train_df)
pred_p1_train_df

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, label: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string, features: vector, prediction: double]

In [33]:
# Score the training rows and add the residual (prediction minus actual).
pred_p1_train_df = (
    lr_model.transform(p1_train_df)
    .withColumn('residuals', col('prediction') - col('label'))
)

# Keep the rows whose residual is at or above the exact 99.5th percentile
# of the training residuals (relative error 0.00).
train_residual_cutoff = pred_p1_train_df.stat.approxQuantile(
    'residuals', [0.995], 0.00
)[0]
train_fault_instances_df = pred_p1_train_df.filter(
    col('residuals') >= train_residual_cutoff
)

In [34]:
# Inspect the scored training frame, now including the 'residuals' column.
display(pred_p1_train_df)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, label: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string, features: vector, prediction: double, residuals: double]

In [35]:
# One-element list holding the approximate 95th-percentile training
# residual, computed with a relative error of 0.05.
outlier_residual_limit = pred_p1_train_df.stat.approxQuantile(
    'residuals',
    [0.95],
    0.05,
)

In [36]:
# Held-out rows where DC_POWER is non-zero.
display(
    p1_test_df.filter(
        col('DC_POWER') != 0
    )
)

DataFrame[DATE_TIME: timestamp, AMBIENT_TEMPERATURE: double, MODULE_TEMPERATURE: double, IRRADIATION: double, PLANT_ID: int, SOURCE_KEY: string, DC_POWER: double, AC_POWER: double, DAILY_YIELD: double, TOTAL_YIELD: double, TIME: string]

In [37]:
# Give the held-out rows the same 'features' vector and 'label' column
# as the training rows.
p1_test_df = (
    va.transform(p1_test_df)
    .withColumnRenamed('AC_POWER', 'label')
)


In [40]:

# Score the held-out rows and add the residual (prediction minus actual).
pred_p1_test_df = (
    lr_model.transform(p1_test_df)
    .withColumn('residuals', col('prediction') - col('label'))
)

# Exact 99.5th-percentile residual (relativeError=0.0), as the training cell
# does. The previous relativeError of 0.05 allowed the returned value to sit
# anywhere between roughly the 94.5th percentile and the maximum, so the
# "top 0.5%" cutoff was effectively arbitrary.
test_residual_cutoff = pred_p1_test_df.stat.approxQuantile(
    'residuals', [0.995], 0.0
)[0]
fault_instances_df = pred_p1_test_df.filter(
    col('residuals') >= test_residual_cutoff
)

In [41]:
# Stack the scored train and test rows back into one frame. union() pairs
# columns by position rather than by name; both inputs were built by the
# same sequence of steps, so their column order matches.
pred_p1_df = pred_p1_train_df.union(pred_p1_test_df)

In [42]:
# Plant-off rows are not scored by the model: they get the same column
# layout as the scored frame, with prediction, residual and fault flag all
# fixed at zero.
p1_plantoff_df = (
    va.transform(p1_plantoff_df)
    .withColumnRenamed('AC_POWER', 'label')
    .withColumn('ACPower_Predicted', lit(0))
    .withColumn('residuals', lit(0))
    .withColumn('Fault_Flag', lit(0))
)

In [44]:
# Fault flag: 1 when the absolute residual exceeds the exact 99.5th
# percentile of the absolute residuals, else 0. The threshold is taken over
# abs(residuals) because that is the quantity being compared; it used to be
# the quantile of the signed residuals, so the share of flagged rows did not
# correspond to the 0.5% tail the quantile implies.
# NOTE(review): the earlier fault_instances cells use signed residuals
# (prediction well above actual only) - confirm the two-sided flag is wanted.
fault_threshold = (
    pred_p1_df
    .selectExpr('abs(residuals) as abs_residuals')
    .stat.approxQuantile('abs_residuals', [0.995], 0.0)[0]
)
pred_p1_df = pred_p1_df.withColumn(
    'Fault_Flag',
    expr(f"case when abs(residuals) > {fault_threshold} then 1 else 0 end "),
)

# Append the plant-off rows (flag fixed at 0). union() is positional, so this
# relies on p1_plantoff_df having the same column order as pred_p1_df.
p1_rul_raw_df = pred_p1_df.union(p1_plantoff_df)

# Keep only the columns needed downstream and restore the AC_POWER name.
p1_rul_raw_df = p1_rul_raw_df.select(
    'DATE_TIME', 'AMBIENT_TEMPERATURE', 'MODULE_TEMPERATURE', 'IRRADIATION',
    'PLANT_ID', 'SOURCE_KEY', 'DC_POWER', 'label', 'DAILY_YIELD',
    'TOTAL_YIELD', 'Fault_Flag',
)
p1_rul_raw_df = p1_rul_raw_df.withColumnRenamed('label', 'AC_POWER')

# Replace any previous copy of the dataset under the processed prefix.
p1_rul_raw_df.write.mode('overwrite').option('overwriteSchema','true').save(f'{processed_s3}rul_dataset')