# **Data Manipulation using PySpark**

**Importing Libraries:**

In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.context import SparkContext
from functools import reduce
from pyspark.sql.functions import col, regexp_replace, when, avg
from pyspark.sql.types import StringType,BooleanType,DateType,FloatType, IntegerType
import os

**Create PySpark Session:**

SparkSession is a unified entry point for interacting with Spark. It provides the ability to configure and manage the Spark application and enables access to Spark's APIs. Without it, PySpark cannot initialize and run.

In [2]:
# Create (or reuse, if one is already running in this kernel) the SparkSession
# that every later cell uses to read and transform data.
spark = (
    SparkSession.builder
    .appName("data_product")
    .getOrCreate()
)

**Read product data from separate CSV files:**

In [3]:
# Read the five category CSV files from the local data directory.
# DATA_DIR can be overridden through the DATA_DIR environment variable so the
# notebook is not tied to one machine; the default is the original author's folder.
DATA_DIR = os.environ.get(
    "DATA_DIR",
    r"C:\Users\user\OneDrive\RFA _Personal Files\02. COURSE\Purwadhika_Data Engineering\Purwadhika_VS\meet46\data_original",
)


def read_category_csv(file_name):
    """Read one CSV file from DATA_DIR with a header row and an inferred schema.

    Args:
        file_name: file name relative to DATA_DIR, e.g. "All Appliances.csv".

    Returns:
        The Spark DataFrame produced by ``spark.read.csv``.
    """
    return spark.read.csv(path=os.path.join(DATA_DIR, file_name), header=True, inferSchema=True)


appliances_df   = read_category_csv("All Appliances.csv")
vehicle_df      = read_category_csv("All Car and Motorbike Products.csv")
electronics_df  = read_category_csv("All Electronics.csv")
fitness_df      = read_category_csv("All Exercise and Fitness.csv")
groceries_df    = read_category_csv("All Grocery and Gourmet Foods.csv")

**Union the data into one PySpark DataFrame:**

In [4]:
# Stack the five category DataFrames into one.
# unionByName matches columns by header name rather than by position, so a file
# whose columns appear in a different order cannot silently shift values into the
# wrong column. It raises an error instead if the column names do not match.
# (The previous DataFrame.unionAll is a deprecated, position-based alias of union.)
all_data_dfs_list = [appliances_df, vehicle_df, electronics_df, fitness_df, groceries_df]
all_data_dfs = reduce(DataFrame.unionByName, all_data_dfs_list)

**Alias the combined PySpark DataFrame (`alias` only names it; no data is copied):**

In [5]:
# Give the combined DataFrame the working name used by the rest of the notebook.
# Note: alias() does not copy any data. It only attaches the alias "df_product"
# and returns a DataFrame over the same plan; all_data_dfs itself is left as-is.
df_product = all_data_dfs.alias(alias='df_product')

# Preview the first rows (20 is show()'s default row count).
df_product.show(n=20)

+--------------------+-------------+--------------+--------------------+--------------------+-------+-------------+--------------+------------+
|                name|main_category|  sub_category|               image|                link|ratings|no_of_ratings|discount_price|actual_price|
+--------------------+-------------+--------------+--------------------+--------------------+-------+-------------+--------------+------------+
|Pigeon by Stovekr...|   appliances|All Appliances|https://m.media-a...|https://www.amazo...|    3.9|      128,941|          ₹599|      ₹1,245|
|Pigeon Polypropyl...|   appliances|All Appliances|https://m.media-a...|https://www.amazo...|    4.1|      274,505|          ₹199|        ₹545|
|Glun Multipurpose...|   appliances|All Appliances|https://m.media-a...|https://www.amazo...|    3.8|          365|          ₹199|        ₹899|
|beatXP Kitchen Sc...|   appliances|All Appliances|https://m.media-a...|https://www.amazo...|    3.7|        3,290|          ₹299|      

**Checkpoint 1: Check data types:**

In [6]:
# Print each column as "<name> , <type>" to inspect the schema Spark inferred.
for field_name, field_type in df_product.dtypes:
    print(f"{field_name} , {field_type}")

name , string
main_category , string
sub_category , string
image , string
link , string
ratings , string
no_of_ratings , string
discount_price , string
actual_price , string


**Data transformation: remove currency symbols and convert data types:**

In [7]:
# Clean the price, rating-count and rating columns and cast them to numeric types.

# Prices look like "₹1,245". Strip everything except digits and the decimal point,
# then cast to float. A value left empty by the strip casts to null
# (non-ANSI cast behaviour; TODO confirm for the Spark version in use).
df_product = df_product.withColumn("discount_price", regexp_replace(col("discount_price"), r'[^\d.]', "").cast("float"))

