In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pipelines.util.platform import start_spark
from pipelines.util.storage import read_csv,read_csvfolder,write_partby_parquet,write_parquet,read_parquet_filter,read_parquet
from pipelines.loading.load_wtdata import get_latestvalidated_wtdata

In [2]:
# Spark bootstrap: the helper's result is unpacked into the session (`sps`), `logger` and `conf`.
sps, logger, conf = start_spark(
    app_name="process_wtdata",
)

local


In [3]:
# Explicit, all-nullable schema for the validated turbine data handed to the loader below.
sch = (
    T.StructType()
    .add("timestamp", T.TimestampType(), True)
    .add("turbine_id", T.IntegerType(), True)
    .add("wind_speed", T.DoubleType(), True)
    .add("wind_direction", T.IntegerType(), True)
    .add("power_output", T.DoubleType(), True)
    .add("outputdate", T.DateType(), True)
    .add("is_missing", T.BooleanType(), True)
    .add("wind_speed_cleaned", T.DoubleType(), True)
    .add("power_output_cleaned", T.DoubleType(), True)
    .add("wind_direction_cleaned", T.IntegerType(), True)
)

# Reader options passed to get_latestvalidated_wtdata.
# NOTE(review): "inferSchema" is a CSV reader option; if the loader reads parquet (as the
# `pq` prefix suggests) it presumably has no effect. Also, the schema printed for the loaded
# frame lists columns in a different order than `sch`, so the loader may not apply this
# schema verbatim. Confirm both against get_latestvalidated_wtdata.
pqoptions = dict(
    schema=sch,
    inferSchema="false",
)

In [4]:
# Load the validated turbine dataset via the project loader, using the options defined above.
wtdata_validated = get_latestvalidated_wtdata(
    sps,
    pqoptions,
)

In [5]:
# Print column names/types, then leave the row count as the cell's displayed value.
validated_frame = wtdata_validated
validated_frame.printSchema()
validated_frame.count()

