In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, count, col, last, first, lit
import pandas as pd
from pyspark.sql.window import Window
import random
from datetime import datetime, timedelta

# Initialize Spark Session
spark = SparkSession.builder.appName("SampleDataCreation").getOrCreate()
 
# Define schema for the DataFrame
schema = StructType([
    StructField("meter_point_id", IntegerType(), True),
    StructField("ssc", StringType(), True),
    StructField("profile_class", IntegerType(), True),
    StructField("tpr", StringType(), True),
    StructField("eac_id", IntegerType(), True),
    StructField("eac", FloatType(), True),
    StructField("effective_from_date", TimestampType(), True),
    StructField("effective_to_date", TimestampType(), True),
])

# Generate sample data
sample_data = []
num_records = 20
start_date = datetime(2023, 1, 1)

for i in range(num_records):
    meter_point_id = random.randint(1, 5)
    ssc = random.choice(['A', 'B', 'C', None])
    profile_class = random.choice([1, 2, 3, 4, None])
    tpr = random.choice(['TPR1', 'TPR2', 'TPR3'])
    eac_id = i + 1
    eac = round(random.uniform(10.0, 100.0), 2)
    effective_from_date = start_date + timedelta(days=random.randint(0, 100))
    effective_to_date = effective_from_date + timedelta(days=random.randint(1, 100))
    sample_data.append((meter_point_id, ssc, profile_class, tpr, eac_id, eac, effective_from_date, effective_to_date))

# Create DataFrame from sample data
df = spark.createDataFrame(sample_data, schema)

# Show sample data
df.show()

+--------------+----+-------------+----+------+-----+-------------------+-------------------+
|meter_point_id| ssc|profile_class| tpr|eac_id|  eac|effective_from_date|  effective_to_date|
+--------------+----+-------------+----+------+-----+-------------------+-------------------+
|             1|   A|            3|TPR1|     1| 62.0|2023-04-08 00:00:00|2023-06-21 00:00:00|
|             1|   B|            2|TPR3|     2|42.81|2023-03-30 00:00:00|2023-04-15 00:00:00|
|             4|   B|            4|TPR3|     3|91.27|2023-03-12 00:00:00|2023-05-01 00:00:00|
|             5|   C|            2|TPR1|     4|26.04|2023-01-03 00:00:00|2023-02-18 00:00:00|
|             1|   C|         NULL|TPR3|     5|16.16|2023-04-04 00:00:00|2023-04-08 00:00:00|
|             1|NULL|         NULL|TPR2|     6|18.36|2023-02-21 00:00:00|2023-05-05 00:00:00|
|             5|   A|         NULL|TPR2|     7|58.27|2023-02-07 00:00:00|2023-05-10 00:00:00|
|             3|NULL|            2|TPR2|     8|82.78|2023-02

In [0]:
def process_data(df) -> df:
    """
    Process EACs.
    """
    keep_cols = [
        'meter_point_id', 'ssc', 'profile_class', 'tpr', 'eac_id', 'eac', 
        'effective_from_date', 'effective_to_date'
    ]
    df = df.select(keep_cols)


Here, a WindowSpec is defined. A WindowSpec defines the partitioning and ordering of rows for window functions.
partitionBy("meter_point_id") partitions the data by the "meter_point_id" column. This means that window functions will operate on partitions of rows that share the same "meter_point_id" value.
orderBy("effective_from_date") orders the rows within each partition by the "effective_from_date" column.

In [0]:
from pyspark.sql.functions import col
# Assuming df is a Spark DataFrame
df_filled = df.orderBy(['meter_point_id', 'effective_from_date']) \
              .select(['meter_point_id', 'ssc', 'profile_class'])

df_filled.show()

+--------------+---+-------------+
|meter_point_id|ssc|profile_class|
+--------------+---+-------------+
|             1|  B|         NULL|
|             1|  B|            2|
|             1|  C|         NULL|
|             1|  A|            3|
|             2|  B|            2|
|             3|  C|            3|
|             3|  C|            4|
|             3|  C|            4|
|             3|  C|            2|
|             3|  C|            1|
|             4|  A|            2|
|             4|  C|            1|
|             4|  B|            1|
|             4|  B|            2|
|             4|  B|            1|
|             4|  B|            4|
|             5|  C|            2|
|             5|  A|         NULL|
|             5|  A|         NULL|
|             5|  B|            1|
+--------------+---+-------------+



##Forward filling 

In [0]:
window = Window.rowsBetween(float('-inf'),0) ## specifying direction of filling

 # defining the forward-filled column
