# Reviews Data Collection and Processing
### Objectives :
- Scrape "Backmarket" customers reviews from Trustpilot.
- Clean review data collected
- Processed reviews data collected


### Required packages

In [1]:
# Required packages
import os
import json
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime
from pathlib import Path
import time

import logging
import shutil

import re
from typing import List, Optional
import emoji
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords


### Global Logging Configuration
- All log messages (INFO, ERROR, etc.) will be written to the specified log file (concatenation.log).
- If StreamHandler is included, logs will also appear in the console.

In [2]:
# Ensure the logs directory exists
log_dir = "../logs_etl"
os.makedirs(log_dir, exist_ok=True)  # Create the directory if it doesn't exist
log_file = os.path.join(log_dir, "concatenation.log")

# Configure logging only if it hasn't been configured yet
if not logging.root.handlers:
    logging.basicConfig(
        level=logging.INFO,  # Fixed logging level
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file),  # Write logs to a file
            logging.StreamHandler()         # Optional: Keep console output
        ]
    )


### Data collection
Target company for review analysis : Back Market - A global marketplace for refurbished devices. 

Review data will be collected from Trustpilot, a platform for collecting verified customer reviews


#### Constants and Parameters

In [None]:
# Constants and parameters
COMPANY_NAME = "backmarket" # Backmarket
BASE_URL = f'https://fr.trustpilot.com/review/www.{COMPANY_NAME}.fr'
MAX_PAGE_PARAM_FILE = Path("../parameters/max_page.txt")
NEW_REVIEW_PARAM_FILE = Path("../parameters/new_reviews.txt")
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    )
}

# Directories
RAW_DATA_DIR = Path("../data/raw")
CLEANED_DATA_DIR = Path("../data/cleaned")
FULL_REVIEWS_DIR = Path("../data/full")
ARCHIVE_DIR = Path("../data/archive")



