In [17]:
from pyspark.sql.functions import lower, regexp_replace, regexp_extract, col, trim, when, instr, lit, split, size, avg, isnan, when, count, isnull, mean, coalesce
from pyspark.sql.functions import concat_ws
from pyspark.sql import Window

# Regex matching any run of characters that are NOT ASCII letters, digits,
# underscore, space, or one of the listed Vietnamese accented letters.
# The cleaners below replace each such run with a single space.
special_char = (
    '[^a-z0-9A-Z_ '
    'àáãạảăắằẳẵặâấầẩẫậèéẹẻẽêềếểễệđìíĩỉịòóõọỏôốồổỗộơớờởỡợùúũụủưứừửữựỳỵỷỹýÀÁÃẠẢĂẮẰẲẴẶÂẤẦẨẪẬ'
    'ÈÉẸẺẼÊỀẾỂỄỆĐÌÍĨỈỊÒÓÕỌỎÔỐỒỔỖỘƠỚỜỞỠỢÙÚŨỤỦƯỨỪỬỮỰỲỴỶỸÝ]+'
)

In [3]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell reads through.
spark = SparkSession.builder.appName("lazada_cleaning").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/10 22:19:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [13]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, MapType

# Location of the scraped product file (newline-delimited JSON).
# NOTE(review): absolute, machine-specific path; consider a configurable data dir.
path = '/home/jazzdung/data/lazada/Thời trang & Phụ kiện Nữ/Đồng Hồ Nữ/Đồng Hồ Cao Cấp/product.ndjson'
# path = '/home/jazzdung/E-Commerce-Support-System/mr_clean/product.ndjson'

# Alternative kept from the original author: infer the schema with case-sensitive
# column resolution instead of supplying one explicitly.
# spark.conf.set("spark.sql.caseSensitive", True)
# df = spark.read.json(path)
# spark.conf.set("spark.sql.caseSensitive", False)

# Explicit schema: every scalar field is loaded as a string and converted by the
# clean_* functions; `attrs` is loaded as map<string, string> so it can later be
# cast to a single string.
schema = StructType([
    StructField("avg_rating", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True),
    StructField("brand_name", StringType(), True),
    StructField("num_review", StringType(), True),
    StructField("attrs", MapType(StringType(), StringType(), True)),
    StructField("product_desc", StringType(), True),
    StructField("url", StringType(), True)
])

df = spark.read.format("json").schema(schema).load(path)

# count() is an action that scans the file; a bare `df.count()` in the middle of
# a cell discards its result, so print it to make the scan useful.
print(f"rows: {df.count()}")
df.printSchema()

root
 |-- avg_rating: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- num_review: string (nullable = true)
 |-- attrs: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- product_desc: string (nullable = true)
 |-- url: string (nullable = true)



In [72]:
def clean_product_name(df):
    """Return `df` with `product_name` lower-cased and stripped of noise.

    Applied in order, each match being replaced by a single space:
      1. text enclosed in square brackets,
      2. text enclosed in parentheses
         (the original author notes both tend to hold promotion/price tags),
      3. runs of characters matched by `special_char`,
      4. runs of spaces.
    The result is finally trimmed.

    :param df: DataFrame with a string column `product_name`.
    :return: new DataFrame with `product_name` replaced by the cleaned value.
    """
    blank_out = (
        r'\[.*?\]',    # [ ... ] segments (non-greedy)
        r'\(.*?\)',    # ( ... ) segments (non-greedy)
        special_char,  # anything outside the allowed alphabet
        ' +',          # collapse repeated spaces
    )

    cleaned = lower(col('product_name'))
    for pattern in blank_out:
        cleaned = regexp_replace(cleaned, pattern, ' ')

    return df.withColumn('product_name', trim(cleaned))

def clean_price(df):
    """Return `df` with `price` converted from display text to an integer.

    The currency sign `₫` and everything after it is removed, then every `.`
    (used as a thousands separator in the scraped text) is removed, and the
    remainder is cast to `int`.

    :param df: DataFrame with a string column `price`.
    :return: new DataFrame whose `price` column is an integer.
        NOTE(review): under Spark's default cast semantics a value that does
        not parse as an int (or overflows 32 bits) presumably becomes null;
        confirm whether `long` is needed for very expensive items.
    """
    price = col('price')
    # Drop the currency sign together with any trailing text.
    price = regexp_replace(price, r'₫.*', '')
    # Raw string: a plain '\.' is an invalid escape sequence in a Python literal
    # and emits a warning on modern interpreters; the regex itself is unchanged.
    price = regexp_replace(price, r'\.', '')

    return df.withColumn('price', price.cast('int'))

def clean_brand(df):
    """Return `df` with `brand_name` normalised.

    The value is lower-cased, the placeholder text 'no brand' is rewritten to
    'no info', characters matched by `special_char` and repeated spaces are
    each collapsed to one space, and the result is trimmed.

    :param df: DataFrame with a string column `brand_name`.
    :return: new DataFrame with `brand_name` replaced by the cleaned value.
    """
    brand = regexp_replace(lower(col('brand_name')), 'no brand', 'no info')
    for pattern in (special_char, ' +'):
        brand = regexp_replace(brand, pattern, ' ')
    return df.withColumn('brand_name', trim(brand))

def clean_review(df):
    """Return `df` with `num_review` converted from display text to an integer.

    After lower-casing, the phrase 'không có đánh giá' becomes '0' and the
    suffix ' đánh giá' is removed; what remains is cast to `int`.

    :param df: DataFrame with a string column `num_review`.
    :return: new DataFrame whose `num_review` column is an integer.
    """
    # Order matters: the longer phrase contains the shorter suffix.
    rewrites = (
        ('không có đánh giá', '0'),
        (' đánh giá', ''),
    )

    reviews = lower(col('num_review'))
    for pattern, substitute in rewrites:
        reviews = regexp_replace(reviews, pattern, substitute)

    return df.withColumn('num_review', reviews.cast('int'))

