In [46]:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, stddev, col, abs, date_format, min, max, lag, lead, coalesce, avg, to_date, to_timestamp
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, dataframe
import pyspark.pandas as ps

import psycopg2
from psycopg2.extras import execute_values

import os
from dotenv import load_dotenv

In [47]:
# Load settings from a local .env file into the process environment.
load_dotenv()

# Connection settings are read from the environment; a missing variable yields None.
DB_NAME, DB_USER, DB_PASSWORD, DB_HOST = (
    os.getenv(key) for key in ("DB_NAME", "DB_USER", "DB_PASSWORD", "DB_HOST")
)

# Keyword arguments passed straight to psycopg2.connect.
db_params = dict(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST)

# Autocommit is enabled, so each statement run through `cursor` is committed immediately.
conn = psycopg2.connect(**db_params)
conn.autocommit = True
cursor = conn.cursor()

In [49]:
# Initialize Spark Session
# The PostgreSQL JDBC jar is registered on the session because upload_data_to_sql
# writes through the "org.postgresql.Driver" class; this is the same jar path that
# the test-data session further down the notebook configures.
spark = (
    SparkSession.builder
    .master('local')
    .appName("WindTurbineDataPipeline")
    .config('spark.jars', "./jdbc_driver/postgresql-42.7.1.jar")
    .getOrCreate()
)

print('Spark Session initialized')

Spark Session initialized


In [53]:
def get_last_times() -> dict:
    """Return the latest stored timestamp for every turbine.

    Runs a grouped ``MAX(timestamp)`` query against ``turbine_data_raw`` on the
    module-level ``cursor``.

    Returns:
        dict mapping each ``turbine_id`` to its maximum ``timestamp``.
    """
    latest_per_turbine_sql = 'SELECT turbine_id, MAX(timestamp) FROM turbine_data_raw GROUP BY turbine_id'
    cursor.execute(latest_per_turbine_sql)
    # Every fetched row is a (turbine_id, max_timestamp) pair, so dict() builds the mapping.
    return dict(cursor.fetchall())


