# Import necess lib and Create Spark Session #

In [0]:
# Import pyspark
import pyspark

# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Import Function as F
import pyspark.sql.functions as F

In [0]:
# Create SparkSession
spark = SparkSession.builder.master("local[*]") \
                            .appName("ManipulateDataframe") \
                            .getOrCreate()

# Print SparkVersion
print("Spark version: ", spark.version)

Spark version:  3.5.0


## 1. Read dataframe into CSV ##

In [0]:
# Define file
file_path = "dbfs:/FileStore/tables/car_data/car_data.csv"
file_type = "csv"

# Read dataset into Spark Dataframe
df_car = spark.read.format(file_type) \
                   .option("header", True) \
                   .option("inferSchema", True) \
                   .load(file_path)

# Display df_car
display(df_car)

Car_Name,Year,Selling_Price,Present_Price,Kms_Driven,Fuel_Type,Seller_Type,Transmission,Owner
ritz,2014,3.35,5.59,27000,Petrol,Dealer,Manual,0
sx4,2013,4.75,9.54,43000,Diesel,Dealer,Manual,0
ciaz,2017,7.25,9.85,6900,Petrol,Dealer,Manual,0
wagon r,2011,2.85,4.15,5200,Petrol,Dealer,Manual,0
swift,2014,4.6,6.87,42450,Diesel,Dealer,Manual,0
vitara brezza,2018,9.25,9.83,2071,Diesel,Dealer,Manual,0
ciaz,2015,6.75,8.12,18796,Petrol,Dealer,Manual,0
s cross,2015,6.5,8.61,33429,Diesel,Dealer,Manual,0
ciaz,2016,8.75,8.89,20273,Diesel,Dealer,Manual,0
ciaz,2015,7.45,8.92,42367,Diesel,Dealer,Manual,0


In [0]:
# Show schema
df_car.printSchema()

root
 |-- Car_Name: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Selling_Price: double (nullable = true)
 |-- Present_Price: double (nullable = true)
 |-- Kms_Driven: integer (nullable = true)
 |-- Fuel_Type: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: integer (nullable = true)



## 2. Rename all column ##

In [0]:
# Rename column
df_car_new = df_car.withColumnRenamed("Car_Name", "car_name") \
                   .withColumnRenamed("Year", "year") \
                   .withColumnRenamed("Selling_Price", "selling_price") \
                   .withColumnRenamed("Present_Price", "present_price") \
                   .withColumnRenamed("Kms_Driven", "kms_driven") \
                   .withColumnRenamed("Fuel_Type", "fuel_type") \
                   .withColumnRenamed("Seller_Type", "seller_type") \
                   .withColumnRenamed("Transmission", "transmission") \
                   .withColumnRenamed("Owner", "owner")

# Display new df
display(df_car_new)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner
ritz,2014,3.35,5.59,27000,Petrol,Dealer,Manual,0
sx4,2013,4.75,9.54,43000,Diesel,Dealer,Manual,0
ciaz,2017,7.25,9.85,6900,Petrol,Dealer,Manual,0
wagon r,2011,2.85,4.15,5200,Petrol,Dealer,Manual,0
swift,2014,4.6,6.87,42450,Diesel,Dealer,Manual,0
vitara brezza,2018,9.25,9.83,2071,Diesel,Dealer,Manual,0
ciaz,2015,6.75,8.12,18796,Petrol,Dealer,Manual,0
s cross,2015,6.5,8.61,33429,Diesel,Dealer,Manual,0
ciaz,2016,8.75,8.89,20273,Diesel,Dealer,Manual,0
ciaz,2015,7.45,8.92,42367,Diesel,Dealer,Manual,0


## 3. Add some new column ##

In [0]:
# Add new column
df_car_new = df_car_new.withColumn("selling_difference_present", F.col("present_price") - F.col("selling_price")) \
                       .withColumn("ms_driven", (F.col("kms_driven") / 1000))     
