In [1]:
from pyspark.sql.functions import col, sum, when, isnull, coalesce, collect_set, mean, to_date,trim, cast, date_format

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Preprocessing").config("spark.master", "local") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()


sc = spark.sparkContext

In [3]:
df = spark.read.csv(r"file:\\\D:\CDAC\Project\new_retail_data.csv",header=True,inferSchema=True)

In [4]:
df.printSchema()

root
 |-- Transaction_ID: double (nullable = true)
 |-- Customer_ID: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: double (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Total_Purchases: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullable = tr

In [5]:
#df.show(5,truncate=False,vertical=True)

-RECORD 0---------------------------------------
 Transaction_ID   | 8691788.0                   
 Customer_ID      | 37249.0                     
 Name             | Michelle Harrington         
 Email            | Ebony39@gmail.com           
 Phone            | 1.414786801E9               
 Address          | 3959 Amanda Burgs           
 City             | Dortmund                    
 State            | Berlin                      
 Zipcode          | 77985.0                     
 Country          | Germany                     
 Age              | 21.0                        
 Gender           | Male                        
 Income           | Low                         
 Customer_Segment | Regular                     
 Date             | 9/18/2023                   
 Year             | 2023.0                      
 Month            | September                   
 Time             | 22:03:55                    
 Total_Purchases  | 3.0                         
 Amount           | 

In [6]:
#type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
#print(df.columns)

['Transaction_ID', 'Customer_ID', 'Name', 'Email', 'Phone', 'Address', 'City', 'State', 'Zipcode', 'Country', 'Age', 'Gender', 'Income', 'Customer_Segment', 'Date', 'Year', 'Month', 'Time', 'Total_Purchases', 'Amount', 'Total_Amount', 'Product_Category', 'Product_Brand', 'Product_Type', 'Feedback', 'Shipping_Method', 'Payment_Method', 'Order_Status', 'Ratings', 'products']


- Drop columns - ['Transaction_ID','Name', 'Email', 'Phone','Address','Zipcode','Year', 'Month','Time','Order_Status']
- Handle Null values using mean - ['Age','Ratings']
- Handle Null values by mapping - ['Product_Category', 'Product_Brand','City', 'State', 'Country']
- Handle Null values by calculation - ['Total_Purchases', 'Amount', 'Total_Amount']
- Handle Null values using mode - ['Gender','Income','Customer_Segment','Feedback', 'Shipping_Method', 'Payment_Method']

In [8]:
# Checking null values
#null_count = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
#null_count.show(vertical=True)

-RECORD 0---------------
 Transaction_ID   | 333 
 Customer_ID      | 308 
 Name             | 382 
 Email            | 347 
 Phone            | 362 
 Address          | 315 
 City             | 248 
 State            | 281 
 Zipcode          | 340 
 Country          | 271 
 Age              | 173 
 Gender           | 317 
 Income           | 290 
 Customer_Segment | 215 
 Date             | 359 
 Year             | 350 
 Month            | 273 
 Time             | 350 
 Total_Purchases  | 361 
 Amount           | 357 
 Total_Amount     | 350 
 Product_Category | 283 
 Product_Brand    | 281 
 Product_Type     | 0   
 Feedback         | 184 
 Shipping_Method  | 337 
 Payment_Method   | 297 
 Order_Status     | 235 
 Ratings          | 184 
 products         | 0   



#### Drop columns - ['Transaction_ID','Name', 'Email', 'Phone','Address','Zipcode','Year', 'Month','Time','Order_Status']

In [5]:
# Drop Columns which are not required  
df_req = df.drop('Transaction_ID','Name', 'Email', 'Phone','Address','Zipcode','Year', 'Month','Time','Order_Status')

#### Handle Null values by mapping - ['City', 'State', 'Country','Product_Category', 'Product_Brand']

In [6]:
# Create a mapping of State to City where City is not null
City_mapping = df_req.filter(col('City').isNotNull()) \
    .select('State', 'City') \
    .distinct() \
    .groupBy('State') \
    .agg(collect_set('City').alias('City_Set'))

# Extract a single City from the set for each State
City_mapping = City_mapping.withColumn('City_Mapped', col('City_Set')[0])

# Join the original DataFrame with the mapping to fill null values
df_req = df_req.join(City_mapping, on='State', how='left') \
    .withColumn('City', coalesce(col('City'), col('City_Mapped'))) \
    .drop('City_Set', 'City_Mapped')

# Show the updated DataFrame
#df_req.select(sum(col('City').isNull().cast('int')).alias("City")).show()


In [11]:
#City_mapping.show(5)

+---------+--------------------+-------------+
|    State|            City_Set|  City_Mapped|
+---------+--------------------+-------------+
|     Utah|[Oklahoma City, M...|Oklahoma City|
|   Hawaii|[Oklahoma City, M...|Oklahoma City|
|Minnesota|[Oklahoma City, M...|Oklahoma City|
|     Ohio|[Oklahoma City, M...|Oklahoma City|
| Arkansas|[Oklahoma City, M...|Oklahoma City|
+---------+--------------------+-------------+
only showing top 5 rows



In [7]:
# Create a mapping of State to Country where Country is not null
Country_mapping = df_req.filter(col('Country').isNotNull()) \
    .select('State', 'Country') \
    .distinct() \
    .groupBy('State') \
    .agg(collect_set('Country').alias('Country_Set'))

# Extract a single Country from the set for each State
Country_mapping = Country_mapping.withColumn('Country_Mapped', col('Country_Set')[0])

# Join the original DataFrame with the mapping to fill null values
df_req = df_req.join(Country_mapping, on='State', how='left') \
    .withColumn('Country', coalesce(col('Country'), col('Country_Mapped'))) \
    .drop('Country_Set', 'Country_Mapped')

# Show the updated DataFrame
#df_req.select(sum(col('Country').isNull().cast('int')).alias("Country")).show()


In [13]:
#Country_mapping.show(5)

+---------+-----------+--------------+
|    State|Country_Set|Country_Mapped|
+---------+-----------+--------------+
|     Utah|      [USA]|           USA|
|   Hawaii|      [USA]|           USA|
|Minnesota|      [USA]|           USA|
|     Ohio|      [USA]|           USA|
| Arkansas|      [USA]|           USA|
+---------+-----------+--------------+
only showing top 5 rows



In [8]:
# Create a mapping of Country to State where State is not null
State_mapping = df_req.filter(col('State').isNotNull()) \
    .select('Country', 'State') \
    .distinct() \
    .groupBy('Country') \
    .agg(collect_set('State').alias('State_Set'))

# Extract a single State from the set for each Product
State_mapping = State_mapping.withColumn('State_Mapped', col('State_Set')[0])

# Join the original DataFrame with the mapping to fill null values
df_req = df_req.join(State_mapping, on='Country', how='left') \
    .withColumn('State', coalesce(col('State'), col('State_Mapped'))) \
    .drop('State_Set', 'State_Mapped')

# Show the updated DataFrame
#df_req.select(sum(col('State').isNull().cast('int')).alias("State")).show()


In [15]:
#State_mapping.show()

+---------+--------------------+---------------+
|  Country|           State_Set|   State_Mapped|
+---------+--------------------+---------------+
|  Germany|            [Berlin]|         Berlin|
|      USA|[Michigan, Vermon...|       Michigan|
|       UK|           [England]|        England|
|   Canada|           [Ontario]|        Ontario|
|Australia|   [New South Wales]|New South Wales|
+---------+--------------------+---------------+



In [9]:
# Create a mapping of Product to Product_Brand where Product_Brand is not null
product_brand_mapping = df_req.filter(col('Product_Brand').isNotNull()) \
    .select('products', 'Product_Brand') \
    .distinct() \
    .groupBy('products') \
    .agg(collect_set('Product_Brand').alias('Product_Brand_Set'))

# Extract a single Product_Brand from the set for each Product
product_brand_mapping = product_brand_mapping.withColumn('Product_Brand_Mapped', col('Product_Brand_Set')[0])

# Join the original DataFrame with the mapping to fill null values
df_req = df_req.join(product_brand_mapping, on='products', how='left') \
    .withColumn('Product_Brand', coalesce(col('Product_Brand'), col('Product_Brand_Mapped'))) \
    .drop('Product_Brand_Set', 'Product_Brand_Mapped')

# Show the updated DataFrame
#df_req.select(sum(col('Product_Brand').isNull().cast('int')).alias("Product_Brand")).show()


In [17]:
#product_brand_mapping.show(318,truncate=False)

+----------------------------+--------------------------------------------+--------------------+
|products                    |Product_Brand_Set                           |Product_Brand_Mapped|
+----------------------------+--------------------------------------------+--------------------+
|4K TV                       |[Sony, Samsung]                             |Sony                |
|A-line dress                |[Zara]                                      |Zara                |
|Acer Iconia Tab             |[Apple, Samsung]                            |Apple               |
|Acer Swift                  |[Apple]                                     |Apple               |
|Action                      |[HarperCollins]                             |HarperCollins       |
|Adventure                   |[Penguin Books, Random House, HarperCollins]|Penguin Books       |
|Affogato                    |[Nestle]                                    |Nestle              |
|Air conditioner             |

In [10]:
# Create a mapping of Product to Product_Category where Product_Category is not null
product_category_mapping = df_req.filter(col('Product_Category').isNotNull()) \
    .select('products', 'Product_Category') \
    .distinct() \
    .groupBy('products') \
    .agg(collect_set('Product_Category').alias('Product_Category_Set'))

# Extract a single Product_Brand from the set for each Product
product_category_mapping = product_category_mapping.withColumn('Product_Category_Mapped', col('Product_Category_Set')[0])

# Join the original DataFrame with the mapping to fill null values
df_req = df_req.join(product_category_mapping, on='products', how='left') \
    .withColumn('Product_Category', coalesce(col('Product_Category'), col('Product_Category_Mapped'))) \
    .drop('Product_Category_Set', 'Product_Category_Mapped')

# Show count  of null values from updated DataFrame
#df_req.select(sum(col('Product_Category').isNull().cast('int')).alias("Product_Category")).show()


In [19]:
#product_category_mapping.show(318)

+--------------------+--------------------+-----------------------+
|            products|Product_Category_Set|Product_Category_Mapped|
+--------------------+--------------------+-----------------------+
|               4K TV|       [Electronics]|            Electronics|
|        A-line dress|          [Clothing]|               Clothing|
|     Acer Iconia Tab|       [Electronics]|            Electronics|
|          Acer Swift|       [Electronics]|            Electronics|
|              Action|             [Books]|                  Books|
|           Adventure|             [Books]|                  Books|
|            Affogato|           [Grocery]|                Grocery|
|     Air conditioner|       [Electronics]|            Electronics|
|      Alkaline water|           [Grocery]|                Grocery|
|  Amazon Fire Tablet|       [Electronics]|            Electronics|
|           Americano|           [Grocery]|                Grocery|
|          Android TV|       [Electronics]|     

#### Handle Null values using mean - ['Age','Ratings']

In [11]:
# Filling null values by mean
mean_Age = df_req.select(mean("Age")).collect()[0][0]
mean_Ratings = df_req.select(mean("Ratings")).collect()[0][0] #3.166
df_req = df_req.fillna({'Ratings': round(mean_Ratings,0),
                       "Age":round(mean_Age)
                      })

#### Handle Null values using mode - ['Gender','Income','Customer_Segment','Feedback', 'Shipping_Method', 'Payment_Method']

In [12]:
mode_df_Income = df_req.groupBy("Income").count().orderBy("count",ascending=0).first()
mode_value_Income = mode_df_Income["Income"] #Medium

mode_df_Gender = df_req.groupBy("Gender").count().orderBy("count",ascending=0).first()
mode_value_Gender = mode_df_Gender["Gender"] # Male

mode_df_Customer_Segment = df_req.groupBy("Customer_Segment").count().orderBy("count",ascending=0).first()
mode_value_Customer_Segment = mode_df_Customer_Segment["Customer_Segment"]

mode_df_Feedback = df_req.groupBy("Feedback").count().orderBy("count",ascending=0).first()
mode_value_Feedback = mode_df_Feedback["Feedback"] # Good

mode_df_Payment_Method = df_req.groupBy("Payment_Method").count().orderBy("count",ascending=0).first()
mode_value_Payment_Method = mode_df_Payment_Method["Payment_Method"]

mode_df_Shipping_Method = df_req.groupBy("Shipping_Method").count().orderBy("count",ascending=0).first()
mode_value_Shipping_Method = mode_df_Shipping_Method["Shipping_Method"]

In [13]:
# Filling null value with mode value
df_req = df_req.fillna({
                        "Income": mode_value_Income,
                        'Feedback': mode_value_Feedback,
                        "Payment_Method":mode_value_Payment_Method,
                       "Shipping_Method":mode_value_Shipping_Method,
                       "Customer_Segment":mode_value_Customer_Segment,
                       "Gender":mode_value_Gender
                      })

#### Handle Null values by calculation - ['Total_Purchases', 'Amount', 'Total_Amount']

In [14]:
# Filling null values in Total_Purchases 
df_req = df_req.withColumn(
    'Total_Purchases',
    when(
        col('Total_Purchases').isNull(),
        col('Total_Amount') / col('Amount')
    ).otherwise(col('Total_Purchases'))
)

In [15]:
# Filling null values in Amount 
df_req = df_req.withColumn(
    'Amount',
    when(
        col('Amount').isNull(),
        col('Total_Amount') / col('Total_Purchases')
    ).otherwise(col('Amount'))
)

In [16]:
# Filling null values in Total_Amount 
df_req = df_req.withColumn(
    'Total_Amount',
    when(
        col('Total_Amount').isNull(),
        col('Total_Purchases') * col('Amount')
    ).otherwise(col('Total_Amount'))
)

In [17]:
# Checking null values
null_count1 = df_req.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_req.columns])
#null_count1.show(vertical=True)

In [18]:
df_req=df_req.dropna()

In [19]:
# Checking null values
null_count2 = df_req.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_req.columns])
null_count2.show(vertical=True)

-RECORD 0---------------
 products         | 0   
 Country          | 0   
 State            | 0   
 Customer_ID      | 0   
 City             | 0   
 Age              | 0   
 Gender           | 0   
 Income           | 0   
 Customer_Segment | 0   
 Date             | 0   
 Total_Purchases  | 0   
 Amount           | 0   
 Total_Amount     | 0   
 Product_Category | 0   
 Product_Brand    | 0   
 Product_Type     | 0   
 Feedback         | 0   
 Shipping_Method  | 0   
 Payment_Method   | 0   
 Ratings          | 0   



In [20]:
print(df_req.columns)

['products', 'Country', 'State', 'Customer_ID', 'City', 'Age', 'Gender', 'Income', 'Customer_Segment', 'Date', 'Total_Purchases', 'Amount', 'Total_Amount', 'Product_Category', 'Product_Brand', 'Product_Type', 'Feedback', 'Shipping_Method', 'Payment_Method', 'Ratings']


In [21]:
duplicates_df = df_req.groupBy('products', 'Country', 'State', 'Customer_ID', 'City', 'Age', 'Gender', 'Income', 'Customer_Segment', 'Date', 'Total_Purchases', 'Amount', 'Total_Amount', 'Product_Category', 'Product_Brand', 'Product_Type', 'Feedback', 'Shipping_Method', 'Payment_Method', 'Ratings').count().filter(col("count") > 1)

# Show the resulting DataFrame with duplicates
#duplicates_df.show()

- We found that there are some duplicate rows. we have to remove duplicates

In [22]:
# Remove duplicates based on all columns
df_no_duplicates = df_req.dropDuplicates()

# Show the resulting DataFrame
#df_no_duplicates.show()

In [32]:
#df_no_duplicates.count()

301336

In [23]:
df=df_no_duplicates

In [24]:
# Replace 'USA' with 'United States of America' and 'UK' with 'United Kingdom' in the 'Country' column
df = df.withColumn("Country", 
                   when(col("Country") == "USA", "United States of America")
                   .when(col("Country") == "UK", "United Kingdom")
                   .otherwise(col("Country")))

In [25]:
df = df.withColumnRenamed("Total_Purchases", "Quantity")

In [26]:
quantiles = df.approxQuantile("Age", [0.25, 0.75], 0.01)
Q1 = quantiles[0]
Q3 = quantiles[1]

# Calculate IQR
IQR = Q3 - Q1

# Determine bounds for outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

print(f"Q1: {Q1}, Q3: {Q3}, IQR: {IQR}")
print(f"Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")

# Identify outliers
outliers = df.filter((col("Age") < lower_bound) | (col("Age") > upper_bound))

# Show the outliers
#outliers.show()

Q1: 22.0, Q3: 46.0, IQR: 24.0
Lower Bound: -14.0, Upper Bound: 82.0


### After doing all Data Cleaning We have 20 Features and 301336 rows 

#### converting date column from in same format yyyy-mm-dd

In [27]:
df = df.withColumn("Date",trim("Date"))

In [28]:
# Convert the string date column to a date type using different formats
df = df.withColumn("Date",
                   coalesce(
                       to_date("Date", "MM/dd/yyyy"),
                       to_date("Date", "M/d/yyyy"),
                       to_date("Date", "MM/d/yyyy"),
                       to_date("Date", "M/dd/yyyy")
                   ))

# Format the date column to 'yyyy-MM-dd'
df = df.withColumn("Date", date_format("Date", "yyyy-MM-dd"))

# Show the updated DataFrame
#df.select("Date").show(5)

In [29]:
import pandas as pd

In [30]:
df_pd = df.toPandas()

In [31]:
type(df_pd)

pandas.core.frame.DataFrame

In [32]:
df_pd['Date'] = pd.to_datetime(df_pd['Date'])
print(df_pd['Date'].dtype)
df_pd["Date"]

datetime64[ns]


0        2023-09-18
1        2023-03-24
2        2023-10-04
3        2023-12-28
4        2023-06-06
            ...    
301331   2023-07-02
301332   2023-06-16
301333   2024-02-14
301334   2023-05-28
301335   2024-01-20
Name: Date, Length: 301336, dtype: datetime64[ns]

In [34]:
df_pd.to_excel("output.xlsx", index=False)

####  uploding dataframe on hive in table format


In [None]:
#spark.sql("CREATE DATABASE IF NOT EXISTS BigDataProject")
#df.write.mode("overwrite").saveAsTable("BigDataProject.retailTransitionTable")

In [None]:
#spark.sql("SELECT * FROM BigDataProject.retailTransitionTable").show()

In [63]:
#df.groupby("Date").agg(sum("Total_Amount").alias("Total_Amount_sum")).count()

366

In [35]:
df_t=df_pd

In [36]:
df_t.head()

Unnamed: 0,products,Country,State,Customer_ID,City,Age,Gender,Income,Customer_Segment,Date,Quantity,Amount,Total_Amount,Product_Category,Product_Brand,Product_Type,Feedback,Shipping_Method,Payment_Method,Ratings
0,Cycling shorts,Germany,Berlin,37249.0,Dortmund,21.0,Male,Low,Regular,2023-09-18,3.0,108.028757,324.08627,Clothing,Nike,Shorts,Excellent,Same-Day,Debit Card,5.0
1,Dress shirt,Germany,Berlin,26603.0,Munich,29.0,Male,Medium,Premium,2023-03-24,1.0,46.58807,46.58807,Clothing,Zara,Shirt,Bad,Same-Day,Cash,1.0
2,Candles,Germany,Berlin,31878.0,Cologne,25.0,Male,Medium,New,2023-10-04,10.0,397.611229,3976.112295,Home Decor,Home Depot,Decorations,Excellent,Standard,Cash,4.0
3,Drill,Germany,Berlin,78484.0,Cologne,19.0,Female,Medium,New,2023-12-28,9.0,274.984254,2474.858288,Home Decor,Home Depot,Tools,Good,Express,PayPal,4.0
4,Flavored water,Germany,Berlin,66989.0,Leipzig,19.0,Male,Medium,New,2023-06-06,2.0,85.717621,171.435242,Grocery,Coca-Cola,Water,Excellent,Standard,Debit Card,5.0
