In [79]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from urllib.parse import urlparse
from collections import deque
import time
import random
import csv
import os
import pandas as pd
import re 
import token
from nltk.corpus import stopwords
import nltk
from nltk.stem import WordNetLemmatizer
import string
from collections import Counter
import numpy as np

In [39]:
# --- CONFIGURATION OF COLORS ---
# ANSI escape sequences used to colorize the status messages printed by this notebook.
# A colored message is wrapped as f"{COLOR}...{RESET}"; RESET ends the colored span.

GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"

In [40]:
# Default HTTP headers attached to every request made through `session`:
# a desktop browser user-agent, an English language preference and a Google referer.
HEADER = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0",
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/"}

# Single shared session, configured once with HEADER and used by fetch_verify_url.
session = requests.Session()
session.headers.update(HEADER)

In [41]:
# URL fragments that exclude a link from the crawl.
# The crawler tests each entry with a plain substring check (`entry in url`), NOT as a
# regular expression, so every entry must be literal text. The former regex-style
# entries '/products?/' and '/search?' could never match '/product/' or '/products/';
# they are spelled out literally here ('/search' already covers '/search?...').
Filters_MBG = [
    # Commerce and account pages
    '/product/', '/products/', '/shop/', '/store/', '/cart', '/checkout',
    '/account', '/login', '/register',
    # Listing / search pages and in-page anchors
    '/tag', '/search', '#',
    # External social networks and marketplaces
    "youtube.com", "instagram.com", "facebook.com", "twitter.com", "pinterest.com",
    "linkedin.com", "tiktok.com", "amazon.com", "google.com",
]

In [42]:
def fetch_verify_url(url):
    """Fetch ``url`` with the shared session and return the response only on HTTP 200.

    Args:
        url: Address to request.

    Returns:
        The ``requests`` response when the status code is 200, otherwise ``None``
        (non-200 status, or any ``requests.RequestException`` such as a timeout,
        connection error or unsupported URL scheme).
    """
    try:
        response = session.get(url, timeout=10)
    except requests.RequestException as error:
        # Report the failure instead of dropping it silently, so network problems
        # are as visible as bad status codes.
        print(f"Failed to fetch the url: {url} ({error})")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch the url: {url} with status code {response.status_code}")
        return None
    return response

In [43]:
def to_soup(url):
    """Download ``url`` and parse its body with BeautifulSoup's ``html.parser``.

    Returns:
        The parsed ``BeautifulSoup`` document, or ``None`` when
        ``fetch_verify_url`` did not return a usable response.
    """
    page = fetch_verify_url(url)
    if not page:
        return None
    return BeautifulSoup(page.text, 'html.parser')

In [None]:
def extract_MBG_links(url_MBG, max_levels=3):
    """Breadth-first crawl starting at ``url_MBG`` and collect the links it finds.

    Args:
        url_MBG: Start page; its domain defines which links count as internal.
        max_levels: Crawl depth. Pages found at this depth are kept in the result
            but their own links are not followed.

    Returns:
        A set of absolute http(s) URLs (the start URL included). Links containing
        any entry of ``Filters_MBG`` are excluded. External links are collected
        but never crawled.
    """
    start_domain = urlparse(url_MBG).netloc
    links_MBG = {url_MBG}
    queue = deque([(url_MBG, 0)])

    while queue:
        current_url, current_level = queue.popleft()

        # Once the maximum depth is reached, stop digging from this page.
        if current_level >= max_levels:
            continue

        soup = to_soup(current_url)
        if not soup:
            continue

        # Prefer the <div id="content"> container; fall back to the whole body.
        content_area = soup.find('div', id='content') or soup.body
        if not content_area:
            continue

        for item in content_area.find_all('a', href=True):
            try:
                full_url = urljoin(current_url, item.get('href'))
                parsed_url = urlparse(full_url)
            except ValueError:
                # Malformed href (e.g. a broken IPv6 literal): skip it, keep crawling.
                continue

            # mailto:, tel:, javascript: ... links cannot be fetched as pages.
            if parsed_url.scheme not in ("http", "https"):
                continue

            # Drop links matching one of the excluded URL fragments.
            if any(excluded in full_url for excluded in Filters_MBG):
                continue

            if full_url in links_MBG:
                continue
            links_MBG.add(full_url)

            # Only internal links are queued, so the crawl never leaves the start domain.
            if parsed_url.netloc == start_domain:
                queue.append((full_url, current_level + 1))

    return links_MBG