# Display df
display(df_car_new)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner,selling_difference_present,ms_driven
ritz,2014,3.35,5.59,27000,Petrol,Dealer,Manual,0,2.24,27.0
sx4,2013,4.75,9.54,43000,Diesel,Dealer,Manual,0,4.789999999999999,43.0
ciaz,2017,7.25,9.85,6900,Petrol,Dealer,Manual,0,2.6,6.9
wagon r,2011,2.85,4.15,5200,Petrol,Dealer,Manual,0,1.3000000000000005,5.2
swift,2014,4.6,6.87,42450,Diesel,Dealer,Manual,0,2.2700000000000005,42.45
vitara brezza,2018,9.25,9.83,2071,Diesel,Dealer,Manual,0,0.5800000000000001,2.071
ciaz,2015,6.75,8.12,18796,Petrol,Dealer,Manual,0,1.3699999999999992,18.796
s cross,2015,6.5,8.61,33429,Diesel,Dealer,Manual,0,2.1099999999999994,33.429
ciaz,2016,8.75,8.89,20273,Diesel,Dealer,Manual,0,0.1400000000000005,20.273
ciaz,2015,7.45,8.92,42367,Diesel,Dealer,Manual,0,1.4699999999999998,42.367


## 4. Filter dataframe ##

In [0]:
# Method 1: Using command in code
df_car_filter_1 = df_car_new.filter((F.col("year") == '2014') & (F.col("fuel_type") == 'Petrol'))

display(df_car_filter_1)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner,selling_difference_present,ms_driven
ritz,2014,3.35,5.59,27000,Petrol,Dealer,Manual,0,2.24,27.0
alto k10,2014,2.5,3.46,45280,Petrol,Dealer,Manual,0,0.96,45.28
alto k10,2014,2.55,3.98,46706,Petrol,Dealer,Manual,0,1.4300000000000002,46.706
ciaz,2014,7.5,12.04,15000,Petrol,Dealer,Automatic,0,4.539999999999999,15.0
etios g,2014,4.1,6.8,39485,Petrol,Dealer,Manual,1,2.7,39.485
etios g,2014,4.75,6.76,40000,Petrol,Dealer,Manual,0,2.01,40.0
Hyosung GT250R,2014,1.35,3.45,16500,Petrol,Individual,Manual,1,2.1,16.5
KTM 390 Duke,2014,1.15,2.4,7000,Petrol,Individual,Manual,0,1.25,7.0
Honda CBR 150,2014,0.65,1.2,23500,Petrol,Individual,Manual,0,0.5499999999999999,23.5
Bajaj Pulsar NS 200,2014,0.6,0.99,25000,Petrol,Individual,Manual,0,0.39,25.0


In [0]:
# Method 2: Using command like SQL
df_car_filter_2 = df_car_new.filter("Year == 2014 AND fuel_type == 'Petrol' ")

display(df_car_filter_2)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner,selling_difference_present,ms_driven
ritz,2014,3.35,5.59,27000,Petrol,Dealer,Manual,0,2.24,27.0
alto k10,2014,2.5,3.46,45280,Petrol,Dealer,Manual,0,0.96,45.28
alto k10,2014,2.55,3.98,46706,Petrol,Dealer,Manual,0,1.4300000000000002,46.706
ciaz,2014,7.5,12.04,15000,Petrol,Dealer,Automatic,0,4.539999999999999,15.0
etios g,2014,4.1,6.8,39485,Petrol,Dealer,Manual,1,2.7,39.485
etios g,2014,4.75,6.76,40000,Petrol,Dealer,Manual,0,2.01,40.0
Hyosung GT250R,2014,1.35,3.45,16500,Petrol,Individual,Manual,1,2.1,16.5
KTM 390 Duke,2014,1.15,2.4,7000,Petrol,Individual,Manual,0,1.25,7.0
Honda CBR 150,2014,0.65,1.2,23500,Petrol,Individual,Manual,0,0.5499999999999999,23.5
Bajaj Pulsar NS 200,2014,0.6,0.99,25000,Petrol,Individual,Manual,0,0.39,25.0


