#### Load the CSV data frame into Spark

In [None]:
import org.apache.spark.sql.SparkSession

// Build (or reuse) the session that every step below runs on.
val spark = SparkSession.builder()
  .appName("SparkCSVExample")
  .getOrCreate()

// Brings the $"column" interpolator used by the filter below into scope.
// spark-shell imports this automatically; a compiled application does not.
import spark.implicits._

// Load the CSV file into a DataFrame.
// "header" takes the column names from the first line of the file.
// "inferSchema" lets Spark type numeric columns as numbers; without it every
// column is read as a string and the numeric filter below would depend on
// implicit string casts.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/T1.csv")

// Show the schema of the DataFrame
df.printSchema()

// Display the first rows of the DataFrame
df.show()

// Example operation: keep only the rows where Wind Speed is greater than 10.
val filteredDF = df.filter($"Wind Speed" > 10)
filteredDF.show()

// Perform other transformations or analyses as required
// Remember to handle any necessary type conversions or data cleaning as needed


##### To publish the data from your CSV file to Apache Kafka in a streaming fashion with Apache Spark,
##### use the Kafka sink provided by Spark Structured Streaming.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct
from pyspark.sql.types import StructType

# Job configuration: input file and Kafka connection details
csv_file_path = "path/to/T1.csv"
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_topic_name"

# Obtain the Spark session for this job
spark = SparkSession.builder.appName("CSVToKafka").getOrCreate()

# Column layout applied to the CSV input
schema = (
    StructType()
    .add("Date/Time", "string")
    .add("LV ActivePower", "double")
    .add("Wind Speed", "double")
    .add("Theo_Power_Curve", "double")
    .add("Wind Direction", "double")
)

# Streaming read of the CSV input; the first line is treated as a header
df = (
    spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv(csv_file_path)
)

# Kafka sink: each row is packed into a single JSON string column named
# "value", then the streaming query writing to the topic is started
kafka_sink = (
    df.selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("topic", kafka_topic)
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
)

# Block until the streaming query stops
kafka_sink.awaitTermination()


#### To read data from Kafka in a streaming fashion with PySpark, use the readStream
#### method together with the Kafka source.

In [None]:
from pyspark.sql import SparkSession

# Kafka connection details
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_topic_name"

# Obtain the Spark session for this job
spark = SparkSession.builder.appName("KafkaToSpark").getOrCreate()

# Streaming DataFrame over the Kafka topic, read from the earliest offsets
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

# Show which columns the Kafka source provides
df.printSchema()
# The string literal below is inert: it holds a disabled step that would
# cast the value column to a string.
'''
# Convert value column from binary to string
df = df.selectExpr("CAST(value AS STRING)")
'''


# Print every newly arrived row to the console
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block until the streaming query stops
query.awaitTermination()


#### Write the data received from Kafka into a Delta table

### Note: how to create a Delta table




In [None]:
from pyspark.sql import SparkSession

# Location the Delta table is written to
delta_table_path = "path/to/delta_table"

# Obtain the Spark session for this job
spark = SparkSession.builder.appName("CreateDeltaTable").getOrCreate()

# Small in-memory sample: three (id, name) rows
columns = ['id', 'name']
data = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
df = spark.createDataFrame(data, schema=columns)

# Persist the sample rows in Delta format at the configured path
df.write.save(delta_table_path, format="delta")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    create_map,
    current_date,
    current_timestamp,
    from_json,
    lit,
    to_date,
    to_timestamp,
)
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, MapType

# Create SparkSession
spark = SparkSession.builder \
    .appName("KafkaToDelta") \
    .getOrCreate()

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_topic_name"

# Schema of the JSON payload carried in the Kafka "value" column.
# Every field is parsed as a string; typed columns are derived further down.
schema = StructType([
    StructField("Date/Time", StringType(), True),
    StructField("LV ActivePower", StringType(), True),
    StructField("Wind Speed", StringType(), True),
    StructField("Theo_Power_Curve", StringType(), True),
    StructField("Wind Direction", StringType(), True)
])

