In [0]:
%pip install nltk
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

import re
import string
from string import punctuation
from nltk.corpus import stopwords
from nltk.tokenize import ToktokTokenizer
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
from bs4 import BeautifulSoup

from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col


Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [0]:
# Container for the bronze-layer table names collected by the next cell.
table_name = list()

In [0]:
# Rebuild the list from scratch instead of appending to it, so re-running this
# cell does not add every table name a second time.
# split('/')[0] keeps the part of each entry name before the first '/'
# (directory entries are presumably listed with a trailing slash -- TODO confirm).
table_name = [entry.name.split('/')[0] for entry in dbutils.fs.ls('mnt/bronze/dbo/')]

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from bs4 import BeautifulSoup
import re
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import stopwords

def clean_reviews(text_column):
    """Build a Column with the cleaned text of ``text_column``.

    The cleaning runs inside a single Python UDF and applies, in order:
    HTML stripping, punctuation removal, digit removal, lower-casing,
    English stop-word removal, collapsing of characters repeated three or
    more times, WordNet lemmatization and Porter stemming.

    Args:
        text_column: Name of the string column to clean.

    Returns:
        A string Column; rows whose input is null yield null.

    Note:
        This function used to be defined twice, verbatim, in this cell; the
        redundant copy has been removed. Later cells of this notebook define
        other functions with the same name, which shadow this one.
    """
    ps = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    # A set gives constant-time membership tests in the per-token filter below.
    stopwords_set = set(stopwords.words('english'))

    def strip_html(text):
        return BeautifulSoup(text, "html.parser").get_text()

    def remove_punctuation(text):
        return re.sub(r'[^\w\s]', '', text)

    def remove_digits(text):
        return re.sub(r'\d+', '', text)

    def remove_repeated_chars(text):
        # Collapse runs of three or more identical characters to one.
        return re.sub(r'(.)\1{2,}', r'\1', text)

    def clean(text):
        # Null cells reach the UDF as None; pass them through instead of
        # letting BeautifulSoup fail on a non-string input.
        if text is None:
            return None
        text = strip_html(text)
        text = remove_punctuation(text)
        text = remove_digits(text)
        text = text.lower()
        text = ' '.join(word for word in text.split() if word not in stopwords_set)
        text = remove_repeated_chars(text)
        # Lemmatize, then stem, each token.
        return ' '.join(ps.stem(lemmatizer.lemmatize(word)) for word in text.split())

    return udf(clean, StringType())(col(text_column))

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, count, when, lower, regexp_replace

# Function to calculate the mode (most frequent value)
def get_mode(df, column):
    """Return the most frequent non-null value of ``column`` in ``df``.

    Nulls are excluded before counting. groupBy treats null as a group of its
    own, so a mostly-null column would otherwise report None as its mode and
    the caller's imputation would be skipped.

    Args:
        df: DataFrame to inspect.
        column: Name of the column whose mode is wanted.

    Returns:
        The most frequent non-null value, or None when the column holds no
        non-null values (which includes an empty DataFrame).
    """
    top_rows = (
        df.where(col(column).isNotNull())
        .groupBy(column)
        .count()
        # Secondary sort on the value keeps the result deterministic on ties.
        .orderBy(col("count").desc(), col(column).asc())
        .limit(1)
        .collect()
    )
    return top_rows[0][0] if top_rows else None

# Function to clean review text
def clean_reviews(review_col):
    """Return ``review_col`` lower-cased with every character removed that is
    not an ASCII letter, a digit or whitespace.

    Args:
        review_col: Column (or column name) holding the review text.

    Returns:
        A Column with the cleaned text.

    Note:
        This definition shadows the UDF-based ``clean_reviews`` defined in an
        earlier cell of this notebook.
    """
    # Raw string: '\s' in a plain literal is an invalid Python escape sequence
    # and triggers a warning; the pattern passed to Spark is unchanged.
    return lower(regexp_replace(review_col, r'[^a-zA-Z0-9\s]', ''))