## 5. Sort Dataframe ##

In [0]:
# Sort Dataframe
df_sort = df_car_new.orderBy(F.col("year"), ascending = True)

# Display 
display(df_sort)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner,selling_difference_present,ms_driven
800,2003,0.35,2.28,127000,Petrol,Individual,Manual,0,1.93,127.0
sx4,2003,2.25,7.98,62000,Petrol,Dealer,Manual,0,5.73,62.0
corolla,2004,1.5,12.35,135154,Petrol,Dealer,Automatic,0,10.85,135.154
innova,2005,2.75,10.21,90000,Petrol,Individual,Manual,0,7.460000000000001,90.0
innova,2005,3.49,13.46,197176,Diesel,Dealer,Manual,0,9.97,197.176
innova,2005,3.51,13.7,75000,Petrol,Dealer,Manual,0,10.19,75.0
Hero Super Splendor,2005,0.2,0.57,55000,Petrol,Individual,Manual,0,0.3699999999999999,55.0
wagon r,2006,1.05,4.15,65000,Petrol,Dealer,Manual,0,3.1000000000000005,65.0
camry,2006,2.5,23.73,142000,Petrol,Individual,Automatic,3,21.23,142.0
Bajaj Pulsar 150,2006,0.1,0.75,92233,Petrol,Individual,Manual,0,0.65,92.233


## 6. Remove Duplicates in Dataframe ##

In [0]:
df_drop_duplicates = df_car_new.dropDuplicates()

display(df_drop_duplicates)

car_name,year,selling_price,present_price,kms_driven,fuel_type,seller_type,transmission,owner,selling_difference_present,ms_driven
Bajaj Discover 125,2012,0.2,0.57,25000,Petrol,Individual,Manual,1,0.3699999999999999,25.0
verna,2013,5.11,9.4,36198,Petrol,Dealer,Automatic,0,4.29,36.198
Royal Enfield Classic 350,2013,1.0,1.47,46500,Petrol,Individual,Manual,0,0.47,46.5
brio,2015,5.25,5.9,14465,Petrol,Dealer,Manual,0,0.6500000000000004,14.465
swift,2014,4.95,7.49,39000,Diesel,Dealer,Manual,0,2.54,39.0
wagon r,2006,1.05,4.15,65000,Petrol,Dealer,Manual,0,3.1000000000000005,65.0
Honda CBR 150,2014,0.65,1.2,23500,Petrol,Individual,Manual,0,0.5499999999999999,23.5
Royal Enfield Thunder 350,2016,1.2,1.5,18000,Petrol,Individual,Manual,0,0.3,18.0
Royal Enfield Bullet 350,2016,1.05,1.17,6000,Petrol,Individual,Manual,0,0.1199999999999998,6.0
TVS Apache RTR 160,2012,0.6,0.81,19000,Petrol,Individual,Manual,0,0.21,19.0


## 7. Use Group By in Spark Dataframe ##

In [0]:
# Count fuel_type
fuel_type_count = df_car_new.groupBy("fuel_type").count()

# Show
display(fuel_type_count)

fuel_type,count
Diesel,60
CNG,2
Petrol,239


In [0]:
# Count seller_type
seller_type_count = df_car_new.groupBy("seller_type").count()

# Display result
display(seller_type_count)

seller_type,count
Individual,106
Dealer,195


In [0]:
# Count owner
owner_count = df_car_new.groupBy("owner").count()

# Display result
display(owner_count)

owner,count
1,10
3,1
0,290


In [0]:
# Count year
year_count = df_car_new.groupBy("year").count()

# Display result
display(year_count)

year,count
2003,2
2007,2
2018,1
2015,61
2006,4
2013,33
2014,38
2004,1
2012,23
2009,6