# Target layout of the Delta table. The stream is projected onto exactly
# these columns before it is written.
delta_schema = StructType([
    StructField("signal_date", DateType(), True),
    StructField("signal_tc", TimestampType(), True),
    StructField("create_date", DateType(), True),
    StructField("create_ts", TimestampType(), True),
    StructField("signals", MapType(StringType(), StringType()), True)
])

# Pattern of the 'Date/Time' strings.
# NOTE(review): taken from the alternate version of this cell -- confirm it
# matches the actual feed.
date_time_format = "dd MM yyyy HH:mm"

# Map key -> source column for every signal stored in the 'signals' map
signals_map = {
    "LV ActivePower": "LV ActivePower",
    "Wind Speed": "Wind Speed",
    "Theo_Power_Curve": "Theo_Power_Curve",
    "Wind Direction": "Wind Direction"
}

# Define Kafka source
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert value column from binary to string and apply schema
df = df.selectExpr("CAST(value AS STRING) as value") \
    .select(from_json("value", schema).alias("data")) \
    .selectExpr("data.*")

# create_map takes alternating key / value columns: key1, value1, key2, ...
signal_entries = []
for signal_key, source_column in signals_map.items():
    signal_entries.append(lit(signal_key))
    signal_entries.append(col(source_column))

# Derive every column declared in delta_schema, then keep only those columns.
# The signals are stored as a map rather than a struct or top-level columns
# because their names contain spaces, which Delta does not accept as
# column/field names unless column mapping is enabled; map keys are plain
# values and are not affected.
df = df.withColumn("signal_date", to_date("Date/Time", date_time_format)) \
    .withColumn("signal_tc", to_timestamp("Date/Time", date_time_format)) \
    .withColumn("create_date", current_date()) \
    .withColumn("create_ts", current_timestamp()) \
    .withColumn("signals", create_map(*signal_entries)) \
    .select(*[field.name for field in delta_schema.fields])

# Write the DataFrame to Delta table.
# NOTE: a checkpoint directory belongs to a single streaming query; do not
# reuse the directory of another query when filling in the placeholder.
query = df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .start("path/to/delta_table")

# Wait for the termination of the streaming query
query.awaitTermination()


In [None]:
## alternate for the above schema

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    create_map,
    current_date,
    current_timestamp,
    from_json,
    lit,
    to_date,
    to_timestamp,
)
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, MapType

# Create SparkSession
spark = SparkSession.builder \
    .appName("KafkaToDelta") \
    .getOrCreate()

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "your_topic_name"

# Schema of the JSON payload carried in the Kafka "value" column.
# All fields arrive as strings and are converted below where needed.
schema = StructType([
    StructField("Date/Time", StringType(), True),
    StructField("LV ActivePower", StringType(), True),
    StructField("Wind Speed", StringType(), True),
    StructField("Theo_Power_Curve", StringType(), True),
    StructField("Wind Direction", StringType(), True)
])

# Layout of the Delta table; the final select() below projects the stream
# onto exactly these columns.
delta_schema = StructType([
    StructField("signal_date", DateType(), True),
    StructField("signal_tc", TimestampType(), True),
    StructField("create_date", DateType(), True),
    StructField("create_ts", TimestampType(), True),
    StructField("signals", MapType(StringType(), StringType()), True)
])

# Map key -> source column for every signal stored in the 'signals' map
signals_map = {
    "LV ActivePower": "LV ActivePower",
    "Wind Speed": "Wind Speed",
    "Theo_Power_Curve": "Theo_Power_Curve",
    "Wind Direction": "Wind Direction"
}

# Define Kafka source
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert value column from binary to string and apply schema
df = df.selectExpr("CAST(value AS STRING) as value") \
    .select(from_json("value", schema).alias("data")) \
    .selectExpr("data.*")

