In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

# Command used for putting raw data in HDFS
## hdfs dfs -put shared/Walmart-Retail-Dataset.csv /user/talentum/DBDA_Project/.

# Loading raw data from HDFS

In [3]:
df = spark.read.format('csv').options(Header=True, inferSchema=True).load('DBDA_Project/Walmart-Retail-Dataset.csv')

In [4]:
df.count()

1041860

In [5]:
print(df.columns)

['city', 'customer_age', 'customer_name', 'customer_segment', 'discount', 'order_date', 'order_id', 'order_priority', 'order_quantity', 'product_base_margin', 'product_category', 'product_container', 'product_name', 'product_sub_category', 'profit', 'region', 'sales', 'ship_date', 'ship_mode', 'shipping_cost', 'state', 'unit_price', 'zip_code', 'adjusted_col', '_c24', '_c25', '_c26', '_c27', '_c28']


# Dropping irrelevant columns -> df1

In [6]:
df1=df.drop('_c24', '_c25', '_c26', '_c27', '_c28')

In [7]:
print(df1.columns)

['city', 'customer_age', 'customer_name', 'customer_segment', 'discount', 'order_date', 'order_id', 'order_priority', 'order_quantity', 'product_base_margin', 'product_category', 'product_container', 'product_name', 'product_sub_category', 'profit', 'region', 'sales', 'ship_date', 'ship_mode', 'shipping_cost', 'state', 'unit_price', 'zip_code', 'adjusted_col']


In [8]:
df1.count()

1041860

In [9]:
print(df1.select("product_sub_category").distinct().show())

+--------------------+
|product_sub_category|
+--------------------+
|           Envelopes|
|        Rubber Bands|
|      Delivery Truck|
|  Chairs & Chairmats|
|                  \N|
|                null|
|     Copiers and Fax|
|           Bookcases|
| Pens & Art Supplies|
|              Labels|
|         71.97487538|
|Computer Peripherals|
|               Paper|
|          Appliances|
|         Express Air|
|              Tables|
|            Scissors|
|  Storage & Organiz"|
|  Office Furnishings|
|     Office Machines|
+--------------------+
only showing top 20 rows

None


# Deleting '\N' values -> df2

In [10]:
from pyspark.sql.functions import col
# Define the value to be deleted
value_to_delete = "\\N"

df2 = df1.filter(
    ~(
        (col("city") == value_to_delete) |
        (col("customer_age") == value_to_delete) |
        (col("customer_name") == value_to_delete)|
        (col("customer_segment") == value_to_delete)|
        (col("discount") == value_to_delete)|
        (col("order_date") == value_to_delete)|
        (col("order_id") == value_to_delete)|
        (col("order_priority") == value_to_delete)|
        (col("order_quantity") == value_to_delete)|
        (col("product_base_margin") == value_to_delete)|
        (col("product_category") == value_to_delete)|
        (col("product_container") == value_to_delete)|
        (col("product_name") == value_to_delete)|
        (col("product_sub_category") == value_to_delete)|
        (col("profit") == value_to_delete)|
        (col("region") == value_to_delete)|
        (col("sales") == value_to_delete)|
        (col("ship_date") == value_to_delete)|
        (col("shipping_cost") == value_to_delete)|
        (col("state") == value_to_delete)|
        (col("unit_price") == value_to_delete)|
        (col("zip_code") == value_to_delete))
)


In [11]:
df2.count()

1020437

In [12]:
unique_names = df2.select("product_sub_category").distinct()

In [13]:
unique_names.show()

+--------------------+
|product_sub_category|
+--------------------+
|           Envelopes|
|        Rubber Bands|
|  Chairs & Chairmats|
|     Copiers and Fax|
|           Bookcases|
| Pens & Art Supplies|
|              Labels|
|Computer Peripherals|
|               Paper|
|          Appliances|
|              Tables|
|            Scissors|
|  Office Furnishings|
|     Office Machines|
|Storage & Organiz...|
|Telephones and Co...|
|Binders and Binde...|
+--------------------+



# Shifting Columns

# 1. Concating columns to get Scissors and Rulers together ->df3

In [14]:
from pyspark.sql.functions import when, concat, col, lit
df3 = df2.withColumn(
    "product_sub_category",
    when(col("product_sub_category") == "Scissors", concat(col("product_sub_category"), lit(","), col("profit"))).otherwise(col("product_sub_category")))

# 2. Shifting Columns ->df4

In [15]:
from pyspark.sql.functions import when, concat, col, lit

In [16]:
condition2 = col("product_sub_category") == "Scissors, Rulers and Trimmers"