df_product = df_product.withColumn("actual_price", regexp_replace(col("actual_price"), r'[^\d.]', "").cast("float"))

# Rating counts use thousands separators (e.g. "128,941"). Remove the commas
# before the digits-only check; otherwise every count >= 1,000 fails the check
# and becomes null. Anything that is still not purely digits is set to null.
no_of_ratings_digits = regexp_replace(col('no_of_ratings'), ",", "")
df_product = df_product.withColumn(
    'no_of_ratings',
    when(no_of_ratings_digits.rlike('^[0-9]+$'), no_of_ratings_digits.cast('int')).otherwise(None))

# Ratings: keep only digits and the decimal point, then cast to float.
df_product = df_product.withColumn("ratings", regexp_replace(col("ratings"), r"[^\d.]", "").cast("float"))


**Checkpoint 2: Check data types again:**

In [8]:
# Second schema check: confirm the cleaned columns now carry numeric types.
for name_and_type in df_product.dtypes:
    field_name, field_type = name_and_type
    print(f"{field_name} , {field_type}")

name , string
main_category , string
sub_category , string
image , string
link , string
ratings , float
no_of_ratings , int
discount_price , float
actual_price , float


**Create new column named discount_amount:**

In [9]:
# Add discount_amount: the gap between the list price (actual_price) and the
# discounted price, cast to float. Null if either price is null.
df_product = df_product.withColumn(
    "discount_amount",
    (df_product["actual_price"] - df_product["discount_price"]).cast("float"),
)

**Drop duplicate rows from the DataFrame:**

In [10]:
# Remove rows that are identical across every column.
# NOTE(review): the top-discount output in cell 14 still shows two rows that look
# identical. They presumably differ in a truncated column (image/link); consider
# deduplicating on a subset of columns if those should count as duplicates.
df_product = df_product.dropDuplicates()

**Sorting the data:**

In [11]:
# Order products from most-rated to least-rated.
# Later cells re-aggregate (groupBy) or re-sort (orderBy), so those results do not
# depend on this ordering.
df_product = df_product.orderBy(col("no_of_ratings").desc())

**Filtering data based on ratings (keep only rows with ratings below 5.0):**

In [12]:
# Keep only products whose rating is strictly below 5.0.
# Rows rated exactly 5.0 are dropped. Rows with a null rating are also dropped,
# because the comparison with null is not true.
# NOTE(review): confirm that excluding perfect 5.0 ratings is intended.
df_product = df_product.where(col("ratings") < 5.0)

**Finding the highest average rating by main_category and sub_category:**

In [13]:
# Average rating for each (main_category, sub_category) pair.
# Note: this ranks categories, not individual products.
average_ratings = (
    df_product
    .groupBy('main_category', 'sub_category')
    .agg(avg('ratings').alias('average_ratings'))
)

# Rank the categories from highest to lowest average rating and display them.
highest_average = average_ratings.sort(average_ratings['average_ratings'].desc())
highest_average.show()


+--------------------+--------------------+------------------+
|       main_category|        sub_category|   average_ratings|
+--------------------+--------------------+------------------+
|grocery & gourmet...|All Grocery & Gou...| 4.162970950195428|
| tv, audio & cameras|     All Electronics| 4.068329122765017|
|          appliances|      All Appliances| 3.878102694300688|
|    sports & fitness|All Exercise & Fi...|3.8478478456999325|
|     car & motorbike|All Car & Motorbi...| 3.796893872095656|
+--------------------+--------------------+------------------+



**Finding the products with the highest discount_amount:**

In [14]:
# The five products with the largest discount_amount (actual_price - discount_price).
highest_discount = (
    df_product
    .orderBy("discount_amount", ascending=False)
    .limit(num=5)
)

highest_discount.show()

+--------------------+-------------------+---------------+--------------------+--------------------+-------+-------------+--------------+------------+---------------+
|                name|      main_category|   sub_category|               image|                link|ratings|no_of_ratings|discount_price|actual_price|discount_amount|
+--------------------+-------------------+---------------+--------------------+--------------------+-------+-------------+--------------+------------+---------------+
|ROCKWELL RVC550B ...|         appliances| All Appliances|https://m.media-a...|https://www.amazo...|    4.8|            9|       20500.0|    110489.0|        89989.0|
|ROCKWELL RVC550B ...|         appliances| All Appliances|https://m.media-a...|https://www.amazo...|    4.8|            9|       20500.0|    110489.0|        89989.0|
|(Renewed) Lenovo ...|tv, audio & cameras|All Electronics|https://m.media-a...|https://www.amazo...|    3.4|           44|       17399.0|     94000.0|        76601.0