In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark1 = SparkSession.builder.appName("PracticeApp").getOrCreate()

print("Spark Session for Practice is Created")
print("Spark version: ", spark1.version)
print("------------- Happy Codding ----------")

Spark Session for Practice is Created
Spark version:  4.0.1
------------- Happy Codding ----------


In [3]:
data = [
    {"name":"masud rana", "cgpa":3.60, "address":"Kanchan, Rupganj, Narayanganj", "university":"Green University of Bangladesh"}
]

df = spark1.createDataFrame(data)
df.show(truncate=False)


+-----------------------------+----+----------+------------------------------+
|address                      |cgpa|name      |university                    |
+-----------------------------+----+----------+------------------------------+
|Kanchan, Rupganj, Narayanganj|3.6 |masud rana|Green University of Bangladesh|
+-----------------------------+----+----------+------------------------------+



## Dwonload 'csv' data from kaggle

In [4]:
import kagglehub
import shutil
from pathlib import Path

downloaded_dir = kagglehub.dataset_download("karkavelrajaj/amazon-sales-dataset", force_download=True)
source_dir = Path(downloaded_dir)

if not source_dir.exists() and not source_dir.is_dir():
    raise FileNotFoundError("Maybe kaggle unable to download the dataset!")

files = [f for f in source_dir.iterdir() if f.is_file()]

if len(files) != 1:
    raise RuntimeError("It should contain exactly one file!")

source_file = files[0]
source_file_ext = source_file.suffix
print("source file path and extentions: ", source_file, source_file_ext)


project_root = Path.cwd()
target_dir = project_root / "data" / "raw"
target_dir.mkdir(parents=True, exist_ok=True)
print("project root: ", project_root, "target dir: ", target_dir)

target_file_name = f"amazon_sales_raw{source_file_ext}"
target_file_path = target_dir / target_file_name
print("target file name: ", target_file_name, "target file path: ", target_file_path)
shutil.move(str(source_file), str(target_file_path)) 

  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/karkavelrajaj/amazon-sales-dataset?dataset_version_number=1...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1.95M/1.95M [00:01<00:00, 1.43MB/s]

Extracting files...
source file path and extentions:  C:\Users\User\.cache\kagglehub\datasets\karkavelrajaj\amazon-sales-dataset\versions\1\amazon.csv .csv
project root:  c:\Users\User\PycharmProjects\AmazonSalesETLwithAirflow target dir:  c:\Users\User\PycharmProjects\AmazonSalesETLwithAirflow\data\raw
target file name:  amazon_sales_raw.csv target file path:  c:\Users\User\PycharmProjects\AmazonSalesETLwithAirflow\data\raw\amazon_sales_raw.csv





'c:\\Users\\User\\PycharmProjects\\AmazonSalesETLwithAirflow\\data\\raw\\amazon_sales_raw.csv'

## 

In [5]:
# print schema data types
df = (
    spark1.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("quote", '"')
    .option("escape", '"')
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv(str(target_file_path))
)
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: string (nullable = true)
 |-- actual_price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: string (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)



In [6]:
# print all columns
print(len(df.columns))

df.columns

16


['product_id',
 'product_name',
 'category',
 'discounted_price',
 'actual_price',
 'discount_percentage',
 'rating',
 'rating_count',
 'about_product',
 'user_id',
 'user_name',
 'review_id',
 'review_title',
 'review_content',
 'img_link',
 'product_link']

In [7]:
df.show(5, truncate=False)

+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------+----------------+------------+-------------------+------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# count of non-null values in each column
from pyspark.sql.functions import col, count

df.select([count(c).alias(c) for c in df.columns]).show()

+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|product_id|product_name|category|discounted_price|actual_price|discount_percentage|rating|rating_count|about_product|user_id|user_name|review_id|review_title|review_content|img_link|product_link|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|      1465|        1465|    1465|            1465|        1465|               1465|  1465|        1463|         1465|   1465|     1465|     1465|        1465|          1465|    1465|        1465|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+



In [9]:
# count null values in each column
from pyspark.sql.functions import col, sum, when

df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns]
).show()

+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|product_id|product_name|category|discounted_price|actual_price|discount_percentage|rating|rating_count|about_product|user_id|user_name|review_id|review_title|review_content|img_link|product_link|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|         0|           0|       0|               0|           0|                  0|     0|           2|            0|      0|        0|        0|           0|             0|       0|           0|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+



In [10]:
# rename the product_name to product_title
df.withColumnRenamed("product_name", "product_title").show(2)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|       product_title|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|            ‚Çπ399|      ‚Çπ1,099|                64%|   4.2|      

In [11]:
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: string (nullable = true)
 |-- actual_price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: string (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)



## verify every column is correct

In [12]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.getOrCreate()

# df_raw = (
#     spark.read
#     .option("header", "true")
#     .option("multiLine", "true")
#     .option("quote", '"')
#     .option("escape", '"')
#     .option("mode", "PERMISSIVE")
#     .option("columnNameOfCorruptRecord", "_corrupt_record")
#     .csv(str(target_file_path))
# )