for table in table_name:
    # Load each table data from the bronze layer
    path = f'/mnt/bronze/dbo/{table}/{table}.parquet'
    df = spark.read.format('parquet').load(path)

    # Missing numeric values default to 0.
    for numeric_col in ('is_recommended', 'helpfulness'):
        if numeric_col in df.columns:
            df = df.na.fill({numeric_col: 0})

    # A review without text is dropped.
    if 'review_text' in df.columns:
        df = df.na.drop(subset=['review_text'])

    # Correcting the "Grey" to "gray" value in eye_color.
    # This runs BEFORE the mode imputation below so that both spellings are
    # counted as one category when the most frequent value is computed.
    if 'eye_color' in df.columns:
        df = df.withColumn('eye_color', when(col('eye_color') == 'Grey', 'gray').otherwise(col('eye_color')))

    # Fill missing categorical values with the column's most frequent value.
    for categorical_col in ('skin_tone', 'eye_color', 'skin_type', 'hair_color'):
        if categorical_col in df.columns:
            mode_value = get_mode(df, categorical_col)
            if mode_value is not None:
                df = df.na.fill({categorical_col: mode_value})

    if 'review_title' in df.columns:
        df = df.na.fill({'review_title': 'No_title'})  # Fill missing 'review_title' with 'No_title'

    # Remove duplicates
    df = df.dropDuplicates()

    # Convert column data types (applied only to columns the table has).
    target_types = {
        'author_id': 'string',
        'rating': 'int',
        'is_recommended': 'boolean',
        'helpfulness': 'float',
        'total_feedback_count': 'int',
        'total_pos_feedback_count': 'int',
        'total_neg_feedback_count': 'int',
        'submission_time': 'timestamp',
    }
    for typed_col, spark_type in target_types.items():
        if typed_col in df.columns:
            df = df.withColumn(typed_col, col(typed_col).cast(spark_type))

    # Text cleaning for NLP
    if 'review_text' in df.columns:
        df = df.withColumn('clean_review', clean_reviews(col('review_text')))

    # Adjust columns containing dates: render them as 'yyyy-MM-dd' strings.
    for col_name in df.columns:
        if "Date" in col_name or "date" in col_name:
            df = df.withColumn(col_name, date_format(from_utc_timestamp(col(col_name).cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

    # NOTE(review): the preprocessed `df` is neither written nor stored in this
    # cell; it is overwritten on the next iteration. The later silver-write cell
    # re-reads the bronze parquet files, so none of the steps above reach the
    # silver layer -- confirm whether that is intended.

In [0]:
import nltk
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import StringType
import re
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer

# Download the NLTK resources used by the cleaning function below
# (stop-word list and WordNet).
for nltk_resource in ('stopwords', 'wordnet'):
    nltk.download(nltk_resource)

# Define the clean_reviews function
def clean_reviews(df, text_column):
    """Add a ``clean_reviews`` column holding a normalised copy of ``text_column``.

    Steps, in order: strip HTML (null or empty input becomes ""), remove
    punctuation, remove digits, trim leading/trailing whitespace, lower-case,
    drop English stop words, collapse characters repeated three or more times,
    then lemmatize and stem every token.

    Args:
        df: Input DataFrame.
        text_column: Name of the column containing the raw text.

    Returns:
        ``df`` with the column ``clean_reviews`` added (or replaced).

    Note:
        This definition shadows the ``clean_reviews`` functions defined in
        earlier cells, which take different arguments.
    """
    ps = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    # A frozenset gives constant-time membership tests for the per-token filter.
    stopwords_set = frozenset(stopwords.words('english'))
    # Compiled once here rather than on every row.
    repeated_chars = re.compile(r'(.)\1{2,}')

    def strip_html(text):
        if text:
            return BeautifulSoup(text, "html.parser").get_text()
        return ""

    def normalize_tokens(text):
        # One UDF for the three token-level steps, which used to be three
        # consecutive Python UDFs; the resulting text is the same.
        if text is None:
            return None
        # NOTE(review): punctuation has already been stripped at this point,
        # so contractions such as "doesn't" arrive as "doesnt" and are not
        # matched by the stop-word list -- confirm whether that is acceptable.
        kept = ' '.join(word for word in text.split() if word not in stopwords_set)
        collapsed = repeated_chars.sub(r'\1', kept)
        return ' '.join(ps.stem(lemmatizer.lemmatize(word)) for word in collapsed.split())

    strip_html_udf = udf(strip_html, StringType())
    normalize_tokens_udf = udf(normalize_tokens, StringType())

    cleaned = strip_html_udf(col(text_column))           # Remove HTML tags
    cleaned = regexp_replace(cleaned, r'[^\w\s]', '')    # Remove punctuation
    cleaned = regexp_replace(cleaned, r'\d+', '')        # Remove digits
    cleaned = regexp_replace(cleaned, r'^\s+|\s+$', '')  # Remove leading/trailing spaces
    cleaned = lower(cleaned)                             # Convert to lowercase
    cleaned = normalize_tokens_udf(cleaned)              # Stop words, repeats, lemma + stem

    return df.withColumn('clean_reviews', cleaned)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [0]:
# List of table names to process
table_names = ["products", "reviews", "authors"]  # Update with your actual table names

# For every bronze table that has a 'review_text' column, show the first five
# cleaned reviews; report the tables that lack the column.
for table in table_names:
    path = f'/mnt/bronze/dbo/{table}/{table}.parquet'
    df = spark.read.format('parquet').load(path)

    # Guard clause: nothing to clean without the text column.
    if 'review_text' not in df.columns:
        print(f"'review_text' column not found in table: {table}")
        continue

    df = clean_reviews(df, 'review_text')
    display(df.select("clean_reviews").limit(5))


'review_text' column not found in table: products


clean_reviews
thick cleanser bare scent moistur clean face well like tini bit get eye make vision cloudi much rub water rins still get bid cloudy would see use winter
doesnt leav skin need anoth product use multipl time get rid layer
would give could broke bad thought chicken pock never broken skin care product love previou item brand horribl
didnt love made skin feel felt hard get im probabl fan balm cleanser prefer might like
thought id tri afford product receiv feel separ part go thick like vaselin im squeez thick part thin drizzl oil leak everywher doesnt melt spread easili feel like im pull skin unnecessarili cleans somehow still alway raccoon eye afterward go anoth makeup remov other mention leaf oili residu hard wash way better cleans balm


'review_text' column not found in table: authors


In [0]:
for table in table_names:
    # Load each table data from the bronze layer
    path = f'/mnt/bronze/dbo/{table}/{table}.parquet'

    # Read the parquet file into a DataFrame
    df = spark.read.format('parquet').load(path)

    # Ensure DataFrame is not empty. head(1) fetches at most one row, which
    # avoids the full-table scan that count() performs.
    if len(df.head(1)) == 0:
        print(f"No data found in table: {table}")
        continue  # Skip this table if no data

    # Display schema to confirm the data structure
    df.printSchema()

    # NOTE(review): `df` is the bronze data exactly as read above; none of the
    # cleaning or imputation defined in earlier cells is applied before the
    # write -- confirm whether the silver layer should hold processed data.

    # Save the data to the silver layer using Delta format
    silver_path = f'/mnt/silver/dbo/{table}/'

    # A failure on one table is reported and the remaining tables are still
    # processed (best-effort behaviour kept from the original cell).
    try:
        df.write.format("delta").mode("overwrite").save(silver_path)
        print(f"Table {table} has been successfully saved to silver layer.")
    except Exception as e:
        print(f"Error saving table {table}: {e}")


root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)

Table products has been successfully saved to silver layer.
root
 |-- product_id: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- is_recommended: double (nullable = true)
 |-- helpfulness: double (nullable = true)
 |-- total_feedback_count: long (nullable = true)
 |-- total_pos_feedback_count: long (nullable = true)
 |-- total_neg_feedback_count: long (nullable = true)
 |-- submission_time: date (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- ID: integer (nullable = true)

Table reviews has been successfully saved to silver layer.
root
 |-- author_id: string (nullable = true)
 |-- skin_tone: string (nullable = true)
 |-- eye_color: string (nullable = true)
 |-- skin_type: string (nullable = true)
 |-- hair_color: string (nullable = true)

Table authors has been successfully saved to silver layer.

In [0]:
# List of table names to process
table_names = ["products", "reviews", "authors"]  # Update with your actual table names

# Walk the bronze tables and preview the cleaned text where a review column exists.
for table in table_names:
    path = f'/mnt/bronze/dbo/{table}/{table}.parquet'
    df = spark.read.format('parquet').load(path)

    has_review_text = 'review_text' in df.columns
    if not has_review_text:
        print(f"'review_text' column not found in table: {table}")
    else:
        # Add the 'clean_reviews' column, then show its first 5 rows.
        df = clean_reviews(df, 'review_text')
        display(df.select("clean_reviews").limit(5))

'review_text' column not found in table: products


clean_reviews
thick cleanser bare scent moistur clean face well like tini bit get eye make vision cloudi much rub water rins still get bid cloudy would see use winter
doesnt leav skin need anoth product use multipl time get rid layer
would give could broke bad thought chicken pock never broken skin care product love previou item brand horribl
didnt love made skin feel felt hard get im probabl fan balm cleanser prefer might like
thought id tri afford product receiv feel separ part go thick like vaselin im squeez thick part thin drizzl oil leak everywher doesnt melt spread easili feel like im pull skin unnecessarili cleans somehow still alway raccoon eye afterward go anoth makeup remov other mention leaf oili residu hard wash way better cleans balm


'review_text' column not found in table: authors


In [0]:
from pyspark.sql.functions import count, when, col

# List of table names to process
table_names = ["products", "reviews", "authors"]  # Update with your actual table names

# Print, per bronze table, how many nulls each column contains.
for table in table_names:
    path = f'/mnt/bronze/dbo/{table}/{table}.parquet'
    df = spark.read.format('parquet').load(path)

    # One aggregate per column: count() only counts the rows for which the
    # when() expression is non-null, i.e. the rows where the column IS null.
    null_exprs = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    null_counts = df.select(null_exprs).first().asDict()

    print(f"Null value counts for table: {table}")
    for column, null_count in null_counts.items():
        print(f"{column}: {null_count}")
    print()

Null value counts for table: products
product_id: 0
product_name: 0

Null value counts for table: reviews
product_id: 0
author_id: 0
rating: 0
is_recommended: 117486
helpfulness: 331832
total_feedback_count: 0
total_pos_feedback_count: 0
total_neg_feedback_count: 0
submission_time: 0
review_text: 999
review_title: 167011
ID: 0

Null value counts for table: authors
author_id: 0
skin_tone: 75622
eye_color: 98422
skin_type: 339017
hair_color: 99738



In [0]:
# Preview the raw authors table straight from the bronze layer.
display(
    spark.read
    .format('parquet')
    .load('/mnt/bronze/dbo/authors/authors.parquet')
)


author_id,skin_tone,eye_color,skin_type,hair_color
10000015049,fair,green,,brown
10000060335,fair,blue,,blonde
10000098796,,,,
1000010050,,,,
10000117144,light,brown,,brown
10000120301,lightMedium,green,,blonde
10000217994,fair,brown,,red
10000247828,mediumTan,green,,black
10000254625,fairLight,Grey,,red
10000290716,light,blue,,blonde