filled_column_1 = last(df_filled['ssc'], ignorenulls=True).over(window)


# replacing the columns with forward-filled columns
df=df_filled.withColumn('ssc', filled_column_1)
df.show()

+--------------+---+-------------+
|meter_point_id|ssc|profile_class|
+--------------+---+-------------+
|             1|  B|         NULL|
|             1|  B|            2|
|             1|  C|         NULL|
|             1|  A|            3|
|             2|  B|            2|
|             3|  C|            3|
|             3|  C|            4|
|             3|  C|            4|
|             3|  C|            2|
|             3|  C|            1|
|             4|  A|            2|
|             4|  C|            1|
|             4|  B|            1|
|             4|  B|            2|
|             4|  B|            1|
|             4|  B|            4|
|             5|  C|            2|
|             5|  A|         NULL|
|             5|  A|         NULL|
|             5|  B|            1|
+--------------+---+-------------+



##BACKWARD FILLING


In [0]:
window = Window.rowsBetween(0,float('inf')) ## specifying direction of filling


 # defining the forward-filled column
filled_column_1 = first(df['ssc'], ignorenulls=True).over(window)


# replacing the columns with forward-filled columns
df=df.withColumn('ssc', filled_column_1)
df.show()

+--------------+---+-------------+
|meter_point_id|ssc|profile_class|
+--------------+---+-------------+
|             1|  B|         NULL|
|             1|  B|            2|
|             1|  C|         NULL|
|             1|  A|            3|
|             2|  B|            2|
|             3|  C|            3|
|             3|  C|            4|
|             3|  C|            4|
|             3|  C|            2|
|             3|  C|            1|
|             4|  A|            2|
|             4|  C|            1|
|             4|  B|            1|
|             4|  B|            2|
|             4|  B|            1|
|             4|  B|            4|
|             5|  C|            2|
|             5|  A|         NULL|
|             5|  A|         NULL|
|             5|  B|            1|
+--------------+---+-------------+



In [0]:

# Now, use fillna to fill remaining null values in other columns
for col in df_filled.columns:
    # Check if the column exists in 'df' before filling
    if col in df.columns:
        # Extract the scalar value from df_filled and fillna
        fill_value = df_filled.select(col).first()[col]
        if fill_value is not None:
            df = df.fillna(fill_value, subset=[col])

# Show the DataFrame after filling remaining null values
df.show()

+--------------+---+-------------+
|meter_point_id|ssc|profile_class|
+--------------+---+-------------+
|             1|  B|         NULL|
|             1|  B|            2|
|             1|  C|         NULL|
|             1|  A|            3|
|             2|  B|            2|
|             3|  C|            3|
|             3|  C|            4|
|             3|  C|            4|
|             3|  C|            2|
|             3|  C|            1|
|             4|  A|            2|
|             4|  C|            1|
|             4|  B|            1|
|             4|  B|            2|
|             4|  B|            1|
|             4|  B|            4|
|             5|  C|            2|
|             5|  A|         NULL|
|             5|  A|         NULL|
|             5|  B|            1|
+--------------+---+-------------+



In [0]:
df.display()

meter_point_id,ssc,profile_class
1,B,
1,B,2.0
1,C,
1,A,3.0
2,B,2.0
3,C,3.0
3,C,4.0
3,C,4.0
3,C,2.0
3,C,1.0


In [0]:
dbutils.help()

In [0]:
dbutils.fs.help()

In [0]:
dbutils.fs.mkdirs('dbfs/FileStore/sql')

True

In [0]:
dbutils.notebook.help()

In [0]:
Window Specification for Forward Fill:
Window.orderBy("id"): Creates a window specification that orders rows by the id column.
.rowsBetween(Window.unboundedPreceding, 0): Defines the window frame to include all rows from the start of the partition (Window.unboundedPreceding) up to the current row (0). This means the window includes all preceding rows and the current row, which is necessary for forward filling.

In [0]:
# Apply forward fill using last function ignoring nulls
df_ffill = df.withColumn("ffilled_value", F.last("value", ignorenulls=True).over(window_spec_ffill))


# Apply forward fill using last function ignoring nulls
df_ffill = df.withColumn("ffilled_value", F.last("value", ignorenulls=True).over(window_spec_ffill))
#Apply Forward Fill:
df.withColumn("ffilled_value", ...) :

 Adds a new column ffilled_value to the DataFrame df.
F.last("value", ignorenulls=True).over(window_spec_ffill):

 Uses the last function to get the last non-null value in the specified window (window_spec_ffill), effectively performing a forward fill.