## 8. Merge two dataframe ##

In [0]:
# Define file
file_path = "dbfs:/FileStore/tables/dataset/menu.csv"
file_type = "csv"

# Define schema
menu_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True)
])

# Read dataset into Spark Dataframe
df_menu = spark.read.format(file_type) \
                   .schema(menu_schema) \
                   .load(file_path)

# Drop column
df_menu = df_menu.drop("_c3")

# Display df_car
display(df_menu)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Define file
file_path = "dbfs:/FileStore/tables/dataset/sales.csv"
file_type = "csv"

# Define Schema
sales_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("time_sales", DateType(), True),
    StructField("country", StringType(), True),
    StructField("source_type", StringType(), True)
])

# Read dataset into Spark Dataframe
df_sales = spark.read.format(file_type) \
                   .schema(sales_schema) \
                   .load(file_path)

# Display df_car
display(df_sales)

product_id,customer_name,time_sales,country,source_type
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
# Merge two dataframe by product ID
df_merge = df_sales.join(df_menu, on = "product_id", how = "inner") \
                   .select(["product_id", "product_name", "price", "customer_name", "time_sales", "country", "source_type"])

# Display df_merge
display(df_merge)

product_id,product_name,price,customer_name,time_sales,country,source_type
1,PIZZA,100,A,2023-01-01,India,Swiggy
2,Chowmin,150,A,2022-01-01,India,Swiggy
2,Chowmin,150,A,2023-01-07,India,Swiggy
3,sandwich,120,A,2023-01-10,India,Restaurant
3,sandwich,120,A,2022-01-11,India,Swiggy
3,sandwich,120,A,2023-01-11,India,Restaurant
2,Chowmin,150,B,2022-02-01,India,Swiggy
2,Chowmin,150,B,2023-01-02,India,Swiggy
1,PIZZA,100,B,2023-01-04,India,Restaurant
1,PIZZA,100,B,2023-02-11,India,Swiggy


## 9. When, Otherwise in PySpark ##

In [0]:
# Create a new column day, month, year
fact_sales = df_merge.withColumn("day", F.day(F.col("time_sales"))) \
                     .withColumn("month", F.month(F.col("time_sales"))) \
                     .withColumn("year", F.year(F.col("time_sales")))

# Drop column time_sales
fact_sales = fact_sales.drop("time_sales")

# Display df
display(fact_sales)

product_id,product_name,price,customer_name,country,source_type,day,month,year
1,PIZZA,100,A,India,Swiggy,1,1,2023
2,Chowmin,150,A,India,Swiggy,1,1,2022
2,Chowmin,150,A,India,Swiggy,7,1,2023
3,sandwich,120,A,India,Restaurant,10,1,2023
3,sandwich,120,A,India,Swiggy,11,1,2022
3,sandwich,120,A,India,Restaurant,11,1,2023
2,Chowmin,150,B,India,Swiggy,1,2,2022
2,Chowmin,150,B,India,Swiggy,2,1,2023
1,PIZZA,100,B,India,Restaurant,4,1,2023
1,PIZZA,100,B,India,Swiggy,11,2,2023


In [0]:
# Create new column type_car
    # >= 150 : Expensive
    # 100 -  150 : Standard
    # < 100: Normal
fact_sales = fact_sales.withColumn("type_car", F.when(F.col("price") >= 150, "Expensive") \
                                                .when((F.col("price") < 150) & (F.col("price") >= 100), "Standard")
                                                .otherwise("Normal"))

# Display df
display(fact_sales)

