### Reading data from raw container

In [0]:
# Load the raw dataset. Parquet is self-describing (column names and types are
# stored in the files), so CSV-style options such as "header" do not apply.
df = spark.read.parquet("/mnt/raw")

### Cast date columns to the Date data type

In [0]:
from pyspark.sql.functions import col

# Cast each date-like column to DateType, replacing the column in place.
# NOTE(review): assumes the source values are ISO-formatted dates (or already
# date/timestamp typed) — confirm, since other string formats will not cast cleanly.
for date_column in ("Open_Date", "Last_Consulted_Date", "DOB"):
    df = df.withColumn(date_column, col(date_column).cast("Date"))

### Add an `age` column

In [0]:
from pyspark.sql.functions import datediff, current_date, floor

from pyspark.sql.functions import months_between

# Age in completed years as of today (NULL when DOB is NULL).
# months_between is calendar-aware, so the value only reaches a whole number of
# years on the birthday itself; dividing a day count by 365 drifts by one day
# per leap year and over-reports age in the days just before a birthday.
df = df.withColumn("age", floor(months_between(current_date(), col("DOB")) / 12))

### Add a `days_since_last_consulted` column

In [0]:
from pyspark.sql.functions import datediff, current_date

# Whole days between the most recent consultation and today's date.
df = df.withColumn(
    "days_since_last_consulted",
    datediff(current_date(), col("Last_Consulted_Date")),
)

In [0]:
# Distinct values of the "country" column, pulled to the driver as a plain list.
# Rows are unwrapped in Python instead of via the RDD API, which is slower and
# not available under Spark Connect / shared-access clusters.
# NOTE(review): at this point `df` still holds every record per customer, so
# this list may include countries that only appear in older records.
distinct_countries = [row["country"] for row in df.select("country").distinct().collect()]

### Separate each customer's latest record from their older records

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Rank each customer's records newest-first; records with a NULL
# Last_Consulted_Date sort after every dated record.
# NOTE(review): records that tie on Last_Consulted_Date are ranked in an
# arbitrary order, so which one is treated as "latest" is not deterministic —
# consider adding a tie-breaker column once the schema is confirmed.
windowSpec = Window.partitionBy("Customer_Id").orderBy(
    col("Last_Consulted_Date").desc_nulls_last()
)
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))

# row_number starts at 1 and is never NULL, so "not first" == "rank > 1".
is_latest = col("row_number") == 1

# Exactly one row per Customer_Id: the most recent consultation.
df_recent = df_with_row_number.where(is_latest).drop("row_number")

# Every remaining (superseded) record.
df_older = df_with_row_number.where(~is_latest).drop("row_number")

### Write data to per-country directories in ADLS

In [0]:
# Write the latest record per customer into one Parquet directory per country,
# then write all superseded records to a single "older" directory.
#
# Countries are taken from df_recent itself: a country that appears only in
# superseded records would otherwise filter to zero rows and overwrite its
# directory with an empty dataset.
df_recent.cache()  # filtered once per country below; avoid recomputing the window each time
try:
    recent_countries = [row["country"] for row in df_recent.select("country").distinct().collect()]
    for country in recent_countries:
        if country is None:
            # `country == NULL` never matches in Spark SQL, so these rows cannot be
            # routed by the filter below; skip rather than writing an empty
            # "/mnt/processed/None" directory.
            # NOTE(review): rows with a NULL country are not written anywhere —
            # decide where they belong.
            continue
        country_df = df_recent.filter(df_recent['country'] == country)
        country_path = f"/mnt/processed/{country}"
        country_df.write.mode("overwrite").parquet(country_path)
finally:
    df_recent.unpersist()

df_older.write.mode("overwrite").parquet("/mnt/processed/older")