F.last("value", ignorenulls=True): Returns the last non-null value from the column value within the window.
.over(window_spec_ffill): Applies the window specification defined earlier for forward fill.


Window Specification for Backward Fill:
Window.orderBy("id"): Again, creates a window specification that orders rows by the id column.
.rowsBetween(0, Window.unboundedFollowing): Defines the window frame to include all rows from the current row (0) to the end of the partition (Window.unboundedFollowing). This means the window includes the current row and all subsequent rows, which is necessary for backward filling.


In [0]:
# Apply backward fill using first function ignoring nulls
df_filled = df_ffill.withColumn("filled_value", F.first("ffilled_value", ignorenulls=True).over(window_spec_bfill))


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Initialize SparkSession
spark = SparkSession.builder.appName("FillNA").getOrCreate()

# Sample DataFrame
data = [
    (1, None),
    (2, None),
    (3, "a"),
    (4, None),
    (5, "b"),
    (6, None)
]
df = spark.createDataFrame(data, ["id", "value"])



Apply Backward Fill:
df_ffill.withColumn("filled_value", ...): Adds a new column filled_value to the DataFrame df_ffill (which already includes the forward-filled values).
F.first("ffilled_value", ignorenulls=True).over(window_spec_bfill): Uses the first function to get the first non-null value in the specified window (window_spec_bfill), effectively performing a backward fill.
F.first("ffilled_value", ignorenulls=True): Returns the first non-null value from the column ffilled_value within the window.
.over(window_spec_bfill): Applies the window specification defined earlier for backward fill.

In [0]:
df_filled.show()


Show the Result:
df_filled.show(): Displays the content of the resulting DataFrame df_filled with the columns ffilled_value (forward-filled values) and filled_value (forward and backward-filled values).
Summary
Define Window Specifications:

window_spec_ffill: For forward filling, includes all rows up to the current row.
window_spec_bfill: For backward filling, includes the current row and all subsequent rows.
Apply Forward Fill:

Use F.last with ignorenulls=True to get the last non-null value within the forward fill window.
Apply Backward Fill:

Use F.first with ignorenulls=True to get the first non-null value within the backward fill window.
Display the Result:

Show the DataFrame with the new columns showing the forward and backward filled values.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder.appName("FillNA").getOrCreate()

# Sample DataFrame
data = [
    (1, None, None),
    (2, None, 2),
    (3, "a", 3),
    (4, None, 1),
    (5, "b", None),
    (6, None, 2)
]
df = spark.createDataFrame(data, ["id", "value", "value2"])

df.show()


+---+-----+------+
| id|value|value2|
+---+-----+------+
|  1| NULL|  NULL|
|  2| NULL|     2|
|  3|    a|     3|
|  4| NULL|     1|
|  5|    b|  NULL|
|  6| NULL|     2|
+---+-----+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Define window specifications for forward fill
window_spec_ffill = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Apply forward fill using last function ignoring nulls
df_ffill = df.withColumn("value", F.last("value", ignorenulls=True).over(window_spec_ffill)) \
             .withColumn("value2", F.last("value2", ignorenulls=True).over(window_spec_ffill))

df_ffill.show()

+---+-----+------+
| id|value|value2|
+---+-----+------+
|  1| NULL|  NULL|
|  2| NULL|     2|
|  3|    a|     3|
|  4|    a|     1|
|  5|    b|     1|
|  6|    b|     2|
+---+-----+------+



In [0]:
# Define window specifications for backward fill
window_spec_bfill = Window.orderBy("id").rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Apply backward fill using first function ignoring nulls
df_bfill = df_ffill.withColumn("value", F.first("value", ignorenulls=True).over(window_spec_bfill)) \
             .withColumn("value2", F.first("value2", ignorenulls=True).over(window_spec_bfill))

df_bfill.show()

+---+-----+------+
| id|value|value2|
+---+-----+------+
|  1|    a|     2|
|  2|    a|     2|
|  3|    a|     3|
|  4|    a|     1|
|  5|    b|     1|
|  6|    b|     2|
+---+-----+------+



In [0]:

# Select the original columns to return
df_filled1 = df_bfill.select("id", "value", "value2")
df_filled1.show()

+---+-----+------+
| id|value|value2|
+---+-----+------+
|  1|    a|     2|
|  2|    a|     2|
|  3|    a|     3|
|  4|    a|     1|
|  5|    b|     1|
|  6|    b|     2|
+---+-----+------+