product_id,product_name,price,customer_name,country,source_type,day,month,year,type_car
1,PIZZA,100,A,India,Swiggy,1,1,2023,Standard
2,Chowmin,150,A,India,Swiggy,1,1,2022,Expensive
2,Chowmin,150,A,India,Swiggy,7,1,2023,Expensive
3,sandwich,120,A,India,Restaurant,10,1,2023,Standard
3,sandwich,120,A,India,Swiggy,11,1,2022,Standard
3,sandwich,120,A,India,Restaurant,11,1,2023,Standard
2,Chowmin,150,B,India,Swiggy,1,2,2022,Expensive
2,Chowmin,150,B,India,Swiggy,2,1,2023,Expensive
1,PIZZA,100,B,India,Restaurant,4,1,2023,Standard
1,PIZZA,100,B,India,Swiggy,11,2,2023,Standard


In [0]:
# Count year sales for explore more
year_values_count = fact_sales.groupBy("year").count()

# Display
display(year_values_count)

year,count
2023,84
2022,33


In [0]:
# Create new column status_car for new_car, old_car
    # >= 2023: new_car
    # other: old_car
fact_sales = fact_sales.withColumn("status_car", F.when(F.col("year") >= 2023, "New") \
                                                  .otherwise("Old"))

# Display df
display(fact_sales)

product_id,product_name,price,customer_name,country,source_type,day,month,year,type_car,status_car
1,PIZZA,100,A,India,Swiggy,1,1,2023,Standard,New
2,Chowmin,150,A,India,Swiggy,1,1,2022,Expensive,Old
2,Chowmin,150,A,India,Swiggy,7,1,2023,Expensive,New
3,sandwich,120,A,India,Restaurant,10,1,2023,Standard,New
3,sandwich,120,A,India,Swiggy,11,1,2022,Standard,Old
3,sandwich,120,A,India,Restaurant,11,1,2023,Standard,New
2,Chowmin,150,B,India,Swiggy,1,2,2022,Expensive,Old
2,Chowmin,150,B,India,Swiggy,2,1,2023,Expensive,New
1,PIZZA,100,B,India,Restaurant,4,1,2023,Standard,New
1,PIZZA,100,B,India,Swiggy,11,2,2023,Standard,New


## 10. Casting column to another type ##

In [0]:
fact_sales.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- source_type: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- type_car: string (nullable = false)
 |-- status_car: string (nullable = false)



In [0]:
fact_sales = fact_sales.withColumn("price", F.col("price").cast("int"))

display(fact_sales)

product_id,product_name,price,customer_name,country,source_type,day,month,year,type_car,status_car
1,PIZZA,100,A,India,Swiggy,1,1,2023,Standard,New
2,Chowmin,150,A,India,Swiggy,1,1,2022,Expensive,Old
2,Chowmin,150,A,India,Swiggy,7,1,2023,Expensive,New
3,sandwich,120,A,India,Restaurant,10,1,2023,Standard,New
3,sandwich,120,A,India,Swiggy,11,1,2022,Standard,Old
3,sandwich,120,A,India,Restaurant,11,1,2023,Standard,New
2,Chowmin,150,B,India,Swiggy,1,2,2022,Expensive,Old
2,Chowmin,150,B,India,Swiggy,2,1,2023,Expensive,New
1,PIZZA,100,B,India,Restaurant,4,1,2023,Standard,New
1,PIZZA,100,B,India,Swiggy,11,2,2023,Standard,New


In [0]:
fact_sales.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- source_type: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- type_car: string (nullable = false)
 |-- status_car: string (nullable = false)



## 11. Handling Null values ##

In [0]:
from pyspark.sql.functions import col

conditions = [f"{c} is NULL" for c in fact_sales.columns]
condition_str = " AND ".join(conditions)

count_null = fact_sales.filter(condition_str).count()
print("Count null values: ", count_null)

Count null values:  0


In [0]:
#define function to fill null values with column mean
    #def fillna_mean(df, include=set()): 
    #    means = df.agg(*(
    #        mean(x).alias(x) for x in df.columns if x in include
    #    ))
    #    return df.fillna(means.first().asDict())

#fill null values with mean in specific columns
    #df = fillna_mean(df, ['points', 'assists'])