# Convert 'Date/Time' to a date (signal_date) and a timestamp (signal_tc),
# and stamp each row with the date/time at which it was processed.
df = df.withColumn("signal_date", to_date("Date/Time", "dd MM yyyy HH:mm")) \
    .withColumn("signal_tc", to_timestamp("Date/Time", "dd MM yyyy HH:mm")) \
    .withColumn("create_date", current_date()) \
    .withColumn("create_ts", current_timestamp())

# Build the 'signals' map column. create_map expects alternating key / value
# columns, so each signal contributes a literal key and its source column.
# A map is used (as declared in delta_schema) because the signal names
# contain spaces, which Delta does not accept as column/field names unless
# column mapping is enabled.
map_arguments = []
for signal_key, source_column in signals_map.items():
    map_arguments.extend([lit(signal_key), col(source_column)])
df = df.withColumn("signals", create_map(*map_arguments))

# Keep only the columns that belong to the Delta table
df = df.select(*[field.name for field in delta_schema.fields])

# Write the DataFrame to Delta table.
# NOTE: a checkpoint directory belongs to a single streaming query; do not
# reuse the directory of another query when filling in the placeholder.
query = df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .start("path/to/delta_table")

# Wait for the termination of the streaming query
query.awaitTermination()


#### To read data from a Delta Lake table with Spark, use the `read` API with the "delta" format.

In [None]:
from pyspark.sql import SparkSession

# Location of the Delta table to read
delta_table_path = "path/to/delta_table"

# Obtain the Spark session for this job
spark = SparkSession.builder.appName("ReadDeltaLake").getOrCreate()

# Batch-load the Delta table into a DataFrame
df = spark.read.load(delta_table_path, format="delta")

# Print the first rows
df.show()

# Perform any further operations on the DataFrame as needed



Calculate the number of distinct 'signal_tc' datapoints per day:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, date_format

# Create SparkSession
spark = SparkSession.builder \
    .appName("DistinctSignalTcPerDay") \
    .getOrCreate()

# Path to the Delta table
delta_table_path = "path/to/delta_table"

# Read Delta table as DataFrame
df = spark.read.format("delta").load(delta_table_path)

# Extract the calendar day from 'signal_tc' as a yyyy-MM-dd string
df = df.withColumn("date", date_format(col("signal_tc"), "yyyy-MM-dd"))

# Count the DISTINCT 'signal_tc' values per day. A plain count would also
# count repeated timestamps, e.g. rows ingested more than once.
distinct_signal_tc_per_day = (
    df.groupBy("date")
    .agg(countDistinct("signal_tc").alias("distinct_signal_tc"))
    .orderBy("date")
)

# Show the results
distinct_signal_tc_per_day.show()


Calculate the average value of every signal per hour:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, hour

# Create SparkSession
spark = SparkSession.builder \
    .appName("AverageSignalPerHour") \
    .getOrCreate()

# Path to the Delta table
delta_table_path = "path/to/delta_table"

# Signals stored inside the 'signals' column of the Delta table
signal_names = [
    "LV ActivePower",
    "Wind Speed",
    "Theo_Power_Curve",
    "Wind Direction",
]

# Read Delta table as DataFrame
df = spark.read.format("delta").load(delta_table_path)

# Extract the hour component (0-23) of 'signal_tc'; rows from different days
# that share the same hour of day end up in the same group.
df = df.withColumn("hour", hour(col("signal_tc")))

# Group by hour and calculate the average of each signal.
# The signals are looked up inside the 'signals' column (the lookup works for
# both a map key and a struct field) and cast to double explicitly because
# they are stored as strings. The aliases keep the "avg(<signal>)" names.
average_per_hour = (
    df.groupBy("hour")
    .agg(*[
        avg(col("signals")[name].cast("double")).alias(f"avg({name})")
        for name in signal_names
    ])
    .orderBy("hour")
)

# Show the results
average_per_hour.show()