def clean_attrs(df):
    """Return `df` with `attrs` flattened into one cleaned string.

    The column is cast to string and lower-cased; characters matched by
    `special_char` and runs of spaces are each replaced by a single space,
    and the result is trimmed.

    :param df: DataFrame with an `attrs` column that can be cast to string.
    :return: new DataFrame with `attrs` replaced by the cleaned string.
    """
    flattened = lower(col('attrs').cast('string'))
    without_symbols = regexp_replace(flattened, special_char, ' ')
    single_spaced = regexp_replace(without_symbols, ' +', ' ')
    return df.withColumn('attrs', trim(single_spaced))

def clean_desc(df):
    """Return `df` with `product_desc` reduced from HTML-like markup to plain text.

    The value is lower-cased and then passed through an ordered list of regex
    substitutions (order is significant, as later patterns rely on the
    rewriting done by earlier ones). Finally, characters matched by
    `special_char` and repeated spaces are collapsed and the result is
    trimmed, consistent with the other clean_* functions.

    :param df: DataFrame with a string column `product_desc`.
    :return: new DataFrame with `product_desc` replaced by the cleaned text.
    """
    # (pattern, replacement) pairs, applied top to bottom. Patterns containing
    # backslashes are raw strings so Python does not see invalid escapes.
    substitutions = (
        (' &amp;', ','),
        # Drop svg blocks, div markers, class=, quotes, label/flex open tags, href prefix.
        # NOTE(review): the bare 'div' alternative also removes 'div' inside ordinary words.
        ('<svg.*?</svg>|<div>|div|class=|"|<label.*?>|<flex.*?>| href=/', ''),
        ('</a>', '-'),
        ('</label>', ': '),
        ('< ', '<'),
        # Everything from a '/' up to the next '>' collapses to '/>'.
        (r"\/.*?\>", "/>"),
        ('<a ', '<'),
        ('<p ', '<'),
        ('</>', ' '),
        # Any remaining tag becomes a space.
        ('<.*?>', ' '),
        # Newline characters become spaces.
        (r'\n', ' '),
        (special_char, ' '),
        (' +', ' '),
    )

    product_desc = lower(col('product_desc'))
    for pattern, replacement in substitutions:
        product_desc = regexp_replace(product_desc, pattern, replacement)

    # The sibling cleaners all trim; without this the text keeps the leading or
    # trailing space left behind by removed tags.
    product_desc = trim(product_desc)

    return df.withColumn('product_desc', product_desc)

In [78]:
# Run every column cleaner in turn; each one returns a new DataFrame.
cleaning_steps = (
    clean_product_name,
    clean_price,
    clean_brand,
    clean_review,
    clean_attrs,
    clean_desc,
)
df_cleaned = df
for step in cleaning_steps:
    df_cleaned = step(df_cleaned)

# Make the rating numeric and give two columns their final names.
df_cleaned = (
    df_cleaned
    .withColumn("avg_rating", col("avg_rating").cast('double'))
    .withColumnRenamed('brand_name', 'brand')
    .withColumnRenamed('product_desc', 'description')
)

df_cleaned.printSchema()
df_cleaned.show(10)

root
 |-- avg_rating: double (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- num_review: integer (nullable = true)
 |-- attrs: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)

+----------+--------------------+------+---------+----------+--------------------+--------------------+--------------------+
|avg_rating|        product_name| price|    brand|num_review|               attrs|         description|                 url|
+----------+--------------------+------+---------+----------+--------------------+--------------------+--------------------+
|       4.9|đồng hồ nữ đơn gi...|425000|  no info|        26|phân loại màu sắc...| sản phẩm được nh...|https://www.lazad...|
|       4.9|đồng hồ nữ đơn gi...|425000|  no info|        26|phân loại màu sắc...| sản phẩm được nh...|https://www.lazad...|
|       4.9|đồng hồ nữ đơn gi...|425000|  no info|

In [73]:
# Spot-check each cleaner on the raw frame by listing up to 100 distinct
# values of the column(s) it rewrites, without truncating long strings.
column_checks = [
    (clean_product_name, ["product_name"]),
    (clean_price, ["price"]),
    (clean_brand, ["brand_name"]),
    (clean_review, ["num_review"]),
    (clean_attrs, ["attrs"]),
    # The URL is shown alongside so a cleaned description can be traced back.
    (clean_desc, ["url", "product_desc"]),
]

for cleaner, columns in column_checks:
    cleaner(df).select(*columns).distinct().show(100, truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_name                                                                                                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|đồng hồ nữ pha lê thời trang đồng hồ kim cương vòng tay nữ thường ngày thương hiệu cao cấp hàng đầu đồng hồ pha lê rhinestone casual đồng hồ đeo tay                                                                                                 |
|louiswi

In [65]:
# Keep the raw description column on its own for inspection, then print the
# schema of the raw (uncleaned) frame.
desc_df = df.select(df["product_desc"])
df.printSchema()

root
 |-- avg_rating: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- num_review: string (nullable = true)
 |-- attrs: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- product_desc: string (nullable = true)
 |-- url: string (nullable = true)



# Implement

In [None]:


# Ensure avg_rating is stored as a double.
# NOTE(review): the cleaning cell earlier in this notebook applies the same cast,
# so this looks redundant; confirm before keeping it.
df_cleaned = df_cleaned.withColumn(
    "avg_rating",
    col("avg_rating").cast('double'),
)