In [13]:
# df_raw.printSchema()

In [14]:
# # check other values interefare with discounted_price column
# from pyspark.sql.functions import col

# invalid_discounted_price_df = df_raw.filter(
#     col("discounted_price").isNotNull() &
#     ~col("discounted_price").rlike(r"^‚Çπ[\d,]+(\.\d+)?$")
# )

# invalid_discounted_price_df.select("discounted_price").show(truncate=False)


In [15]:
# remove special characters and cast  
from pyspark.sql.functions import when, regexp_replace, col

def remove_symbol_and_cast(column_name, target_type):
    cleaned_symbol = regexp_replace(col(column_name), "[^\d.]", "")
    return when(
        cleaned_symbol.rlike(r"^\d+(\.\d+)?$"),
        cleaned_symbol.try_cast(target_type)
    ).otherwise(None)


df = df.withColumn("discounted_price", remove_symbol_and_cast("discounted_price", "float")) \
    .withColumn("actual_price", remove_symbol_and_cast("actual_price", "float")) \
    .withColumn("discount_percentage", remove_symbol_and_cast("discount_percentage", "int")) \
    .withColumn("rating_count", remove_symbol_and_cast("rating_count", "int"))

In [21]:
df.show(5)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|           399.0|      1099.0|                 64|   4.2|       

In [17]:
df = df.withColumn("rating", col("rating").try_cast("float"))

In [18]:
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: float (nullable = true)
 |-- actual_price: float (nullable = true)
 |-- discount_percentage: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- rating_count: integer (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)



In [19]:
# see null values in each columns
df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns]
).show()

+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|product_id|product_name|category|discounted_price|actual_price|discount_percentage|rating|rating_count|about_product|user_id|user_name|review_id|review_title|review_content|img_link|product_link|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|         0|           0|       0|               0|           0|                  0|     1|           2|            0|      0|        0|        0|           0|             0|       0|           0|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+



In [22]:
# drop rows will null values
df = df.dropna(subset=[c for c in df.columns])


In [23]:
# see null values in each columns
df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns]
).show()

+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|product_id|product_name|category|discounted_price|actual_price|discount_percentage|rating|rating_count|about_product|user_id|user_name|review_id|review_title|review_content|img_link|product_link|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|         0|           0|       0|               0|           0|                  0|     0|           0|            0|      0|        0|        0|           0|             0|       0|           0|
+----------+------------+--------+----------------+------------+-------------------+------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+



In [37]:
# count and sum of numeric columns
from pyspark.sql.functions import col, count, sum, round as spark_round
from pyspark.sql.types import IntegerType, FloatType, DoubleType, LongType, ShortType

# filter only numeric columns
numeric_types = (IntegerType, FloatType, DoubleType, LongType, ShortType)
numeric_columns = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, numeric_types)
]

numeric_columns

['discounted_price',
 'actual_price',
 'discount_percentage',
 'rating',
 'rating_count']

In [38]:
# count of numeric columns
df.select([count(c).alias(c) for c in numeric_columns]).show()

# sum of each numeric columns
df.select([spark_round(sum(col(c)), 3).alias(c) for c in numeric_columns]).show()

+----------------+------------+-------------------+------+------------+
|discounted_price|actual_price|discount_percentage|rating|rating_count|
+----------------+------------+-------------------+------+------------+
|            1462|        1462|               1462|  1462|        1462|
+----------------+------------+-------------------+------+------------+

+----------------+------------+-------------------+------+------------+
|discounted_price|actual_price|discount_percentage|rating|rating_count|
+----------------+------------+-------------------+------+------------+
|      4576033.43|  7972414.28|              69697|5989.4|    26765385|
+----------------+------------+-------------------+------+------------+



In [39]:
df.show(5)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|           399.0|      1099.0|                 64|   4.2|       

## Transformation apply

In [None]:
# add price difference column
df = df.withColumn("price_difference", col("actual_price") - col("discounted_price"))

df.show(5)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|price_difference|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|           39

In [51]:
df = df.drop("discount_price_calc")

df.columns

['product_id',
 'product_name',
 'category',
 'discounted_price',
 'actual_price',
 'discount_percentage',
 'rating',
 'rating_count',
 'about_product',
 'user_id',
 'user_name',
 'review_id',
 'review_title',
 'review_content',
 'img_link',
 'product_link']

In [54]:
# rating bucket
df = df.withColumn(
    "rating_bucket",
    when(col("rating") >= 4.5, "Excellent")
    .when(col("rating") >= 4.0, "Good")
    .when(col("rating") >= 3.0, "Average")
    .otherwise("Poor")
)

df.show(10)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|price_difference|rating_bucket|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------+
|B07JW9H4J1|Wayona Nylon 

In [55]:
# review strength
df = df.withColumn(
    "review_strength",
    when(col("rating_count") >= 10000, "Very High")
    .when(col("rating_count") >= 1000, "High")
    .when(col("rating_count") >= 100, "Medium")
    .otherwise("Low")
)

In [57]:
df.show(4)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------------+---------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|price_difference|rating_bucket|review_strength|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------