# Crawl the site from its home page with the default depth (max_levels=3).
# This performs live HTTP requests every time the cell is executed.
ALL_MBG_LINKS = extract_MBG_links("https://www.mindbodygreen.com/")
# Show the full set of collected URLs, then how many there are.
print(ALL_MBG_LINKS)
print(len(ALL_MBG_LINKS))

In [None]:
def create_filepath():
    """Ask the user for an output file name and return its path inside ``data/``.

    The name is read with ``input``. An empty answer falls back to ``"test"`` and
    a ``.csv`` suffix is appended when missing. The ``data`` directory is created
    if needed, and a note is printed when the target file already exists.

    Returns:
        The relative path ``data/<name>.csv`` as a string.
    """
    # strip() removes the spaces before and after the typed name
    user_filename = input("Enter the name of the file to save (e.g. : test) : ").strip()

    if not user_filename:
        user_filename = "test"

    if not user_filename.endswith('.csv'):
        user_filename += '.csv'
    filename = os.path.join("data", user_filename)

    # Create the directory if it does not exist yet
    directory = os.path.dirname(filename)
    if directory and not os.path.exists(directory):
        # exist_ok avoids a crash if the directory appears between the check and the call
        os.makedirs(directory, exist_ok=True)
        print(f"{GREEN}Created directory : {os.path.abspath(directory)}{RESET}")
    print(f'{BLUE}Output will be saved to : "{filename}"{RESET}')

    # Informational warning only: an existing file will be overwritten on save.
    if os.path.exists(filename):
        # The f prefix was missing here, so the color placeholders were printed literally.
        print(f"{BLUE}Note : The file is overwritten{RESET}")

    return filename


In [None]:
def get_html_content(links):
    """Download every URL in ``links`` and separate successes from failures.

    Args:
        links: Iterable of URLs to fetch with ``fetch_verify_url``.

    Returns:
        A ``(content, failed_links)`` tuple: ``content`` is a list of
        ``{'url': ..., 'html': ...}`` dicts for the pages that were fetched,
        ``failed_links`` is the list of URLs for which no response came back.
    """
    content, failed_links = [], []

    for link in links:
        response = fetch_verify_url(link)
        if not response:
            failed_links.append(link)
            continue
        content.append({'url': link, 'html': response.text})

    # One-line colored summary of the run
    print(f"{GREEN}Fetched: {len(content)} {BLUE}|{RED} Failed: {len(failed_links)}{RESET}")

    return content, failed_links

In [None]:
def save_to_csv(data, filename):
    """Write a list of dictionaries to ``filename`` as a fully-quoted UTF-8 CSV.

    Args:
        data: List of dicts sharing the same keys (e.g. the ``content`` list
            produced by ``get_html_content``). Nothing is written when empty.
        filename: Destination path; an existing file is overwritten.

    Returns:
        None. Success and failure are reported with colored messages; any
        exception raised while writing is caught and printed.
    """
    if data:
        # The keys of the first record define the CSV columns.
        columns = data[0].keys()

        try:
            with open(filename, 'w', newline='', encoding='utf-8') as handle:
                csv_writer = csv.DictWriter(handle, fieldnames=columns, quoting=csv.QUOTE_ALL)
                csv_writer.writeheader()  # column names
                csv_writer.writerows(data)  # one row per record
            print(f"{GREEN}Success! Data saved to: {os.path.abspath(filename)}{RESET}")

        except Exception as error:
            print(f"{RED}Error saving file: {error}{RESET}")
    else:
        print(f"{RED}No data to save.{RESET}")

In [None]:
def get_name_files():
    """Print and return the names of the entries found in the ``data`` directory.

    Returns:
        The entry names sorted alphabetically, or an empty list when the
        ``data`` directory does not exist yet.
    """
    print("Files in data directory :")
    try:
        files = sorted(os.listdir("data"))
    except FileNotFoundError:
        # Nothing has been saved yet, so there is no data directory to list.
        files = []
    print(files)
    return files

# The function already prints the listing; wrapping the call in print() only
# added a stray "None" line. Keep the result for later use instead.
data_files = get_name_files()