#### Function to scrape reviews from trustpilot
This Python function, extract_reviews, is designed to scrape and collect customer reviews from a paginated website ([trustpilot platform](https://fr.trustpilot.com/)) for a specific company. 
Here's a concise description of its functionality:

1. **Purpose** :
The function extracts structured review data (e.g., review ID, title, text, rating, reply, dates) from multiple pages of a website and saves the collected data into a CSV file.

2. **Key Steps** :
    - Input Parameters :
        - company_name: Name of the company (default is a predefined constant).
        - start_page: The page number to start scraping from (default is 1).
        - end_page: The last page to scrape (default is a predefined constant).
    - HTTP Requests :
        Sends GET requests to the website using requests with custom headers and handles potential HTTP errors.
    - HTML Parsing :
        Uses BeautifulSoup to parse the HTML content and extract embedded JSON data containing reviews from a script tag.
    - Data Extraction :
        Iterates through the JSON data, extracting specific fields (e.g., review text, rating, dates) and storing them in a list of dictionaries.
    -  Error Handling :
        Includes error handling for network issues, missing data, and parsing errors.
    - Rate Limiting :
        Implements a delay (time.sleep) after processing every 10 pages to avoid overloading the server.
    - Output :
    Converts the collected data into a Pandas DataFrame and saves it as a CSV file in a specified directory.
3. **Output** :
    - Returns the file path of the saved CSV file containing the extracted reviews.

In [5]:
# function to get the end page to collect reviews from
def get_end_page(file_path=MAX_PAGE_PARAM_FILE):

    try:
        # Open and read the file
        with open(file_path, 'r') as file:
            # Read the first line and strip whitespace/newline characters
            first_line = file.readline().strip()

            # Validate that the line contains only digits
            if first_line.isdigit():
                return int(first_line)
            else:
                logging.error("Error: The first line does not contain a valid integer.")
                return None

    except FileNotFoundError:
        logging.error(f"Error: The file '{file_path}' was not found.")
        return None

    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None


# Example usage
if __name__ == "__main__":
    end_page = get_end_page()
    if end_page is not None:
        print(f"The value of end_page is: {end_page}")

The value of end_page is: 2150


In [7]:
# function to extract reviews for a given company name 
def extract_reviews(company_name=COMPANY_NAME):
    """
    Scrapes customer reviews from a paginated website for a specific company. 
    Extracts key review details such as text, rating, and dates into a structured format. 
    Saves the collected data as a CSV file for further analysis.
    """
    headers = HEADERS
    base_url = BASE_URL
    keys = ["id", "title", "review", "rating", "reply", "experienceDate", "createdDateTime", "publishedDate", "replyPublishedDate"]
    
    new_max_page_flag_path = os.path.join("./parameters", "max_page.txt")
    end_page = get_end_page(file_path=new_max_page_flag_path)
    end_page = max(10, end_page)
    start_page = max(1, end_page - 10)

    reviews_list = []  # List to collect all review data
    counter = 0
    for page in range(end_page, start_page, -1):
        counter += 1
        logging.info(f"Processing page {page}")
        
        url_page = f"{base_url}?page={page}"
        
        try:
            response = requests.get(url_page, headers=headers, timeout=5)
            response.raise_for_status()  # Verify HTTP errors
        except requests.RequestException as e:
            logging.error(f"Error accessing page {page}: {e}")
            continue

        try:
            soup = BeautifulSoup(response.content, 'html.parser')
            script_content = soup.body.script.contents if soup.body and soup.body.script else None
            
            if not script_content:
                logging.warning(f"No data found in page {page}")
                continue

            raw_data = json.loads(script_content[0])
            raw_data = raw_data.get("props", {}).get("pageProps", {}).get("reviews", [])
            
            for review in raw_data:
                tmp = {}
                tmp["id"] = review.get("id")
                tmp["title"] = review.get("title")
                tmp["review"] = review.get("text")
                tmp["rating"] = review.get("rating")
                try:
                    tmp["reply"] = review.get("reply", {}).get("message")
                    tmp["replyPublishedDate"] = review.get("reply", {}).get("publishedDate")
                except:
                    tmp["reply"] = None
                    tmp["replyPublishedDate"] = None
                
                tmp["experienceDate"] = review.get("dates", {}).get("experiencedDate")
                tmp["createdDateTime"] = review.get("labels", {}).get("verification", {}).get("createdDateTime")
                tmp["publishedDate"] = review.get("dates", {}).get("publishedDate")

                reviews_list.append({key: tmp.get(key) for key in keys})
            
        except Exception as e:
            logging.error(f"Error processing page {page}: {e}")
            continue
        
        # Avoid hitting the server too frequently
        # if counter % 5 == 0 and counter != 10:
        #     logging.info("Sleeping for 45 seconds to avoid overloading the server.")
        #     time.sleep(45)

    if not reviews_list:
        logging.warning("No reviews collected.")
        return
    
    # Convert list of dicts to DataFrame and save to CSV
    df_raw_reviews = pd.DataFrame(reviews_list)

    # Save reviews data into a CSV file
    output_dir = "../data/raw"
    os.makedirs(output_dir, exist_ok=True)
    raw_file_path = os.path.join(output_dir, f"raw_reviews_{start_page+1}-{end_page}.csv")
    df_raw_reviews.to_csv(raw_file_path, index=False)
    logging.info(f"Saved reviews data to {raw_file_path}")
    
    # Update the next max page flag
    if start_page >= 10:
        flag_value = start_page
    else: 
        flag_value = 10
    with open(new_max_page_flag_path, "w") as f:
        f.write(str(flag_value))
    logging.info(f"Wrote new max page value '{flag_value}' to {new_max_page_flag_path}")
    
    logging.info(f"Finished processing pages {start_page}-{end_page}")
    
    return raw_file_path

#### Initial data collection
There are over 3,000 review pages for Back Market, making it time-consuming to scrape all the data. To avoid overloading the server, we ran the extraction function in batches of 25 pages, incorporating significant delays between batches. The initial scraped data has been saved in the "data/raw" folder.

In [8]:
# Concatenate all extracted reviews into one csv file
def concatenate_reviews(input_dir="../data/raw", \
                        output_file="concat_reviews.csv", \
                        file_prefix="raw_reviews_"):
    """
    Concatenates multiple review CSV files into a single DataFrame,
    removes duplicates and missing values, 
    saves the cleaned data to a new CSV file,
    and moves processed files to an '.archive' folder.
    """
    # Validate input directory
    if not os.path.exists(input_dir):
        logging.error(f"Input directory '{input_dir}' does not exist.")
        return None

    # List files matching the prefix
    files = [f for f in os.listdir(input_dir) if f.startswith(file_prefix)]
    if not files:
        logging.error(f"No files found with prefix '{file_prefix}' in '{input_dir}'.")
        return None

    logging.info(f"Found {len(files)} files to process.")

    # Create .archive folder if it doesn't exist
    archive_dir = os.path.join(input_dir, ".archive")
    os.makedirs(archive_dir, exist_ok=True)
    logging.info(f"Created/verified '.archive' folder at: {archive_dir}")

    # Read and concatenate files efficiently
    try:
        df_list = []
        for f in files:
            file_path = os.path.join(input_dir, f)
            logging.info(f"Reading file: {file_path}")
            df_list.append(pd.read_csv(file_path))
        
        df = pd.concat(df_list, ignore_index=True)
        logging.info("Concatenated all files into a single DataFrame.")
    except Exception as e:
        logging.error(f"Error reading or concatenating files: {e}")
        return None

    # Log initial state
    logging.info(f"Initial DataFrame info:\n{df.info()}")

    # Remove duplicates
    df.drop_duplicates(subset=["id"], inplace=True)
    logging.info(f"Removed duplicates. New DataFrame info:\n{df.info()}")

    # Drop rows with missing values in critical columns
    critical_columns = ["id", "review", "rating", "experienceDate"]
    df.dropna(subset=critical_columns, inplace=True)
    logging.info(f"Removed rows with missing values. Final DataFrame info:\n{df.info()}")

    # Save cleaned DataFrame to CSV
    try:
        output_path = os.path.join(input_dir, output_file)
        df.to_csv(output_path, index=False)
        logging.info(f"Saved cleaned DataFrame to: {output_path}")
    except Exception as e:
        logging.error(f"Error saving cleaned DataFrame: {e}")
        return None

    # Move processed files to .archive folder
    try:
        for f in files:
            src_path = os.path.join(input_dir, f)
            dst_path = os.path.join(archive_dir, f)
            shutil.move(src_path, dst_path)
            logging.info(f"Moved file to archive: {dst_path}")
    except Exception as e:
        logging.error(f"Error moving files to archive: {e}")
        return None

    return df



In [9]:
df = pd.read_csv("../data/full/full_reviews.csv")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24522 entries, 0 to 24521
Data columns (total 26 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   id                     24522 non-null  object 
 1   title                  24522 non-null  object 
 2   review                 24522 non-null  object 
 3   rating                 24522 non-null  int64  
 4   reply                  5897 non-null   object 
 5   experienceDate         24522 non-null  object 
 6   createdDateTime        24522 non-null  object 
 7   publishedDate          24522 non-null  object 
 8   replyPublishedDate     5897 non-null   object 
 9   reviewExperienceDelay  24522 non-null  float64
 10  date                   24522 non-null  object 
 11  year                   24522 non-null  int64  
 12  yearQuarter            24522 non-null  object 
 13  yearMonth              24522 non-null  object 
 14  month                  24522 non-null  int64  
 15  mo

In [10]:
df_concat = concatenate_reviews()
df_concat.info()

2025-03-30 15:38:59,097 - INFO - Found 25 files to process.
2025-03-30 15:38:59,099 - INFO - Created/verified '.archive' folder at: ../data/raw/.archive
2025-03-30 15:38:59,100 - INFO - Reading file: ../data/raw/raw_reviews_2251-2260.csv
2025-03-30 15:38:59,106 - INFO - Reading file: ../data/raw/raw_reviews_2211-2220.csv
2025-03-30 15:38:59,110 - INFO - Reading file: ../data/raw/raw_reviews_1871-1880.csv
2025-03-30 15:38:59,114 - INFO - Reading file: ../data/raw/raw_reviews_2291-2300.csv
2025-03-30 15:38:59,118 - INFO - Reading file: ../data/raw/raw_reviews_2181-2190.csv
2025-03-30 15:38:59,120 - INFO - Reading file: ../data/raw/raw_reviews_2271-2280.csv
2025-03-30 15:38:59,122 - INFO - Reading file: ../data/raw/raw_reviews_2161-2170.csv
2025-03-30 15:38:59,124 - INFO - Reading file: ../data/raw/raw_reviews_2861-2870.csv
2025-03-30 15:38:59,127 - INFO - Reading file: ../data/raw/raw_reviews_2301-2310.csv
2025-03-30 15:38:59,128 - INFO - Reading file: ../data/raw/raw_reviews_1851-1860.c

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25162 entries, 0 to 25161
Data columns (total 9 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   id                  25162 non-null  object
 1   title               25162 non-null  object
 2   review              25162 non-null  object
 3   rating              25162 non-null  int64 
 4   reply               5966 non-null   object
 5   experienceDate      25151 non-null  object
 6   createdDateTime     25162 non-null  object
 7   publishedDate       25162 non-null  object
 8   replyPublishedDate  5966 non-null   object
dtypes: int64(1), object(8)
memory usage: 1.7+ MB
<class 'pandas.core.frame.DataFrame'>
Index: 24574 entries, 0 to 25161
Data columns (total 9 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   id                  24574 non-null  object
 1   title               24574 non-null  object
 2   review        

### Process reviews data

In this step we preprocess raw text data for Natural Language Processing (NLP) tasks by performing a series of cleaning and normalization steps. We removes noise (e.g., hashtags, URLs, mentions, stopwords), converts text to lowercase, tokenizes it, filters out non-alphabetic tokens, and returns the cleaned text as a single string.

#### Text cleaning and normalization
- Noise Removal :
    - Removes hashtags, HTML entities, stock tickers, URLs, retweet tags, mentions, and special characters.
    - Replaces emojis with their textual descriptions and strips punctuation.

- Normalization :
    - Converts text to lowercase, replaces ampersands (&) with "and," and removes short words (≤3 characters).

- Tokenization :
    - Splits text into tokens using French-specific word tokenization.

- Filtering :
    - Removes numbers, non-alphabetic tokens, and stopwords (including custom additions to the French stopwords list).


In [11]:
# Function to clean and process review text

# os.environ["NLTK_DATA"] = "/Users/micheldpd/Projects/custrev/nltk_data"

# Preload stopwords outside the function to avoid repeated loading
STOP_WORDS_TO_ADD = ["être", "leur", "leurs", "avoir", "cela", "les", "de", "pour", "des", "cette", "a",
                   "j'ai", "car", "c'est", "chez", "tout", "fait", "chez", "donc", 
                   "n'est", "si", "alors", "n'ai", "faire", "deux", "comme", "jour", "tr", "si", "ue",
                   "back", "market", "backmarket"

]
STOP_WORDS = set(stopwords.words('french')).union(set(STOP_WORDS_TO_ADD))


def clean_text(text: str) -> str:
    """
    Cleans raw text by removing noise (e.g., hashtags, URLs, stopwords) and normalizing content.
    Tokenizes, filters alphabetic tokens, and removes French stopwords for NLP tasks.
    Returns the cleaned and normalized text as a single string.
    """
    
    # Remove hashtags (keep text after #)
    text = re.sub(r'#', '', text)

    # Remove HTML special entities (e.g., &amp;)
    text = re.sub(r'&\w*;', '', text)

    # Remove stock tickers (e.g., $AAPL)
    text = re.sub(r'\$\w*', '', text)

    # Remove hyperlinks (covers various URL patterns)
    text = re.sub(r'https?://[^\s/$.?#].[^\s]*', '', text)
    text = re.sub(r'http(\S)+', '', text)  # Catch incomplete URLs
    text = re.sub(r'http\s*\.\.\.', '', text)  # Catch truncated URLs

    # Remove retweet tags and mentions
    text = re.sub(r'(RT|rt)\s*@\s*\S+', '', text)
    text = re.sub(r'RT\s?@', '', text)
    text = re.sub(r'@\S+', '', text)

    # Replace & with 'and', fix < and > (assuming intent was to escape them)
    text = re.sub(r'&', 'and', text)

    # Remove words with 3 or fewer letters (e.g., "the", "cat")
    text = re.sub(r'\b\w{1,3}\b', ' ', text)

    # Remove characters beyond Basic Multilingual Plane (BMP) of Unicode
    text = ''.join(c for c in text if ord(c) <= 0xFFFF)

    # Strip leading/trailing whitespace
    text = text.strip()

    # Convert emojis to text descriptions (e.g., 😊 -> :smiling_face:)
    text = emoji.demojize(text)

    # Remove punctuation, keeping alphanumeric characters and spaces
    text = re.sub(r'[^\w\s]', ' ', text)

    # Tokenize text (lowercase for consistency)
    tokens: List[str] = word_tokenize(text.lower(), language='french')

    # Filter out numbers and keep only alphabetic tokens
    tokens_alpha = [token for token in tokens if token.isalpha()]

    # Remove stopwords
    tokens_cleaned = [token for token in tokens_alpha if token not in STOP_WORDS]

    # Join tokens back into a single string
    cleaned_text = ' '.join(tokens_cleaned)

    return cleaned_text


# Example usage
if __name__ == "__main__":
    sample_text = "RT @user: I love this product! < and 16§789> #great https://example.com 😊 &amp; fast service"
    cleaned = clean_text(sample_text)
    print(cleaned)

love this product great fast service


#### Review data cleaning and transformation

##### Define customer sentiment
- Customer sentiment is "positive" if rating >= 4
- Customer sentiment is neutral if rating == 3
- Customer sentiment is "negative if rating < 3

In [12]:
# Function to define the sentiment base on the rating value

def get_sentiment(rating):
    if rating >= 4:
        return "positive"
    elif rating == 3:
        return "neutral"
    else:
        return "negative"

In [14]:
# Function to clean, process and enrich raw review data
def process_reviews(raw_file):
    try:
        df = pd.read_csv(raw_file)
        logging.info(f"Successfully loaded {len(df)} rows of data.")
    except Exception as e:
        logging.error(f"Error loading raw data: {e}")
        return None

    try:
        df["experienceDate"] = pd.to_datetime(df["experienceDate"])
        df["experienceDate"] = df["experienceDate"].dt.tz_localize(None)
        df["createdDateTime"] = pd.to_datetime(df["createdDateTime"])
        df["createdDateTime"] = df["createdDateTime"].dt.tz_localize(None)
        df["publishedDate"] = pd.to_datetime(df["publishedDate"])
        df["publishedDate"] = df["publishedDate"].dt.tz_localize(None)
        df["reviewExperienceDelay"] = (df["createdDateTime"] - df["experienceDate"]).dt.total_seconds() / 60
        try:
            df["replyPublishedDate"] = pd.to_datetime(df["replyPublishedDate"])
        except:
            df["replyPublishedDate"] = None
        logging.info("Date formats standardized successfully.")
    except Exception as e:
        logging.error(f"Error standardizing date formats: {e}")
        return None

    try:
        df["date"] = pd.to_datetime(df["createdDateTime"]).dt.date
        df["year"] = pd.to_datetime(df["createdDateTime"]).dt.year
        df["yearQuarter"] = (
            pd.to_datetime(df["createdDateTime"]).dt.year.astype(str)
            + "-Q"
            + pd.to_datetime(df["createdDateTime"]).dt.quarter.astype(str)
        )
        df["yearMonth"] = pd.to_datetime(df["createdDateTime"]).dt.strftime("%Y" +"-"+ "%m")

        df["month"] = pd.to_datetime(df["createdDateTime"]).dt.month
        df["monthName"] = pd.to_datetime(df["createdDateTime"]).dt.month_name()
        df["day"] = pd.to_datetime(df["createdDateTime"]).dt.day
        df["dayName"] = pd.to_datetime(df["createdDateTime"]).dt.day_name()
        df["hour"] = pd.to_datetime(df["createdDateTime"]).dt.hour
        try:
            df["replyYear"] = pd.to_datetime(df["replyPublishedDate"]).dt.year
            df["replyMonth"] = pd.to_datetime(df["replyPublishedDate"]).dt.month
            df["replyDay"] = pd.to_datetime(df["replyPublishedDate"]).dt.day
            df["replyHour"] = pd.to_datetime(df["replyPublishedDate"]).dt.hour
        except:
            df["replyYear"] = None
            df["replyMonth"] = None
            df["replyDay"] = None
            df["replyHour"] = None
        logging.info("Temporal features extracted successfully.")
    except Exception as e:
        logging.error(f"Error extracting temporal features: {e}")
        return None
    
    # Add columns for review length and number of words
    try:
        df["reviewLength"] = df["review"].str.len()
        df["titleLength"] = df["title"].str.len()
    except Exception as e:
        logging.error(f"Error adding review length column: {e}")
        return None

    try:
        initial_rows = len(df)
        df.dropna(
            inplace=True,
            subset=["id", "review", "rating", "experienceDate", "createdDateTime", "publishedDate"],
        )
        removed_rows = initial_rows - len(df)
        logging.info(f"Removed {removed_rows} rows with missing values. Remaining rows: {len(df)}")
    except Exception as e:
        logging.error(f"Error removing rows with missing values: {e}")
        return None

    try:
        initial_rows = len(df)
        df = df[df["rating"].isin([1, 2, 3, 4, 5])]
        removed_rows = initial_rows - len(df)
        logging.info(f"Removed {removed_rows} rows with invalid ratings. Remaining rows: {len(df)}")
    except Exception as e:
        logging.error(f"Error removing rows with invalid ratings: {e}")
        return None

    try:
        df["sentiment"] = df["rating"].apply(lambda x: "positive" if x >= 4 else ("neutral" if x == 3 else "negative"))
        logging.info("Sentiment column added successfully.")
    except Exception as e:
        logging.error(f"Error adding sentiment column: {e}")
        return None

    try:
        df["review"] = df["review"].apply(clean_text)
        logging.info("Review text cleaned successfully.")
    except Exception as e:
        logging.error(f"Error cleaning review text: {e}")
        return None

    try:
        initial_rows = len(df)
        df = df[df["review"].str.len() > 4]
        removed_rows = initial_rows - len(df)
        logging.info(f"Removed {removed_rows} short reviews. Remaining rows: {len(df)}")
    except Exception as e:
        logging.error(f"Error removing short reviews: {e}")
        return None

    try:
        output_dir = CLEANED_DATA_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        cleaned_file_path = output_dir / f"cleaned_reviews_{timestamp}.csv"
        df.to_csv(cleaned_file_path, index=False)
        logging.info(f"Cleaned data saved to: {cleaned_file_path}")
    except Exception as e:
        logging.error(f"Error saving cleaned data: {e}")
        return None

    return str(cleaned_file_path)


In [None]:
# Processed raw review
# file_path = process_reviews("../data/raw/concat_reviews.csv")
# print(file_path)

2025-03-30 15:41:24,685 - INFO - Successfully loaded 24563 rows of data.
2025-03-30 15:41:24,723 - INFO - Date formats standardized successfully.
2025-03-30 15:41:24,820 - INFO - Temporal features extracted successfully.
2025-03-30 15:41:24,831 - INFO - Removed 0 rows with missing values. Remaining rows: 24563
2025-03-30 15:41:24,834 - INFO - Removed 0 rows with invalid ratings. Remaining rows: 24563
2025-03-30 15:41:24,836 - INFO - Sentiment column added successfully.
2025-03-30 15:41:26,940 - INFO - Review text cleaned successfully.
2025-03-30 15:41:26,946 - INFO - Removed 59 short reviews. Remaining rows: 24504
2025-03-30 15:41:27,130 - INFO - Cleaned data saved to: ../data/cleaned/cleaned_reviews_20250330154126.csv


../data/cleaned/cleaned_reviews_20250330154126.csv


### Loading cleaned review into the full review base

After cleaning and processing, the extracted raw reviews are consolidated into the full review database.
Before consolidation, the pipeline checks for the presence of new data. If no new data is found, the full review database remains unchanged, and the pipeline stops at this stage.
Once the loading process is complete, the cleaned review file is archived to ensure proper organization and to maintain a history of processed data.

In [17]:
# function to load cleaned and process reviews data into the review database

# Named constants for return values
SUCCESS = 1
NO_UPDATES = 0
ERROR = None

def ensure_directory_exists(path: Path):
    """Ensure the given directory exists."""
    path.mkdir(parents=True, exist_ok=True)

def get_file_path(directory: Path, filename: str) -> Path:
    """Construct a file path from a directory and filename."""
    return directory / filename

def calculate_percentage_increase(initial: int, final: int) -> float:
    """Calculate the percentage increase between two values."""
    return 100 if initial == 0 else ((final - initial) / initial * 100)

def load_reviews(cleaned_file: str, full_reviews_dir: Path = FULL_REVIEWS_DIR, archive_dir: Path = ARCHIVE_DIR) -> int:
    """
    Load cleaned reviews into the full database.

    Args:
        cleaned_file (str): Path to the cleaned reviews CSV file.
        full_reviews_dir (Path): Directory for the full reviews file.
        archive_dir (Path): Directory for archived files.

    Returns:
        int: SUCCESS (1) if new records were added, NO_UPDATES (0) if no updates were made, ERROR (None) if an error occurred.
    """
    try:
        # Ensure directories exist
        ensure_directory_exists(full_reviews_dir)
        ensure_directory_exists(archive_dir)

        # Load cleaned reviews
        df = pd.read_csv(cleaned_file)
        if df.empty:
            logging.warning("Input DataFrame is empty")
            return NO_UPDATES

        # Define file paths
        full_reviews_path = get_file_path(full_reviews_dir, "full_reviews.csv")
        df_full = pd.DataFrame()

        # Load existing data if it exists
        if full_reviews_path.exists():
            try:
                df_full = pd.read_csv(full_reviews_path, low_memory=False)
                logging.info(f"Loaded existing data from {full_reviews_path}")
            except pd.errors.EmptyDataError:
                logging.warning(f"Empty CSV file found at {full_reviews_path}")
            except pd.errors.ParserError:
                logging.error(f"Parsing error in {full_reviews_path}")
        else:
            logging.info(f"No existing file found at {full_reviews_path}, initializing empty DataFrame")

        # Update full reviews
        initial_length = len(df_full)
        df_full_updated = pd.concat([df_full, df], ignore_index=True).drop_duplicates(subset=df.columns, keep="last")
        final_length = len(df_full_updated)
        new_records_added = final_length - initial_length

        logging.info(f"Original records: {initial_length}")
        logging.info(f"Updated records: {final_length}")
        logging.info(f"New records added: {new_records_added}")

        if new_records_added > 0:
            percentage_increase = calculate_percentage_increase(initial_length, final_length)
            logging.info(f"Percentage increase in data: {percentage_increase:.2f}%")

            # Backup existing data
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = get_file_path(full_reviews_dir, f"full_reviews_backup_{timestamp}.csv")
            df_full.to_csv(backup_path, index=False)
            logging.info(f"Backup created at {backup_path}")

            # Save updated data
            df_full_updated.to_csv(full_reviews_path, index=False)
            logging.info(f"Updated data saved to {full_reviews_path}")

            return SUCCESS
        else:
            logging.info("No new reviews to process. Exiting without updates.")
            return NO_UPDATES

    except Exception as e:
        logging.error(f"Error loading reviews: {e}")
        return ERROR


In [None]:
# Application
# result = load_reviews(file_path)
# print(result)

2025-03-30 15:43:11,419 - INFO - No existing file found at ../data/full/full_reviews.csv, initializing empty DataFrame
2025-03-30 15:43:11,440 - INFO - Original records: 0
2025-03-30 15:43:11,441 - INFO - Updated records: 24504
2025-03-30 15:43:11,441 - INFO - New records added: 24504
2025-03-30 15:43:11,441 - INFO - Percentage increase in data: 100.00%
2025-03-30 15:43:11,442 - INFO - Backup created at ../data/full/full_reviews_backup_20250330_154311.csv
2025-03-30 15:43:11,575 - INFO - Updated data saved to ../data/full/full_reviews.csv


1


### Application

In [None]:
# application
# if __name__ == "__main__":
    
#     raw_file_path = extract_reviews(company_name=COMPANY_NAME)
#     cleaned_file = process_reviews(raw_file_path)
#     print(f"\n cleaned_file_path: {cleaned_file} \n")

#     df_clean = pd.read_csv(cleaned_file)
#     print(df_clean.info())
#     print(df_clean.head())

#     print("\n load reviews: \n")
#     load_reviews(cleaned_file)
#     df_full = pd.read_csv("../data/full/full_reviews.csv")
#     df_full.info()
#     df_full.head()


2025-03-27 09:11:52,163 - INFO - Processing page 3210
2025-03-27 09:11:52,661 - INFO - Processing page 3209
2025-03-27 09:11:53,188 - INFO - Processing page 3208
2025-03-27 09:11:53,711 - INFO - Processing page 3207
2025-03-27 09:11:54,236 - INFO - Processing page 3206
2025-03-27 09:11:54,792 - INFO - Processing page 3205
2025-03-27 09:11:55,878 - INFO - Processing page 3204
2025-03-27 09:11:57,394 - INFO - Processing page 3203
2025-03-27 09:11:57,905 - INFO - Processing page 3202
2025-03-27 09:11:58,437 - INFO - Processing page 3201
2025-03-27 09:11:58,963 - INFO - Saved reviews data to ../data/raw/raw_reviews_3201-3210.csv
2025-03-27 09:11:58,964 - INFO - Wrote new max page value '3200' to ../parameters/max_page.txt
2025-03-27 09:11:58,964 - INFO - Finished processing pages 3200-3210
2025-03-27 09:11:58,964 - INFO - Loading raw data from: ../data/raw/raw_reviews_3201-3210.csv
2025-03-27 09:11:58,967 - INFO - Successfully loaded 200 rows of data.
2025-03-27 09:11:58,967 - INFO - Stand


 cleaned_file_path: ../data/cleaned/cleaned_reviews_20250327091159.csv 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 198 entries, 0 to 197
Data columns (total 22 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   id                     198 non-null    object 
 1   title                  198 non-null    object 
 2   review                 198 non-null    object 
 3   rating                 198 non-null    int64  
 4   reply                  17 non-null     object 
 5   experienceDate         198 non-null    object 
 6   createdDateTime        198 non-null    object 
 7   publishedDate          198 non-null    object 
 8   replyPublishedDate     17 non-null     object 
 9   reviewExperienceDelay  198 non-null    float64
 10  year                   198 non-null    int64  
 11  yearQuarter            198 non-null    object 
 12  month                  198 non-null    int64  
 13  monthName              198 non-null 