In [1]:
# Import SparkSession
from pyspark.sql import SparkSession

# Start (or reuse, if one is already active) the SparkSession used by every
# cell below, registered under the application name "task01".
spark = (
    SparkSession.builder
    .appName("task01")
    .getOrCreate()
)

In [6]:
# Sample "Billing" records as (Customer, PaidDate, PaidAmount) tuples.
# PaidDate is stored as a "YYYY-MM-DD" string at this point.
data = [
    ("Bob", "2022-01-31", 100),
    ("Bob", "2022-02-25", 140),
    ("Bob", "2022-03-15", 120),
    ("Lee", "2022-01-15", 150),
    ("Lee", "2022-02-28", 135),
    ("Nok", "2022-04-13", 200),
]
columns = ["Customer", "PaidDate", "PaidAmount"]

# Only column names are supplied; Spark infers the column types from the tuples.
df_billing = spark.createDataFrame(data, schema=columns)
df_billing.show()

+--------+----------+----------+
|Customer|  PaidDate|PaidAmount|
+--------+----------+----------+
|     Bob|2022-01-31|       100|
|     Bob|2022-02-25|       140|
|     Bob|2022-03-15|       120|
|     Lee|2022-01-15|       150|
|     Lee|2022-02-28|       135|
|     Nok|2022-04-13|       200|
+--------+----------+----------+



In [7]:
# Sample "ExtraBills" records; built with the same `columns` list so the two
# frames share an identical column layout.
extra_data = [
    ("Nok", "2022-05-31", 201),
    ("Bob", "2022-03-25", 160),
]

df_extra_bills = spark.createDataFrame(extra_data, schema=columns)
df_extra_bills.show()

+--------+----------+----------+
|Customer|  PaidDate|PaidAmount|
+--------+----------+----------+
|     Nok|2022-05-31|       201|
|     Bob|2022-03-25|       160|
+--------+----------+----------+



In [8]:
# Append the extra bills to the original billing rows.
# unionByName matches columns by name rather than by position, so a difference
# in column order between the two frames cannot silently shift values into the
# wrong column (it raises instead if the column names do not match).
df_combined = df_billing.unionByName(df_extra_bills)
df_combined.show()

+--------+----------+----------+
|Customer|  PaidDate|PaidAmount|
+--------+----------+----------+
|     Bob|2022-01-31|       100|
|     Bob|2022-02-25|       140|
|     Bob|2022-03-15|       120|
|     Lee|2022-01-15|       150|
|     Lee|2022-02-28|       135|
|     Nok|2022-04-13|       200|
|     Nok|2022-05-31|       201|
|     Bob|2022-03-25|       160|
+--------+----------+----------+



In [12]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Convert PaidDate to DateType so datediff operates on real dates
# (to_date on a column that is already a date leaves the values unchanged).
df_billing = df_billing.withColumn("PaidDate", F.to_date("PaidDate"))

# Per-customer window in payment order. PaidAmount is a secondary sort key so
# that two payments on the same day are ordered deterministically for lag().
w = Window.partitionBy("Customer").orderBy("PaidDate", "PaidAmount")

# Days since the customer's previous payment; NULL for a customer's first row
# because lag() has no preceding row there. Computing it inline avoids a
# temporary PrevPaidDate column that would have to be dropped afterwards.
df_billing_with_diff = df_billing.withColumn(
    "DaysBetween",
    F.datediff("PaidDate", F.lag("PaidDate").over(w)),
)

# Window output carries no guaranteed row order, so sort explicitly for display.
df_billing_with_diff.orderBy("Customer", "PaidDate", "PaidAmount").show()

+--------+----------+----------+-----------+
|Customer|  PaidDate|PaidAmount|DaysBetween|
+--------+----------+----------+-----------+
|     Bob|2022-01-31|       100|       NULL|
|     Bob|2022-02-25|       140|         25|
|     Bob|2022-03-15|       120|         18|
|     Lee|2022-01-15|       150|       NULL|
|     Lee|2022-02-28|       135|         44|
|     Nok|2022-04-13|       200|       NULL|
+--------+----------+----------+-----------+