In [None]:
def get_corpus(html):
    """Extract the visible text of an HTML document as one whitespace-normalized string.

    ``<script>``, ``<style>``, ``<nav>`` and ``<noscript>`` elements are removed
    before the text is extracted.

    Args:
        html: Raw HTML markup. Anything that is not a non-empty ``str``/``bytes``
            value (``None``, ``""``, or the NaN that pandas yields for an empty
            CSV cell) is treated as "no page".

    Returns:
        The page text with every run of whitespace collapsed to a single space,
        or ``""`` when there is no markup to parse.
    """
    # `not float('nan')` is False, so a plain truthiness test would let NaN cells
    # through to the parser; check the type explicitly.
    if not isinstance(html, (str, bytes)) or not html:
        return ""

    soup = BeautifulSoup(html, "html.parser")

    # Drop elements whose content is not part of the readable text.
    for tag in soup(["script", "style", "nav", "noscript"]):
        tag.decompose()

    raw_text = soup.get_text(separator=' ', strip=True)

    # split()/join collapses newlines, tabs and repeated spaces.
    return ' '.join(raw_text.split())

In [None]:
def corpus_csv_file(input_csv, output_csv):
    """Turn a CSV of raw HTML pages into a CSV of extracted page text.

    Args:
        input_csv: Path of a CSV containing at least the ``url`` and ``html`` columns.
        output_csv: Path where the ``url`` / ``corpus_text`` table is written.

    Returns:
        The DataFrame that was written (columns ``url`` and ``corpus_text``).

    Raises:
        ValueError: If ``input_csv`` has no ``html`` column.
    """
    pages = pd.read_csv(input_csv)

    if 'html' not in pages.columns:
        raise ValueError(f"The html column is missing in: {input_csv}")

    # Run get_corpus on every page, then drop the raw markup from the result.
    pages['corpus_text'] = pages['html'].apply(get_corpus)
    corpus = pages[['url', 'corpus_text']]

    corpus.to_csv(output_csv, index=False, encoding='utf-8')
    return corpus

In [None]:
"""SCRAP URL : https://www.mindbodygreen.com/"""

# 1. Ask for the output file name (e.g. "FINAL_MBG") and build its path
target_filename = create_filepath()

# 2. Download the pages collected by the crawl.
#    get_html_content returns exactly two values (pages, failed URLs); unpacking
#    a third one raised "ValueError: not enough values to unpack".
content, failed_links = get_html_content(list(ALL_MBG_LINKS))

# 3. Save the raw pages, then load them back as a DataFrame
save_to_csv(content, target_filename)
df = pd.read_csv(target_filename)  # df is now a table with the data from the CSV file

In [None]:
def normalize_html(text):
    """Lowercase ``text`` and reduce it to ASCII letters separated by single spaces.

    Args:
        text: The string to normalize.

    Returns:
        The lowercased text in which reference markers, digits and every
        character other than ``a-z`` or whitespace have been replaced by a
        space, whitespace runs collapsed, and both ends trimmed.
    """
    # Each pattern is replaced by a single space, in this order.
    passes = (
        r'\[\d+\]',   # reference numbers like [1], [2], etc.
        r'\d+',       # all remaining numbers
        r'[^a-z\s]',  # anything that is not an English letter or whitespace
        r'\s+',       # runs of whitespace
    )

    normalized = text.lower()
    for pattern in passes:
        normalized = re.sub(pattern, ' ', normalized)

    return normalized.strip()

In [None]:
def normalize_csv_file(input_csv, output_csv):
    """Normalize the text column of a CSV and write the result to a new CSV.

    Args:
        input_csv: Path of a CSV with a ``url`` column and a text column named
            ``cleaned_text``; ``corpus_text`` (the column written by
            ``corpus_csv_file``) is accepted as a fallback.
        output_csv: Path where the ``url`` / ``normalized_text`` table is written.

    Returns:
        The DataFrame that was written (columns ``url`` and ``normalized_text``).

    Raises:
        ValueError: If neither text column is present in ``input_csv``.
    """
    df = pd.read_csv(input_csv)

    # Prefer 'cleaned_text'; fall back to the column produced by the corpus stage.
    text_column = next(
        (name for name in ('cleaned_text', 'corpus_text') if name in df.columns),
        None,
    )
    if text_column is None:
        raise ValueError(f"The cleaned text is missing in: {input_csv}")

    # Empty cells come back from read_csv as NaN (a float), which normalize_html
    # cannot lowercase; treat them as empty text.
    texts = df[text_column].fillna('').astype(str)
    df['normalized_text'] = texts.apply(normalize_html)
    df = df[['url', 'normalized_text']]  # keep the url and normalized text

    df.to_csv(output_csv, index=False, encoding='utf-8')

    return df