df4 = df3.withColumn("profit",
    when(condition2,  col("region")).otherwise(col("profit"))).withColumn("region",
    when(condition2,  col("sales")).otherwise(col("region"))).withColumn("sales",
    when(condition2,  col("ship_date")).otherwise(col("sales"))).withColumn("ship_date",
    when(condition2,  col("ship_mode")).otherwise(col("ship_date"))).withColumn("ship_mode",
    when(condition2,  col("shipping_cost")).otherwise(col("ship_mode"))).withColumn("shipping_cost",
    when(condition2,  col("state")).otherwise(col("shipping_cost"))).withColumn("state",
    when(condition2,  col("unit_price")).otherwise(col("state"))).withColumn("unit_price",
    when(condition2,  col("zip_code")).otherwise(col("unit_price"))).withColumn("zip_code",
    when(condition2,  col("adjusted_col")).otherwise(col("zip_code")))
      
                                       
                                      

# Changing Newport values df->5

In [17]:
print(df4.select("region").distinct().collect())

[Row(region='South'), Row(region='Central'), Row(region='East'), Row(region='40.2'), Row(region='West')]


In [18]:
print(df4.select("state").distinct().collect())

[Row(state='Utah'), Row(state='Minnesota'), Row(state='Ohio'), Row(state='Oregon'), Row(state='Arkansas'), Row(state='Texas'), Row(state='North Dakota'), Row(state='Pennsylvania'), Row(state='Connecticut'), Row(state='Nebraska'), Row(state='Vermont'), Row(state='Nevada'), Row(state='Washington'), Row(state='Illinois'), Row(state='Oklahoma'), Row(state='MO'), Row(state='New Mexico'), Row(state='West Virginia'), Row(state='Rhode Island'), Row(state='Georgia'), Row(state='Montana'), Row(state='Michigan'), Row(state='Virginia'), Row(state='North Carolina'), Row(state='Wyoming'), Row(state='Kansas'), Row(state='New Jersey'), Row(state='Maryland'), Row(state='Alabama'), Row(state='Arizona'), Row(state='Iowa'), Row(state='Kentucky'), Row(state='Louisiana'), Row(state='1.22'), Row(state='Mississippi'), Row(state='Tennessee'), Row(state='New Hampshire'), Row(state='MA'), Row(state='Florida'), Row(state='Indiana'), Row(state='Idaho'), Row(state='South Carolina'), Row(state='South Dakota'), Row(s

In [19]:
df5 = df4.withColumn("region", when(col("city") == "Newport", "South").otherwise(col("region"))).withColumn("state", when(col("city") == "Newport", "Rhode Island").otherwise(col("state")))

In [20]:
print(df5.select("region").distinct().collect())

[Row(region='South'), Row(region='Central'), Row(region='East'), Row(region='West')]


In [21]:
print(df4.select("region").distinct().collect())

[Row(region='South'), Row(region='Central'), Row(region='East'), Row(region='40.2'), Row(region='West')]


In [23]:
print(df5.select("state").distinct().collect())

[Row(state='Utah'), Row(state='Minnesota'), Row(state='Ohio'), Row(state='Oregon'), Row(state='Arkansas'), Row(state='Texas'), Row(state='North Dakota'), Row(state='Pennsylvania'), Row(state='Connecticut'), Row(state='Nebraska'), Row(state='Vermont'), Row(state='Nevada'), Row(state='Washington'), Row(state='Illinois'), Row(state='Oklahoma'), Row(state='MO'), Row(state='New Mexico'), Row(state='West Virginia'), Row(state='Rhode Island'), Row(state='Georgia'), Row(state='Montana'), Row(state='Michigan'), Row(state='Virginia'), Row(state='North Carolina'), Row(state='Wyoming'), Row(state='Kansas'), Row(state='New Jersey'), Row(state='Maryland'), Row(state='Alabama'), Row(state='Arizona'), Row(state='Iowa'), Row(state='Kentucky'), Row(state='Louisiana'), Row(state='Mississippi'), Row(state='Tennessee'), Row(state='New Hampshire'), Row(state='MA'), Row(state='Florida'), Row(state='Indiana'), Row(state='Idaho'), Row(state='South Carolina'), Row(state='South Dakota'), Row(state='California'),

In [24]:
print(df4.select("state").distinct().count())

48


In [25]:
print(df5.select("state").distinct().count())

47


# Null Values treatment

# 1. Finding Null values in each column

In [26]:
from pyspark.sql.functions import col, sum

# Declaring list to hold the null counts for each column
null_counts = []

# Iterating over each column in the DataFrame
for column in df5.columns:
    null_count = df5.select(sum(col(column).isNull().cast("int")).alias(column)).collect()[0][0]
    null_counts.append((column, null_count))

# Printing the null counts for each column
for column, null_count in null_counts:
    print(f"Column '{column}' has {null_count} null values")

Column 'city' has 0 null values
Column 'customer_age' has 0 null values
Column 'customer_name' has 0 null values
Column 'customer_segment' has 0 null values
Column 'discount' has 0 null values
Column 'order_date' has 0 null values
Column 'order_id' has 0 null values
Column 'order_priority' has 0 null values
Column 'order_quantity' has 0 null values
Column 'product_base_margin' has 0 null values
Column 'product_category' has 0 null values
Column 'product_container' has 0 null values
Column 'product_name' has 0 null values
Column 'product_sub_category' has 0 null values
Column 'profit' has 0 null values
Column 'region' has 0 null values
Column 'sales' has 0 null values
Column 'ship_date' has 0 null values
Column 'ship_mode' has 3378 null values
Column 'shipping_cost' has 0 null values
Column 'state' has 0 null values
Column 'unit_price' has 0 null values
Column 'zip_code' has 0 null values
Column 'adjusted_col' has 1002853 null values


In [27]:
# Grouping by the column with null values and counting the occurrences
unique_value_counts = df5.groupBy("ship_mode").count()

# Showing the result
unique_value_counts.show()

+--------------+------+
|     ship_mode| count|
+--------------+------+
|Delivery Truck|336845|
|   Regular Air|339630|
|          null|  3378|
|   Express Air|340584|
+--------------+------+



# 1.First Dropping 'adjusted_col' Column->df6

In [28]:
df6 = df5.drop("adjusted_col")

In [29]:
print(df6.columns)

['city', 'customer_age', 'customer_name', 'customer_segment', 'discount', 'order_date', 'order_id', 'order_priority', 'order_quantity', 'product_base_margin', 'product_category', 'product_container', 'product_name', 'product_sub_category', 'profit', 'region', 'sales', 'ship_date', 'ship_mode', 'shipping_cost', 'state', 'unit_price', 'zip_code']


In [30]:
df6.describe()

DataFrame[summary: string, city: string, customer_age: string, customer_name: string, customer_segment: string, discount: string, order_date: string, order_id: string, order_priority: string, order_quantity: string, product_base_margin: string, product_category: string, product_container: string, product_name: string, product_sub_category: string, profit: string, region: string, sales: string, ship_date: string, ship_mode: string, shipping_cost: string, state: string, unit_price: string, zip_code: string]

# 2.Dropping Null values->df7

In [31]:
df6.count()

1020437

In [32]:
df7 = df6.dropna()

In [33]:
df7.count()

1017059

In [34]:
null_count6 = df6.filter(col("ship_mode").isNull()).count()
null_count7 = df7.filter(col("ship_mode").isNull()).count()

print(null_count6,null_count7)

3378 0


# Changing datatype of order_date ->df8 and removing null values formed ->df9

In [35]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

#Converting string to date type for order_date
df8 = df7.withColumn("order_date", to_date(df["order_date"], "dd-MM-yyyy"))


In [36]:
null_count8 = df8.filter(col("order_date").isNull()).count()

print(null_count8)

1938


In [37]:
df9=df8.dropna()

In [38]:
null_count9 = df9.filter(col("order_date").isNull()).count()
print(null_count9)

0


# Removing rows having year 2023 from order_date for further use in model ->df10 -> df11 -> df12

In [39]:
df10 = df9.withColumn('year', year(df9['order_date']))
df11 = df10.filter(df10['year'] != 2023)
df12 = df11.drop('year')

In [40]:
df12.columns


['city',
 'customer_age',
 'customer_name',
 'customer_segment',
 'discount',
 'order_date',
 'order_id',
 'order_priority',
 'order_quantity',
 'product_base_margin',
 'product_category',
 'product_container',
 'product_name',
 'product_sub_category',
 'profit',
 'region',
 'sales',
 'ship_date',
 'ship_mode',
 'shipping_cost',
 'state',
 'unit_price',
 'zip_code']

# Checking for null values to confirm if file is ready for further work

In [41]:
null_counts1 = []
for column in df12.columns:
    null_count1 = df12.select(sum(col(column).isNull().cast("int")).alias(column)).collect()[0][0]
    null_counts1.append((column, null_count1))

# Printing the null counts for each column
for column, null_count1 in null_counts1:
    print(f"Column '{column}' has {null_count1} null values")

Column 'city' has 0 null values
Column 'customer_age' has 0 null values
Column 'customer_name' has 0 null values
Column 'customer_segment' has 0 null values
Column 'discount' has 0 null values
Column 'order_date' has 0 null values
Column 'order_id' has 0 null values
Column 'order_priority' has 0 null values
Column 'order_quantity' has 0 null values
Column 'product_base_margin' has 0 null values
Column 'product_category' has 0 null values
Column 'product_container' has 0 null values
Column 'product_name' has 0 null values
Column 'product_sub_category' has 0 null values
Column 'profit' has 0 null values
Column 'region' has 0 null values
Column 'sales' has 0 null values
Column 'ship_date' has 0 null values
Column 'ship_mode' has 0 null values
Column 'shipping_cost' has 0 null values
Column 'state' has 0 null values
Column 'unit_price' has 0 null values
Column 'zip_code' has 0 null values


# Saving final preprocessed dataframe to .csv file on hdfs.

In [42]:
output_path = "/user/talentum/DBDA_Project/Preprocessing/"

# Save the DataFrame to a CSV file
df12.coalesce(1).write.option("header", True).csv(output_path)