In [0]:
from pyspark.sql import SparkSession
import random
from datetime import datetime, timedelta

# Return the already-active SparkSession if there is one, otherwise start a new
# session named "PracticeDF".
spark = (
    SparkSession.builder
    .appName("PracticeDF")
    .getOrCreate()
)

# Dimension values sampled for each synthetic transaction.
customers = ["C001", "C002", "C003", "C004", "C005", "C006", "C007", "C008"]
products = ["Laptop", "Mobile", "Tablet", "Headphone", "TV"]
cities = ["Pune", "Mumbai", "Delhi", "Bangalore", "Chennai"]

N_RECORDS = 150  # number of synthetic transactions to generate
SEED = 42        # fixed seed so Restart & Run All reproduces the same data

start_date = datetime(2024, 1, 1)


def generate_transactions(n_records=N_RECORDS, seed=SEED):
    """Build ``n_records`` synthetic transaction tuples.

    Each tuple is ``(txn_id, customer_id, product, quantity, price, city,
    txn_date)``, matching the order of ``columns``:

    * ``txn_id`` is the row index, so ids are unique;
    * ``quantity`` is an int in [1, 5];
    * ``price`` is an int in [1000, 50000];
    * ``txn_date`` is ``start_date`` plus 0-365 whole days.

    A private ``random.Random(seed)`` is used, so the output depends only on
    ``seed`` and the notebook's global ``random`` state is left untouched.
    """
    rng = random.Random(seed)
    records = []
    for txn_id in range(n_records):
        records.append((
            txn_id,
            rng.choice(customers),
            rng.choice(products),
            rng.randint(1, 5),         # quantity
            rng.randint(1000, 50000),  # price
            rng.choice(cities),
            start_date + timedelta(days=rng.randint(0, 365)),
        ))
    return records


# NOTE(review): this generator never emits repeated txn_ids or null values,
# so the duplicate / null-handling cells below have nothing to act on unless
# such rows are injected deliberately.
data = generate_transactions()

columns = ["txn_id", "customer_id", "product", "quantity", "price", "city", "txn_date"]

# Build the Spark DataFrame from the generated tuples. Only column names are
# supplied, so Spark infers the column types from the Python values
# (inspect them with df.printSchema()).
df = spark.createDataFrame(data=data, schema=columns)

df.show()

Find Duplicate Transactions

In [0]:
# A duplicate transaction is a txn_id that occurs more than once. This is the
# same key the dedupe cell partitions on. Grouping by *all* columns (which
# include txn_id) would only catch byte-identical copies and miss re-sent rows
# whose txn_date differs, which are exactly the rows "keep latest" resolves.
df.groupBy("txn_id").count().filter("count > 1").show()

Remove Duplicates & Keep the Latest Record per Transaction

In [0]:
from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

# Number the rows of each txn_id newest-first by txn_date; row 1 is kept.
# NOTE(review): rows with the same txn_id AND txn_date tie, and which one
# survives is then not determined by this ordering.
window_spec = Window.partitionBy("txn_id").orderBy(desc("txn_date"))

df_clean = (
    df.withColumn("rn", row_number().over(window_spec))
      .where("rn = 1")
      .drop("rn")
)

df_clean.show()

Check Null Values


In [0]:
from pyspark.sql.functions import col, count, when

# One aggregate per column: when(...) without otherwise() is null for
# non-null cells and count() skips nulls, so each result is that column's
# number of null cells. The output column keeps the source column's name.
df.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]).show()


Replace Null Prices with the Average Price

In [0]:
from pyspark.sql.functions import avg
from pyspark.sql.types import IntegralType

# avg() ignores nulls, so this is the mean of the known prices. A global
# aggregate always yields exactly one row; its value is None when no
# non-null price exists (all-null column or empty frame).
avg_price = df.select(avg("price")).first()[0]

if avg_price is None:
    # Nothing to impute from; passing None as a fill value would raise.
    print("No non-null prices found -- skipping imputation.")
else:
    fill_value = avg_price
    if isinstance(df.schema["price"].dataType, IntegralType):
        # fillna casts the replacement to the column's type, which would
        # silently truncate the mean; round to the nearest whole unit instead.
        fill_value = round(avg_price)
    df = df.fillna({"price": fill_value})

df.display()

In [0]:
# NOTE(review): repeats the df.display() at the end of the previous cell;
# this cell can likely be removed.
df.display()