cleaned_MBG_csv = "data/mbg_cleaned_pages.csv"

In [None]:
# English stopwords plus the possessive "'s". The set was previously built twice and
# the second assignment silently discarded the "'s" entry; build it once, as a set,
# so membership tests in tokenize_html stay fast.
stop_words = set(stopwords.words('english')) | {"'s"}

# Shared lemmatizer instance used by tokenize_html.
lemmatizer = WordNetLemmatizer()

def tokenize_html(text):
    """Split ``text`` into lemmatized tokens without punctuation or stopwords.

    Args:
        text: The string to tokenize.

    Returns:
        A list of tokens: the text is lowercased and tokenized with
        ``nltk.word_tokenize``, tokens found in ``string.punctuation`` or in
        ``stop_words`` are dropped, and each remaining token is lemmatized
        twice (default part of speech, then ``pos='v'``).
    """
    words = nltk.word_tokenize(text.lower())

    # Discard punctuation tokens and stopwords in a single pass.
    kept = [
        word for word in words
        if word not in string.punctuation and word not in stop_words
    ]

    lemmas = []
    for word in kept:
        base_form = lemmatizer.lemmatize(word)  # default part of speech
        lemmas.append(lemmatizer.lemmatize(base_form, pos='v'))  # then as a verb
    return lemmas

In [None]:
def tokenize_csv_file(input_csv, output_csv):
    """Tokenize the ``normalized_text`` column of a CSV and write the result.

    Args:
        input_csv: Path of a CSV with the ``url`` and ``normalized_text`` columns.
        output_csv: Path where the ``url`` / ``tokenized_text`` table is written.

    Returns:
        The DataFrame that was written; each ``tokenized_text`` cell holds the
        list of tokens returned by ``tokenize_html``.

    Raises:
        ValueError: If ``input_csv`` has no ``normalized_text`` column.
    """
    df = pd.read_csv(input_csv)

    if 'normalized_text' not in df.columns:
        raise ValueError(f"The normalized text is missing in: {input_csv}")

    # Pages without any text are read back as NaN (a float), which tokenize_html
    # cannot lowercase; treat them as empty text so they yield an empty token list.
    texts = df['normalized_text'].fillna('').astype(str)
    df['tokenized_text'] = texts.apply(tokenize_html)
    df = df[['url', 'tokenized_text']]  # keep the url and the tokens only

    df.to_csv(output_csv, index=False, encoding='utf-8')

    return df

In [None]:
# The raw HTML pages were saved to `target_filename` by the scraping cell.
# `raw_filename` used to come from an older get_html_content signature and was
# never defined any more, so this cell raised NameError.
raw_filename = target_filename

if raw_filename and os.path.exists(raw_filename):
    print(f"Fichier brut généré : {raw_filename}")

    # 2. CORPUS GENERATION
    # Derive the output name from the raw file (e.g. 'data/mbg_raw.csv' -> 'data/mbg_raw_corpus.csv').
    # splitext only touches the extension, unlike str.replace which rewrites every ".csv".
    root, extension = os.path.splitext(raw_filename)
    corpus_filename = f"{root}_corpus{extension}"

    print(f"Traitement du corpus vers -> {corpus_filename} ...")

    # Extract the page text of every row and write it to the corpus file
    df_corpus = corpus_csv_file(raw_filename, corpus_filename)

    print("✅ Corpus généré avec succès !")
    print(df_corpus.head())
else:
    print("❌ Pas de fichier brut, impossible de faire le corpus.")

In [None]:
# Build the text corpus from "wikipedia_lifestyle_content.csv" (expected to hold the
# `url` and `html` columns) and write it to "wikipedia_lifestyle_corpus.csv".
# NOTE: corpus_csv_file returns the resulting DataFrame, so despite its name
# `corpus_wiki_csv` holds a DataFrame, not a file path. Both paths are relative to
# the working directory, not to the `data` folder used elsewhere in the notebook.
corpus_wiki_csv = corpus_csv_file("wikipedia_lifestyle_content.csv", "wikipedia_lifestyle_corpus.csv")