# Calculate the mean of the nearest non-null values
def fill_nas_with_mean(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Fill nulls in ``colName`` with the mean of the nearest non-null readings.

    Reason: the data is sampled every 1 hour, so the nearest non-null values are
    likely to be similar, and it is good to keep as many data points as possible
    for analysis.

    Rows are handled per ``turbine_id`` in ``timestamp`` order. A null is replaced
    by the average of the closest earlier and the closest later non-null value.
    When only one side has a non-null value (start or end of a turbine's series),
    that single value is used. A null is left in place only when the turbine has
    no non-null value at all.

    Args:
        df: DataFrame with ``turbine_id``, ``timestamp`` and ``colName`` columns.
        colName: name of the column to fill.

    Returns:
        DataFrame with the same columns as ``df`` and ``colName`` filled.
    """
    # Local import: `first` and `last` are not part of the notebook's import cell.
    from pyspark.sql.functions import first, last

    ordered = Window.partitionBy('turbine_id').orderBy('timestamp')
    # Frames that exclude the current row: everything before it / everything after it.
    earlier_rows = ordered.rowsBetween(Window.unboundedPreceding, -1)
    later_rows = ordered.rowsBetween(1, Window.unboundedFollowing)

    # ignorenulls=True skips over runs of nulls, so consecutive gaps are filled too.
    df = df.withColumn('prev_value', last(col(colName), ignorenulls=True).over(earlier_rows))
    df = df.withColumn('next_value', first(col(colName), ignorenulls=True).over(later_rows))
    # Average of both neighbours when available, otherwise whichever one exists.
    df = df.withColumn(
        'mean_nearest',
        coalesce((col('prev_value') + col('next_value'))/2, col('prev_value'), col('next_value'))
    )
    df = df.withColumn(colName, coalesce(col(colName), col('mean_nearest')))
    df = df.drop('prev_value', 'next_value', 'mean_nearest')
    return df


# Replace outliers with the mean of the nearest non-null values
def replace_outliers_with_mean(df: dataframe.DataFrame, colName: str, num_stddev: float = 2.0) -> dataframe.DataFrame:
    """Replace outliers in ``colName`` with the mean of their neighbouring readings.

    A value is an outlier when it lies more than ``num_stddev`` standard deviations
    from its turbine's mean. Outliers are blanked out and then filled, per turbine
    in ``timestamp`` order, with the average of the previous and next remaining
    values. If only one neighbour is available it is used on its own; if neither
    is, the turbine's mean over the non-outlier values is used. Values that were
    already null are filled the same way.

    The previous implementation only coalesced nulls with a running mean and never
    examined non-null values, so outliers were passed through unchanged.

    Args:
        df: DataFrame with ``turbine_id``, ``timestamp`` and ``colName`` columns.
        colName: name of the numeric column to clean.
        num_stddev: width of the accepted band in standard deviations (default 2,
            matching the 2-sigma rule used by ``detect_anomalies``).

    Returns:
        DataFrame with the same columns as ``df`` and ``colName`` cleaned.
    """
    # Local import: `when` is not part of the notebook's import cell.
    from pyspark.sql.functions import when

    per_turbine = Window.partitionBy('turbine_id')
    ordered = per_turbine.orderBy('timestamp')

    df = df.withColumn('turbine_mean', mean(col(colName)).over(per_turbine)) \
        .withColumn('turbine_stddev', stddev(col(colName)).over(per_turbine))

    # A null stddev (single-row turbine) makes the comparison null, so the value is kept.
    is_outlier = (col(colName) < col('turbine_mean') - num_stddev * col('turbine_stddev')) | \
        (col(colName) > col('turbine_mean') + num_stddev * col('turbine_stddev'))

    # Blank out outliers so they are filled exactly like missing readings below.
    df = df.withColumn('masked_value', when(is_outlier, None).otherwise(col(colName)))
    df = df.withColumn('prev_value', lag('masked_value', 1).over(ordered)) \
        .withColumn('next_value', lead('masked_value', 1).over(ordered)) \
        .withColumn('clean_mean', mean(col('masked_value')).over(per_turbine))
    df = df.withColumn(
        'mean_nearest',
        coalesce(
            (col('prev_value') + col('next_value'))/2,
            col('prev_value'),
            col('next_value'),
            col('clean_mean')
        )
    )
    df = df.withColumn(colName, coalesce(col('masked_value'), col('mean_nearest')))
    df = df.drop('turbine_mean', 'turbine_stddev', 'masked_value',
                 'prev_value', 'next_value', 'clean_mean', 'mean_nearest')
    return df


# Calculate the mean, min and max values of each turbine for each day
def calculate_daily_stats(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Aggregate ``colName`` per turbine and day.

    Args:
        df: DataFrame with ``turbine_id``, ``date`` and ``colName`` columns.
        colName: name of the column to summarise.

    Returns:
        One row per (``turbine_id``, ``date``) with the columns
        ``avg_<colName>``, ``min_<colName>`` and ``max_<colName>``.
    """
    # `min` and `max` here are the Spark aggregate functions imported at the top of the notebook.
    daily_aggregates = [
        mean(col(colName)).alias('avg_'+colName),
        min(col(colName)).alias('min_'+colName),
        max(col(colName)).alias('max_'+colName),
    ]
    per_turbine_day = df.groupBy('turbine_id', 'date')
    return per_turbine_day.agg(*daily_aggregates)


# Detect anomalies using the 2-sigma rule
def detect_anomalies(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Return the rows whose ``colName`` lies outside mean +/- 2 standard deviations.

    Mean and standard deviation are computed over a window partitioned by
    ``time`` and ``turbine_id``. Rows inside the band are removed.

    Args:
        df: DataFrame with ``time``, ``turbine_id`` and ``colName`` columns.
        colName: name of the column to test.

    Returns:
        The anomalous rows, with ``lower_bound_<colName>`` and
        ``upper_bound_<colName>`` added and the wind measurement columns removed.
    """
    windowSpec = Window.partitionBy('time', 'turbine_id')
    df = df.withColumn('mean_'+colName, mean(col(colName)).over(windowSpec)) \
        .withColumn('stddev_'+colName, stddev(col(colName)).over(windowSpec))
    df = df.withColumn('lower_bound_'+colName, col('mean_'+colName) - 2 * col('stddev_'+colName)) \
        .withColumn('upper_bound_'+colName, col('mean_'+colName) + 2 * col('stddev_'+colName))
    df = df.filter((col(colName) < col('lower_bound_'+colName)) | (col(colName) > col('upper_bound_'+colName)))

    # The data columns use underscores (wind_speed / wind_direction). The hyphenated
    # names passed here before matched nothing, so drop() silently ignored them.
    # The column under test is never dropped, even if it is one of the wind columns.
    wind_columns = [name for name in ('wind_speed', 'wind_direction') if name != colName]
    df = df.drop('mean_'+colName, 'stddev_'+colName, *wind_columns)
    return df


# Filter out the data that has already been uploaded
def filter_for_new_data(df: dataframe.DataFrame, last_upload_times: dict) -> dataframe.DataFrame:
    """Keep only the rows that are newer than each turbine's last uploaded timestamp.

    Args:
        df: DataFrame with ``turbine_id`` and ``timestamp`` columns.
        last_upload_times: mapping of turbine id to the latest timestamp already
            stored, as returned by ``get_last_times``.

    Returns:
        Rows of ``df`` whose ``timestamp`` is later than the stored maximum for
        their turbine, plus every row of turbines with no stored data, sorted by
        ``timestamp``. The columns are the same as in ``df``.
    """
    # Nothing has been uploaded yet, so every row is new. This also avoids
    # createDataFrame, which cannot infer a schema from an empty collection.
    if not last_upload_times:
        return df.sort('timestamp')

    # Convert last upload times to a PySpark DataFrame
    last_upload_df = spark.createDataFrame(list(last_upload_times.items()), ['turbine_id2', 'last_timestamp'])

    # Join on the turbine only. Matching on timestamp equality as well removed just
    # the single row at the last upload time and let all older rows through again.
    joined = df.join(last_upload_df, df['turbine_id'] == last_upload_df['turbine_id2'], 'left')

    # New rows: turbine unknown to the database, or strictly later than its last upload.
    is_new = joined['last_timestamp'].isNull() | (joined['timestamp'] > joined['last_timestamp'])
    df_filtered = joined.filter(is_new).sort('timestamp')

    # Drop the helper columns that came from the join
    df_filtered = df_filtered.drop('last_timestamp', 'turbine_id2')
    return df_filtered


def upload_data_to_sql(df: dataframe.DataFrame, table_name: str) -> None:
    """Append ``df`` to the PostgreSQL table ``table_name`` through Spark's JDBC writer.

    Connection details come from the module-level ``DB_HOST``, ``DB_NAME``,
    ``DB_USER`` and ``DB_PASSWORD`` settings.

    Args:
        df: DataFrame to write.
        table_name: name of the destination table.
    """
    jdbc_options = {
        "url": f"jdbc:postgresql://{DB_HOST}/{DB_NAME}",
        "dbtable": table_name,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "driver": "org.postgresql.Driver",
    }
    # Append mode adds the rows to the existing table contents.
    writer = df.write.format("jdbc").options(**jdbc_options).mode('append')
    writer.save()
    


In [51]:
# Read the CSV files into DataFrames
sdf1 = spark.read.csv('./raw_data1/data_group_1.csv', header=True, inferSchema=True)
sdf2 = spark.read.csv('./raw_data1/data_group_2.csv', header=True, inferSchema=True)
sdf3 = spark.read.csv('./raw_data1/data_group_3.csv', header=True, inferSchema=True)
print('CSV files read into DataFrames')

# Stack the three turbine groups into one DataFrame (union matches columns by position)
sdf = sdf1.union(sdf2).union(sdf3)
print('DataFrames joined')

# Check if there are any null values and fill them with the mean of the nearest non-null values
clean_df = fill_nas_with_mean(sdf, 'wind_speed')
print('Null values filled')

# Check outliers and fill them with the mean of the nearest non-null values
clean_df = replace_outliers_with_mean(clean_df, 'wind_speed')
print('Outliers replaced')

# Add a column for the date and for the time of day.
# date_format renders the clock time as an 'HH:mm:ss' string, which is what the
# exploratory cells further down use. to_timestamp does not extract the time of
# day from a timestamp, and detect_anomalies groups readings by this column.
sdf = sdf.withColumn('date', (col('timestamp')).cast('date'))
sdf = sdf.withColumn('time', date_format('timestamp', 'HH:mm:ss'))
clean_df = clean_df.withColumn('date', (col('timestamp')).cast('date'))
clean_df = clean_df.withColumn('time', date_format('timestamp', 'HH:mm:ss'))
print('Date and time columns added')

# Get the last uploaded times for each turbine
last_uploaded_dates = get_last_times()

# Calculate the anomalies for each turbine
anomalies_df = detect_anomalies(clean_df, 'power_output')
print('Anomalies detected')

CSV files read into DataFrames
DataFrames joined
Null values filled
Outliers replaced
Date and time columns added
Anomalies detected


In [57]:
# Display the latest stored timestamp per turbine as this cell's output.
get_last_times()

{4: datetime.datetime(2022, 3, 25, 23, 0),
 14: datetime.datetime(2022, 3, 19, 23, 0),
 3: datetime.datetime(2022, 3, 25, 23, 0),
 10: datetime.datetime(2022, 3, 19, 23, 0),
 9: datetime.datetime(2022, 3, 19, 23, 0),
 7: datetime.datetime(2022, 3, 19, 23, 0),
 13: datetime.datetime(2022, 3, 19, 23, 0),
 1: datetime.datetime(2022, 3, 25, 23, 0),
 5: datetime.datetime(2022, 3, 25, 23, 0),
 2: datetime.datetime(2022, 3, 25, 23, 0),
 15: datetime.datetime(2022, 3, 19, 23, 0),
 6: datetime.datetime(2022, 3, 19, 23, 0),
 12: datetime.datetime(2022, 3, 19, 23, 0),
 8: datetime.datetime(2022, 3, 19, 23, 0),
 11: datetime.datetime(2022, 3, 19, 23, 0)}

In [56]:
# Keep only the raw rows that filter_for_new_data treats as not yet uploaded, then preview them.
filtered_raw_df = filter_for_new_data(sdf, last_uploaded_dates)
filtered_raw_df.show()

Py4JJavaError: An error occurred while calling o1090.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 17) (10.164.207.254 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more


In [None]:
# Filter out the data that has already been uploaded
filtered_raw_df = filter_for_new_data(sdf, last_uploaded_dates)
filtered_clean_df = filter_for_new_data(clean_df, last_uploaded_dates)
filtered_anomalies_df = filter_for_new_data(anomalies_df, last_uploaded_dates)
filtered_anomalies_df = filtered_anomalies_df.drop('wind_speed', 'wind_direction')

# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
filtered_anomalies_df.show()
print('New data filtered')


# Upload the raw and processed dataframes to the database
upload_data_to_sql(filtered_raw_df, 'turbine_data_raw')
upload_data_to_sql(filtered_clean_df, 'turbine_data_cleaned')
print('Data uploaded')

# Calculate the daily statistics for each turbine
# NOTE(review): the statistics are computed from the full clean_df and appended,
# so a re-run appends rows for days that may already be stored. Confirm this is intended.
daily_stats_df = calculate_daily_stats(clean_df, 'power_output')
print('Daily statistics calculated')

# Upload the daily statistics to the database
upload_data_to_sql(daily_stats_df, 'summary_statistics')
print('Daily statistics uploaded')

# Upload the anomalies to the database
upload_data_to_sql(filtered_anomalies_df, 'anomalies')
print('Anomalies uploaded')

# Close the connection and cursor
cursor.close()
conn.close()

# Stop Spark Session
spark.stop()


In [3]:
# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("WindTurbineDataPipeline")
    .getOrCreate()
)

# Read each turbine-group CSV (header row present, column types inferred)
sdf1 = spark.read.csv('./raw_data/data_group_1.csv', header=True, inferSchema=True)
sdf2 = spark.read.csv('./raw_data/data_group_2.csv', header=True, inferSchema=True)
sdf3 = spark.read.csv('./raw_data/data_group_3.csv', header=True, inferSchema=True)

# Stack the three groups into one raw DataFrame
raw_sdf = sdf1.union(sdf2)
raw_sdf = raw_sdf.union(sdf3)

In [39]:
# Preview the leading rows of the combined raw DataFrame.
raw_sdf.show()

+-------------------+----------+----------+--------------+------------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|
+-------------------+----------+----------+--------------+------------+
|2022-03-01 00:00:00|         1|      11.8|           169|         2.7|
|2022-03-01 00:00:00|         2|      11.6|            24|         2.2|
|2022-03-01 00:00:00|         3|      13.8|           335|         2.3|
|2022-03-01 00:00:00|         4|      12.8|           238|         1.9|
|2022-03-01 00:00:00|         5|      11.4|           103|         3.5|
|2022-03-01 01:00:00|         1|      11.6|           152|         4.4|
|2022-03-01 01:00:00|         2|      12.8|            35|         4.2|
|2022-03-01 01:00:00|         3|      10.4|           169|         1.9|
|2022-03-01 01:00:00|         4|      13.9|           170|         2.4|
|2022-03-01 01:00:00|         5|      12.1|           165|         4.0|
|2022-03-01 02:00:00|         1|      13.8|            73|      

In [40]:
# Derive a calendar-date column and an 'HH:mm:ss' time-of-day string from the timestamp
raw_sdf = (
    raw_sdf
    .withColumn('date', (col('timestamp')).cast('date'))
    .withColumn('time', date_format('timestamp', 'HH:mm:ss'))
)
raw_sdf.show(4)

+-------------------+----------+----------+--------------+------------+----------+--------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|
+-------------------+----------+----------+--------------+------------+----------+--------+
|2022-03-01 00:00:00|         1|      11.8|           169|         2.7|2022-03-01|00:00:00|
|2022-03-01 00:00:00|         2|      11.6|            24|         2.2|2022-03-01|00:00:00|
|2022-03-01 00:00:00|         3|      13.8|           335|         2.3|2022-03-01|00:00:00|
|2022-03-01 00:00:00|         4|      12.8|           238|         1.9|2022-03-01|00:00:00|
+-------------------+----------+----------+--------------+------------+----------+--------+
only showing top 4 rows



In [35]:
def get_last_times() -> dict:
    """Return the latest stored timestamp for every turbine.

    Runs a grouped ``MAX(timestamp)`` query against ``turbine_data_raw`` on the
    module-level ``cursor``. The key column is ``turbine_id``, which is the name
    used everywhere else in this notebook for that table and for the DataFrames.

    Returns:
        dict mapping each ``turbine_id`` to its maximum ``timestamp``.
    """
    cursor.execute('SELECT turbine_id, MAX(timestamp) FROM turbine_data_raw GROUP BY turbine_id')
    # Each fetched row is a (turbine_id, max_timestamp) pair.
    dates = {turbine_id: last_timestamp for turbine_id, last_timestamp in cursor.fetchall()}
    return dates

# Calculate the mean of the nearest non-null values
def fill_nas_with_mean(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Fill nulls in ``colName`` with the average of the adjacent rows' values.

    Rows are handled per ``turbine_id`` in ``timestamp`` order. A null is replaced
    by the mean of the previous and the next row's value. It stays null when
    either of those is missing or null, because the average is then null too.

    Args:
        df: DataFrame with ``turbine_id``, ``timestamp`` and ``colName`` columns.
        colName: name of the column to fill.

    Returns:
        DataFrame with the same columns as ``df``.
    """
    per_turbine_in_time = Window.partitionBy('turbine_id').orderBy('timestamp')
    neighbour_average = (col('prev_value') + col('next_value'))/2
    filled = (
        df
        .withColumn('prev_value', lag(colName, 1).over(per_turbine_in_time))
        .withColumn('next_value', lead(colName, 1).over(per_turbine_in_time))
        .withColumn('mean_nearest', neighbour_average)
        .withColumn(colName, coalesce(col(colName), col('mean_nearest')))
    )
    # Remove the helper columns so the schema matches the input again.
    return filled.drop('prev_value', 'next_value', 'mean_nearest')



def calculate_daily_stats(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Summarise ``colName`` for every turbine and day.

    Args:
        df: DataFrame with ``turbine_id``, ``date`` and ``colName`` columns.
        colName: name of the column to summarise.

    Returns:
        One row per (``turbine_id``, ``date``) holding ``avg_<colName>``,
        ``min_<colName>`` and ``max_<colName>``.
    """
    measured = col(colName)
    grouped = df.groupBy('turbine_id', 'date')
    # `min` and `max` are the Spark aggregates imported at the top of the notebook.
    summary = grouped.agg(
        mean(measured).alias('avg_'+colName),
        min(measured).alias('min_'+colName),
        max(measured).alias('max_'+colName)
    )
    return summary


def detect_anomalies(df: dataframe.DataFrame, colName: str) -> dataframe.DataFrame:
    """Return the rows whose ``colName`` falls outside mean +/- 2 standard deviations.

    Mean and standard deviation are taken over a window partitioned by ``time``
    and ``turbine_id``.

    Args:
        df: DataFrame with ``time``, ``turbine_id`` and ``colName`` columns.
        colName: name of the column to test.

    Returns:
        The rows outside the band, with ``lower_bound_<colName>`` and
        ``upper_bound_<colName>`` columns added.
    """
    mean_name, stddev_name = 'mean_'+colName, 'stddev_'+colName
    lower_name, upper_name = 'lower_bound_'+colName, 'upper_bound_'+colName

    same_time_same_turbine = Window.partitionBy('time', 'turbine_id')
    with_stats = (
        df
        .withColumn(mean_name, mean(col(colName)).over(same_time_same_turbine))
        .withColumn(stddev_name, stddev(col(colName)).over(same_time_same_turbine))
    )
    with_bounds = (
        with_stats
        .withColumn(lower_name, col(mean_name) - 2 * col(stddev_name))
        .withColumn(upper_name, col(mean_name) + 2 * col(stddev_name))
    )

    # TODO: MAKE IT FILTER FOR DATES THAT ARE NOT IN THE DATABASE
    below_band = col(colName) < col(lower_name)
    above_band = col(colName) > col(upper_name)
    anomalies = with_bounds.filter(below_band | above_band)
    # The bounds are kept in the result; only the intermediate statistics are removed.
    return anomalies.drop(mean_name, stddev_name)


def filter_for_new_data(df: dataframe.DataFrame, last_upload_times: dict) -> dataframe.DataFrame:
    """Keep only the rows that are newer than each turbine's last uploaded timestamp.

    The earlier version referred to ``TurbineID`` / ``Timestamp`` columns, which
    the snake_case DataFrames in this notebook do not have. It also used a
    ``left_anti`` join, which discards the right-hand columns that its filter
    then tried to read.

    Args:
        df: DataFrame with ``turbine_id`` and ``timestamp`` columns.
        last_upload_times: mapping of turbine id to the latest stored timestamp.

    Returns:
        Rows of ``df`` later than the stored maximum for their turbine, plus all
        rows of turbines with no stored data. The columns are the same as in ``df``.
    """
    # Nothing stored yet: all rows are new. createDataFrame cannot infer a schema
    # from an empty collection, so it is skipped in that case.
    if not last_upload_times:
        return df

    # Convert last upload times to a PySpark DataFrame
    last_upload_df = spark.createDataFrame(list(last_upload_times.items()), ['last_turbine_id', 'last_timestamp'])

    # A left join keeps every input row and attaches the turbine's last upload time
    # (null when the turbine has never been uploaded).
    joined = df.join(last_upload_df, df['turbine_id'] == last_upload_df['last_turbine_id'], 'left')
    is_new = joined['last_timestamp'].isNull() | (joined['timestamp'] > joined['last_timestamp'])
    df_filtered = joined.filter(is_new).drop('last_turbine_id', 'last_timestamp')
    return df_filtered



    
# # Join the new data with the last upload timestamps to filter out old rows
# df_filtered = df.join(last_upload_df, 'TurbineID', 'left_anti') \
#     .filter((df['Timestamp'] > last_upload_df['LastTimestamp']) | (last_upload_df['LastTimestamp'].isNull()))


# # Add columns for mean and standard deviation of power output
# df = df.withColumn('mean_power', mean('PowerOutput').over(windowSpec)) \
#        .withColumn('stddev_power', stddev('PowerOutput').over(windowSpec))

# # Calculate the bounds for detecting anomalies
# df = df.withColumn('lower_bound', col('mean_power') - 2 * col('stddev_power')) \
#        .withColumn('upper_bound', col('mean_power') + 2 * col('stddev_power'))

# # Detect anomalies by checking if the power output is outside the bounds
# df_anomalies = df.filter((col('PowerOutput') < col('lower_bound')) | (col('PowerOutput') > col('upper_bound')))

# # Select only the relevant columns to display
# df_anomalies = df_anomalies.select('Date', 'Time', 'TurbineID', 'PowerOutput', 'mean_power', 'stddev_power', 'lower_bound', 'upper_bound')



In [66]:
# Print a blank line followed by a 60-underscore separator.
print(f"\n{'_'*60}")


____________________________________________________________


In [4]:
# Initialize Spark Session (with the PostgreSQL JDBC driver jar registered)
spark = (
    SparkSession.builder
    .appName("WindTurbineDataPipeline")
    .config('spark.jars', "./jdbc_driver/postgresql-42.7.1.jar")
    .getOrCreate()
)

# Load the test CSV (header row present, column types inferred)
tdf = spark.read.csv('./raw_data/test.csv', header=True, inferSchema=True)

# Show the column names as the cell's output
tdf.columns

['timestamp', 'turbine_id', 'wind_speed', 'wind_direction', 'power_output']

In [44]:
# Preview four rows of the per-turbine, per-day power_output statistics.
calculate_daily_stats(raw_sdf, 'power_output').show(4)

+----------+----------+--------------------+----------------+----------------+
|turbine_id|      date|average_power_output|min_power_output|max_power_output|
+----------+----------+--------------------+----------------+----------------+
|         3|2022-03-23|   2.795833333333333|             1.5|             4.2|
|         3|2022-03-21|   3.341666666666667|             1.5|             4.4|
|         3|2022-03-16|   2.904166666666667|             1.8|             4.3|
|         5|2022-03-06|   3.008333333333334|             1.5|             4.4|
+----------+----------+--------------------+----------------+----------------+
only showing top 4 rows



In [5]:
# Drop rows containing nulls, then derive the date and 'HH:mm:ss' time-of-day columns
tdf = (
    tdf.na.drop()
    .withColumn('date', (col('timestamp')).cast('date'))
    .withColumn('time', date_format('timestamp', 'HH:mm:ss'))
)
tdf.show(1)

+-------------------+----------+----------+--------------+------------+----------+--------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|
+-------------------+----------+----------+--------------+------------+----------+--------+
|2022-03-01 00:00:00|        11|       9.1|           269|         2.9|2022-03-01|00:00:00|
+-------------------+----------+----------+--------------+------------+----------+--------+
only showing top 1 row



In [33]:
# Derive the date and 'HH:mm:ss' time-of-day columns from the timestamp
tdf = (
    tdf
    .withColumn('date', (col('timestamp')).cast('date'))
    .withColumn('time', date_format('timestamp', 'HH:mm:ss'))
)

# tdf.write \
#         .format("jdbc") \
#         .option("url", f"jdbc:postgresql://{DB_HOST}/{DB_NAME}") \
#         .option("dbtable", 'turbine_data_raw') \
#         .option("user", DB_USER) \
#         .option("password", DB_PASSWORD) \
#         .option("driver", "org.postgresql.Driver") \
#         .save(mode='append')

# # Here we assume df has columns ['Date', 'Time', 'TurbineID', 'WindSpeed', 'WindDirection', 'PowerOutput']
# # Convert Spark DataFrame to a list of tuples
# data_to_insert = [[row.date, row.time, row.turbine_id, row.wind_speed, row.wind_direction, row.power_output, row.timestamp] for row in tdf.collect()]

# Collect every row to the driver and turn it into a plain list, in column order
data_to_insert = [list(row) for row in tdf.collect()]

# The column list is taken from the DataFrame so it lines up with the collected values
insert_query = f"""
    INSERT INTO turbine_data_raw ({', '.join(tdf.columns)})
    VALUES %s
"""

# Insert all rows through the module-level psycopg2 cursor
execute_values(cursor, insert_query, data_to_insert)





In [31]:

# Collect every row of tdf to the driver as a plain list of values and display the result.
data_to_insert = [[item for item in row] for row in tdf.collect()]
data_to_insert

[[datetime.datetime(2022, 3, 1, 0, 0),
  11,
  9.1,
  269,
  2.9,
  datetime.date(2022, 3, 1),
  '00:00:00'],
 [datetime.datetime(2022, 3, 1, 0, 0),
  12,
  11.3,
  316,
  2.5,
  datetime.date(2022, 3, 1),
  '00:00:00'],
 [datetime.datetime(2022, 3, 1, 0, 0),
  13,
  11.2,
  148,
  3.7,
  datetime.date(2022, 3, 1),
  '00:00:00'],
 [datetime.datetime(2022, 3, 1, 0, 0),
  14,
  10.7,
  97,
  1.6,
  datetime.date(2022, 3, 1),
  '00:00:00'],
 [datetime.datetime(2022, 3, 1, 0, 0),
  15,
  11.0,
  81,
  4.4,
  datetime.date(2022, 3, 1),
  '00:00:00'],
 [datetime.datetime(2022, 3, 1, 1, 0),
  11,
  12.3,
  245,
  1.8,
  datetime.date(2022, 3, 1),
  '01:00:00'],
 [datetime.datetime(2022, 3, 1, 1, 0),
  12,
  11.0,
  293,
  2.2,
  datetime.date(2022, 3, 1),
  '01:00:00'],
 [datetime.datetime(2022, 3, 1, 1, 0),
  13,
  11.4,
  270,
  1.9,
  datetime.date(2022, 3, 1),
  '01:00:00'],
 [datetime.datetime(2022, 3, 1, 1, 0),
  14,
  10.4,
  140,
  2.3,
  datetime.date(2022, 3, 1),
  '01:00:00'],
 [da

In [9]:
# get_last_times()
# Print the INSERT statement text built from tdf's column names; nothing is executed here.
# NOTE(review): `tol` looks like a scratch table name. Confirm before reusing this text.
print(f"""
    INSERT INTO tol ({', '.join(tdf.columns)})
    VALUES %s
    """)


    INSERT INTO tol (timestamp, turbine_id, wind_speed, wind_direction, power_output, date, time)
    VALUES %s
    


In [28]:
# Show turbine 1's readings taken at 03:00:00 on every day.
# The combined raw frame is named `raw_sdf` in this notebook; `sdf_raw` is never defined.
raw_sdf.filter((col('turbine_id') == 1) & (col('time') == '03:00:00')).show()

+-------------------+----------+----------+--------------+------------+----------+--------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|
+-------------------+----------+----------+--------------+------------+----------+--------+
|2022-03-01 03:00:00|         1|      10.5|            61|         1.8|2022-03-01|03:00:00|
|2022-03-02 03:00:00|         1|      10.0|           256|         4.4|2022-03-02|03:00:00|
|2022-03-03 03:00:00|         1|      13.1|            80|         1.9|2022-03-03|03:00:00|
|2022-03-04 03:00:00|         1|      12.3|           237|         4.0|2022-03-04|03:00:00|
|2022-03-05 03:00:00|         1|      13.5|            51|         2.9|2022-03-05|03:00:00|
|2022-03-06 03:00:00|         1|      14.3|           248|         1.6|2022-03-06|03:00:00|
|2022-03-07 03:00:00|         1|      13.4|            96|         3.5|2022-03-07|03:00:00|
|2022-03-08 03:00:00|         1|      10.2|           284|         3.8|2022-03-0

In [29]:
# Preview the anomalous power_output rows.
# The helper defined in this notebook is `detect_anomalies`; the camelCase name no longer exists.
detdf = detect_anomalies(sdf, 'power_output')
detdf.show()

+-------------------+----------+----------+--------------+------------+----------+--------+------------------------+------------------------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|lower_bound_power_output|upper_bound_power_output|
+-------------------+----------+----------+--------------+------------+----------+--------+------------------------+------------------------+
|2022-03-08 00:00:00|        10|      11.3|            56|         4.3|2022-03-08|00:00:00|       1.333915010562099|       4.259633376534676|
|2022-03-10 00:00:00|        10|      11.5|           314|         4.5|2022-03-10|00:00:00|       1.333915010562099|       4.259633376534676|
|2022-03-07 00:00:00|        15|      12.9|           318|         1.5|2022-03-07|00:00:00|       1.531087632359274|       5.010847851511695|
|2022-03-13 01:00:00|         4|      13.8|            79|         4.5|2022-03-13|01:00:00|      0.8914865721928433|       4.441846761140489|
|2022-

In [30]:
# Daily power_output statistics for turbine 1, in date order.
# The helper defined in this notebook is `calculate_daily_stats`; the camelCase name no longer exists.
cdf = calculate_daily_stats(sdf, 'power_output')
cdf[cdf['turbine_id'] == 1].orderBy('date').show()

+----------+----------+--------------------+----------------+----------------+
|turbine_id|      date|average_power_output|min_power_output|max_power_output|
+----------+----------+--------------------+----------------+----------------+
|         1|2022-03-01|  2.9749999999999996|             1.6|             4.4|
|         1|2022-03-02|  3.2375000000000003|             1.9|             4.5|
|         1|2022-03-03|  2.9250000000000007|             1.6|             4.4|
|         1|2022-03-04|  2.9875000000000007|             1.5|             4.4|
|         1|2022-03-05|  3.2458333333333322|             1.6|             4.3|
|         1|2022-03-06|  2.9583333333333335|             1.5|             4.4|
|         1|2022-03-07|   3.283333333333333|             1.9|             4.3|
|         1|2022-03-08|  3.2916666666666665|             1.7|             4.5|
|         1|2022-03-09|  2.8375000000000004|             1.5|             4.5|
|         1|2022-03-10|   3.120833333333333|        

In [209]:
# Preview the null-filled result for each measurement column.
# The helper defined in this notebook is `fill_nas_with_mean`; the camelCase name no longer exists.
columns = ['wind_speed', 'wind_direction', 'power_output']
for column in columns:
    print(column)
    fill_nas_with_mean(sdf, column).show(2)

wind_speed
+-------------------+----------+----------+--------------+------------+----------+--------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|
+-------------------+----------+----------+--------------+------------+----------+--------+
|2022-03-01 00:00:00|         1|      11.8|           169|         2.7|2022-03-01|00:00:00|
|2022-03-01 01:00:00|         1|      11.6|           152|         4.4|2022-03-01|01:00:00|
+-------------------+----------+----------+--------------+------------+----------+--------+
only showing top 2 rows

wind_direction
+-------------------+----------+----------+--------------+------------+----------+--------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|      date|    time|
+-------------------+----------+----------+--------------+------------+----------+--------+
|2022-03-01 00:00:00|         1|      11.8|         169.0|         2.7|2022-03-01|00:00:00|
|2022-03-01 01:00:00|        

In [160]:
# Wrap the Spark DataFrame in the pandas-on-Spark API.
df = sdf.pandas_api()

In [187]:
# Display the type of `sdf` as the cell's output.
type(sdf)

pyspark.sql.dataframe.DataFrame

In [143]:
# Display the column index of `df`.
df.columns

Index(['timestamp', 'turbine_id', 'wind_speed', 'wind_direction',
       'power_output', 'date', 'time'],
      dtype='object')

In [154]:
# Data Cleaning
df_clean = sdf.dropna()  # Drops rows with missing values
# Assuming wind_speed and power_output are the columns of interest

# One global aggregate row holding the mean and standard deviation of power_output
stats = df_clean.agg(
    mean(col('power_output')).alias('mean'),
    stddev(col('power_output')).alias('stddev')
).collect()

mean_power, stddev_power = stats[0]['mean'], stats[0]['stddev']

# Anomaly Detection: keep rows more than 2 standard deviations from the global mean
df_anomalies = (
    df_clean
    .withColumn('z_score', (col('power_output') - mean_power) / stddev_power)
    .filter(abs(col('z_score')) > 2)
)

# Summary Statistics per turbine and day
df_summary = (
    df_clean
    .groupBy('turbine_id', 'date')
    .agg(
        mean(col('power_output')).alias('average_power'),
        min(col('power_output')).alias('min_power'),
        max(col('power_output')).alias('max_power')
    )
)

df_summary.show()
mean_power
# # Store Processed Data
# # Replace `your_table` with your actual table name and configure the database settings
# df_clean.write.format('jdbc').option('url', 'jdbc:postgresql://dbserver').option('dbtable', 'your_table').save()

# # Store Summary Statistics
# df_summary.write.format('jdbc').option('url', 'jdbc:postgresql://dbserver').option('dbtable', 'summary_table').save()

# # Close Spark Session
# spark.stop()


+----------+----------+------------------+---------+---------+
|turbine_id|      date|     average_power|min_power|max_power|
+----------+----------+------------------+---------+---------+
|         3|2022-03-23| 2.795833333333333|      1.5|      4.2|
|         2|2022-03-26|2.9875000000000007|      1.5|      4.5|
|         3|2022-03-21| 3.341666666666667|      1.5|      4.4|
|         3|2022-03-16| 2.904166666666667|      1.8|      4.3|
|         5|2022-03-06| 3.008333333333334|      1.5|      4.4|
|         2|2022-03-31| 3.229166666666666|      1.6|      4.4|
|         1|2022-03-28|3.0750000000000006|      1.6|      4.5|
|         1|2022-03-12| 2.820833333333334|      1.6|      4.2|
|         5|2022-03-03| 2.941666666666667|      1.6|      4.3|
|         3|2022-03-24| 3.158333333333333|      1.9|      4.4|
|         2|2022-03-09|            2.9625|      1.5|      4.4|
|         5|2022-03-08|3.2791666666666663|      2.1|      4.5|
|         4|2022-03-02| 3.108333333333333|      1.5|   

2.988575268817212

In [67]:
# Stop the Spark session.
# NOTE(review): the recorded output for this cell is a ConnectionRefusedError.
# Presumably the session had already been stopped when it ran; confirm.
spark.stop()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it