root
 |-- timestamp: timestamp (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_direction: integer (nullable = true)
 |-- power_output: double (nullable = true)
 |-- outputdate: date (nullable = true)
 |-- turbine_id: integer (nullable = true)
 |-- is_missing: boolean (nullable = true)
 |-- wind_speed_cleaned: double (nullable = true)
 |-- power_output_cleaned: double (nullable = true)
 |-- wind_direction_cleaned: integer (nullable = true)



11160

In [6]:
# Expose the loaded frame to Spark SQL under the name the MERGE statement reads from.
validated_view_name = "wtdata_validated"
wtdata_validated.createOrReplaceTempView(validated_view_name)

In [7]:
# Create the WTDATA Delta table (once) at <storage.datamart>/WTDATA.
# `storage.datamart` is read without a default, so it must be configured on the session.
wtdata_path = "{storage_path}/WTDATA".format(storage_path=sps.conf.get('storage.datamart'))

# Render the path as a Spark SQL string literal. Backslashes and single quotes are escaped
# so that paths containing them (e.g. Windows-style local paths) are neither broken nor
# silently altered by the SQL parser's escape handling.
# NOTE(review): assumes spark.sql.parser.escapedStringLiterals is left at its default
# (false); with it enabled, backslashes in the literal are taken verbatim.
location = "'{}'".format(wtdata_path.replace("\\", "\\\\").replace("'", "\\'"))

# IF NOT EXISTS: re-running the cell leaves an existing table (and its data) untouched.
sps.sql(f"""
    CREATE TABLE IF NOT EXISTS WTDATA (
        TIMESTAMP TIMESTAMP,
        TURBINE_ID INT,
        WIND_SPEED DOUBLE,
        WIND_DIRECTION INT,
        POWER_OUTPUT DOUBLE,
        OUTPUTDATE DATE,
        IS_MISSING BOOLEAN,
        WIND_SPEED_CLEANED DOUBLE,
        WIND_DIRECTION_CLEANED INT,
        POWER_OUTPUT_CLEANED DOUBLE
    )
    USING DELTA
    LOCATION {location}
""")

DataFrame[]

In [8]:
# Insert-only load of the validated batch into WTDATA, keyed on (TIMESTAMP, TURBINE_ID).
# - SELECT DISTINCT drops exact duplicate rows within the batch. An insert-only MERGE
#   inserts every unmatched source row, so in-batch duplicates would otherwise all be
#   written to the table.
# - `<=>` (null-safe equality) lets rows whose key columns are NULL (both are nullable)
#   match their previously inserted copy. With `=`, NULL never matches, so such rows
#   were re-inserted on every re-run of this cell.
# - Rows whose key already exists in WTDATA are left as-is (there is no WHEN MATCHED clause).
sps.sql("""
    MERGE INTO WTDATA AS tgt
    USING (
        SELECT DISTINCT
            v.timestamp,
            v.turbine_id,
            v.wind_speed,
            v.wind_direction,
            v.power_output,
            v.outputdate,
            v.is_missing,
            v.wind_speed_cleaned,
            v.wind_direction_cleaned,
            v.power_output_cleaned
        FROM wtdata_validated AS v
    ) AS src
    ON tgt.TIMESTAMP <=> src.timestamp
        AND tgt.TURBINE_ID <=> src.turbine_id
    WHEN NOT MATCHED THEN
        INSERT (
            TIMESTAMP,
            TURBINE_ID,
            WIND_SPEED,
            WIND_DIRECTION,
            POWER_OUTPUT,
            OUTPUTDATE,
            IS_MISSING,
            WIND_SPEED_CLEANED,
            WIND_DIRECTION_CLEANED,
            POWER_OUTPUT_CLEANED
        ) VALUES (
            src.timestamp,
            src.turbine_id,
            src.wind_speed,
            src.wind_direction,
            src.power_output,
            src.outputdate,
            src.is_missing,
            src.wind_speed_cleaned,
            src.wind_direction_cleaned,
            src.power_output_cleaned
        )
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [3]:
# Preview rows of WTDATA with untruncated cell values; no ordering is applied.
sps.table("WTDATA").show(truncate=False)

+-------------------+----------+----------+--------------+------------+----------+----------+------------------+----------------------+--------------------+
|TIMESTAMP          |TURBINE_ID|WIND_SPEED|WIND_DIRECTION|POWER_OUTPUT|OUTPUTDATE|IS_MISSING|WIND_SPEED_CLEANED|WIND_DIRECTION_CLEANED|POWER_OUTPUT_CLEANED|
+-------------------+----------+----------+--------------+------------+----------+----------+------------------+----------------------+--------------------+
|2022-03-08 00:00:00|1         |12.9      |306           |3.3         |2022-03-08|false     |12.9              |306                   |3.3                 |
|2022-03-08 01:00:00|1         |14.1      |153           |2.1         |2022-03-08|false     |14.1              |153                   |2.1                 |
|2022-03-08 02:00:00|1         |10.1      |185           |2.4         |2022-03-08|false     |10.1              |185                   |2.4                 |
|2022-03-08 03:00:00|1         |10.2      |284           |

In [None]:
# Total row count of WTDATA (this cell was not executed in the saved notebook).
wtdata_count_df = sps.sql("SELECT count(*) FROM WTDATA")
wtdata_count_df.show(truncate=False)

In [5]:
# Show WTDATA's column names and types as registered in the catalog.
wtdata_description = sps.sql("DESCRIBE TABLE WTDATA")
wtdata_description.show(truncate=False)

+----------------------+---------+-------+
|col_name              |data_type|comment|
+----------------------+---------+-------+
|TIMESTAMP             |timestamp|NULL   |
|TURBINE_ID            |int      |NULL   |
|WIND_SPEED            |double   |NULL   |
|WIND_DIRECTION        |int      |NULL   |
|POWER_OUTPUT          |double   |NULL   |
|OUTPUTDATE            |date     |NULL   |
|IS_MISSING            |boolean  |NULL   |
|WIND_SPEED_CLEANED    |double   |NULL   |
|WIND_DIRECTION_CLEANED|int      |NULL   |
|POWER_OUTPUT_CLEANED  |double   |NULL   |
+----------------------+---------+-------+

