In [1]:
#
# # Required Libraries
#

# E - Extract
import requests
from bs4 import BeautifulSoup
import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# T - Transform
import pandas as pd
import re

# R - Represent
import sqlite3
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from tabulate import tabulate
import itertools

In [None]:
#############################################################################################
currentFileVersion = '0.0.1' 
print("{} ETL functions def file version v{} {}\n".format("*"*30, currentFileVersion, '*'*30))
#############################################################################################

In [8]:

def extract_phone_details_selenium(url, main_content_selector, brand_selector, model_family_selector, star_rating_selector, rating_value_selector, num_pages):
    """
    Extracts the phone details from the specified URL using the specified CSS selectors and locator strategies.
    
    Args:
    url (str): The URL of the web page to extract phone details from.
    main_content_selector (str): The CSS selector for the main content-holding element on the web page.
    brand_selector (str): The CSS selector for the element containing the phone brand name.
    model_family_selector (str): The CSS selector for the element containing the phone model family.
    star_rating_selector (str): The CSS selector for the element containing the phone star rating.
    rating_value_selector (str): The CSS selector for the element containing the phone rating value.
    upfront_cost_selector (str): The CSS selector for the element containing the phone upfront cost.
    upfront_pence_selector (str): The CSS selector for the element containing the pence value of the phone upfront cost.
    monthly_cost_val_selector (str): The CSS selector for the element containing the phone monthly cost value.
    monthly_pence_selector (str): The CSS selector for the element containing the pence value of the phone monthly cost.
    
    Returns:
    list: A list of dictionaries containing the extracted phone details.
    """
    # Create a new Chrome browser instance
    driver = webdriver.Chrome()

    for page_num in range(1, num_pages+1):
        # construct the URL for the current page
        page_url = f"{url}&page={page_num}"
        print(page_url)
        # Navigate to the page
        driver.get(url)

        # Wait for the page to load
        driver.implicitly_wait(10)

        # Find all main content-holding elements on the page
        main_contents = driver.find_elements(By.CSS_SELECTOR, main_content_selector)

        # Extract the phone details from each main content-holding element
        phone_details_list = []
        for main_content in main_contents:
            # Extract the brand name
            try:
                brand = main_content.find_element(By.CSS_SELECTOR, brand_selector).text
            except KeyError:
                brand = None
            if brand is None:
                continue

            # Extract the device model family
            try:
                model_family = main_content.find_element(By.CSS_SELECTOR, model_family_selector).text
            except KeyError:
                model_family = None
            if model_family is None:
                continue

            # Extract the star rating
            try:
                star_rating = main_content.find_element(By.CSS_SELECTOR, star_rating_selector).get_attribute("aria-label")
            except KeyError:
                star_rating = None
            if star_rating is None:
                continue

            # Extract the rating value
            try:
                rating_value = main_content.find_element(By.CSS_SELECTOR, rating_value_selector).text
            except KeyError:
                rating_value = None
            if rating_value is None:
                continue

            # # Wait for the upfront cost element to be visible and extract its value
            # upfront_cost_elem = WebDriverWait(main_content, 10).until(EC.visibility_of_element_located((By.CSS_SELECTOR, upfront_cost_selector)))
            
            # upfront_cost_elem = main_content.find_element(By.CSS_SELECTOR, upfront_cost_selector).text
            # upfront_cost = upfront_cost_elem.text.strip()



            # Add the extracted data to the phone details list
            phone_details_list.append({
                "Brand": brand,
                "Model Family": model_family,
                "Star Rating": star_rating,
                "Rating Value": rating_value
                #"Upfront Cost": upfront_cost

            })

    # Close the browser instance
    driver.quit()

    # Return the extracted data as a list of dictionaries
    return phone_details_list



def extract_phone_details_BAK(url, main_content_selector, brand_selector, model_family_selector, star_rating_selector, rating_value_selector, num_pages):
    """
    Extracts the phone details from the specified URL using the specified CSS selectors and locator strategies.
    
    Args:
    url (str): The URL of the web page to extract phone details from.
    main_content_selector (str): The CSS selector for the main content-holding element on the web page.
    brand_selector (str): The CSS selector for the element containing the phone brand name.
    model_family_selector (str): The CSS selector for the element containing the phone model family.
    star_rating_selector (str): The CSS selector for the element containing the phone star rating.
    rating_value_selector (str): The CSS selector for the element containing the phone rating value.
    num_pages (int): The number of pages to scrape.
    
    Returns:
    list: A list of dictionaries containing the extracted phone details.
    """
    phone_details_list = []
    for page_num in range(1, num_pages+1):
        # construct the URL for the current page
        page_url = f"{url}&page={page_num}"
        #print(page_url)
        
        # make the HTTP request and get the HTML content
        response = requests.get(page_url)
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # find all main content-holding elements on the page
        main_contents = soup.select(main_content_selector)
        
        # extract the phone details from each main content-holding element
        for main_content in main_contents:
            # extract the brand name
            brand = main_content.select_one(brand_selector).text.strip()

            # extract the device model family
            model_family = main_content.select_one(model_family_selector).text.strip()

            # extract the star rating
            star_rating = main_content.select_one(star_rating_selector)
            if star_rating:
                star_rating = star_rating.get('aria-label')
            else:
                star_rating = None

            # extract the rating value
            rating_value = main_content.select_one(rating_value_selector).text.strip()

            # add the extracted data to the phone details list
            phone_details_list.append({
                "Brand": brand,
                "Model Family": model_family,
                "Star Rating": star_rating,
                "Rating Value": rating_value
            })
    
    return phone_details_list



def extract_phone_details(url, main_content_selector, brand_selector, model_family_selector, 
                            star_rating_selector, rating_value_selector, num_pages,
                            upf_price, upf_pence, mth_price, mth_pence):
    """
    Extracts the phone details from the specified URL using the specified CSS selectors and locator strategies.
    
    Args:
    url (str): The URL of the web page to extract phone details from.
    main_content_selector (str): The CSS selector for the main content-holding element on the web page.
    brand_selector (str): The CSS selector for the element containing the phone brand name.
    model_family_selector (str): The CSS selector for the element containing the phone model family.
    star_rating_selector (str): The CSS selector for the element containing the phone star rating.
    rating_value_selector (str): The CSS selector for the element containing the phone rating value.
    num_pages (int): The number of pages to scrape.
    upf_price (str): The CSS selector for the upfront cost price.
    upf_pence (str): The CSS selector for the upfront cost pence.
    mth_price (str): The CSS selector for the monthly cost price.
    mth_pence (str): The CSS selector for the monthly cost pence.
    
    Returns:
    list: A list of dictionaries containing the extracted phone details.
    """
    phone_details_list = []
    for page_num in range(1, num_pages+1):
        # construct the URL for the current page
        page_url = f"{url}&page={page_num}"
        #print(page_url)
        
        # make the HTTP request and get the HTML content
        response = requests.get(page_url)
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # find all main content-holding elements on the page
        main_contents = soup.select(main_content_selector)
        
        # extract the phone details from each main content-holding element
        for main_content in main_contents:
            # extract the brand name
            brand = main_content.select_one(brand_selector).text.strip()

            # extract the device model family
            model_family = main_content.select_one(model_family_selector).text.strip()

            # extract the star rating
            star_rating = main_content.select_one(star_rating_selector)
            if star_rating:
                star_rating = star_rating.get('aria-label')
            else:
                star_rating = None

            # extract the rating value
            rating_value = main_content.select_one(rating_value_selector).text.strip()

            time.sleep(.5)
            # extract the upfront cost and monthly cost
            upfront_price_elem = main_content.select_one(upf_price)
            upfront_price = upfront_price_elem.text.strip() if upfront_price_elem else None
            upfront_pence_elem = main_content.select_one(upf_pence)
            upfront_pence = upfront_pence_elem.text.strip() if upfront_pence_elem else None
            monthly_price_elem = main_content.select_one(mth_price)
            monthly_price = monthly_price_elem.text.strip() if monthly_price_elem else None
            monthly_pence_elem = main_content.select_one(mth_pence)
            monthly_pence = monthly_pence_elem.text.strip() if monthly_pence_elem else None

            # add the extracted data to the phone details list
            phone_details_list.append({
                "Brand": brand,
                "Model Family": model_family,
                "Star Rating": star_rating,
                "Rating Value": rating_value,
                "Upfront Price": upfront_price,
                "Upfront Pence": upfront_pence,
                "Monthly Price": monthly_price,
                "Monthly Pence": monthly_pence
            })
    
    return phone_details_list


#### Pipeline reporting

In [2]:

def identify_outliers(phone_details_list, max_model_family_length=50, min_rating_value=0, max_rating_value=5):
    """
    Identifies outliers in the phone details list based on various criteria.

    Args:
    phone_details_list (list): A list of dictionaries containing phone details.
    max_rating_value (float): The maximum allowable rating value. Default is 5.
    max_model_family_length (int): The maximum allowable model family length. Default is 50.
    min_rating_value (float): The minimum allowable rating value. Default is 0.
    max_rating_value (float): The maximum allowable rating value. Default is 5.

    Returns:
    list: A list of dictionaries containing the outlier phone details.
    """
    outlier_phone_details_list = []

    for phone_details in phone_details_list:
        # check if the rating value falls outside the specified range
        rating_value = float(phone_details['Rating Value'])
        if rating_value < min_rating_value or rating_value > max_rating_value:
            outlier_phone_details_list.append(phone_details)

        # check if the model family length is greater than the maximum allowable length
        model_family = phone_details['Model Family']
        if len(model_family) > max_model_family_length:
            outlier_phone_details_list.append(phone_details)


    return outlier_phone_details_list



def time_extract_phone_details():
    urlBase = "https://www.o2.co.uk/shop/phones"
    urlSub = "#sort=content.sorting.featured&page="

    main_content_selector = ".device-info-content"
    brand_selector = '[data-qa-device-title] [data-qa-device-brand]'
    model_family_selector = '[data-qa-device-title] [data-qa-device-modelfamily]'
    star_rating_selector = ".star-rating__stars"
    rating_value_selector = ".device-rating__text [itemprop='ratingValue']"
    num_pages = 3

    return extract_phone_details(
        urlBase+urlSub,
        main_content_selector,
        brand_selector,
        model_family_selector,
        star_rating_selector,
        rating_value_selector,
        num_pages
    )

#### Cleaning

In [None]:
def remove_duplicates(phone_details_list, de_dup_cols_list):
    # convert list of dictionaries to a pandas DataFrame
    df = pd.DataFrame(phone_details_list)
    
    # drop duplicate rows based on the "Brand" and "Model Family" fields
    df_unique = df.drop_duplicates(subset=de_dup_cols_list)
    
    # convert the unique DataFrame back to a list of dictionaries
    unique_phone_details_list = df_unique.to_dict('records')
    
    return unique_phone_details_list



def capitalise_phone_details(phone_details_list):
    """
    This function capitalises the words in the 'Brand' and 'Model Family' keys
    of a list of dictionaries, excluding words such as 'in', 'on', 'the', 
    and the word 's. It also correctly handles words with apostrophes and hyphenated words.
    
    Input:
    phone_details_list (list): the list of dictionaries to modify.
    
    Output:
    The modified list of dictionaries with the 'Brand' and 'Model Family' keys
    in title case.
    """

    # list of 'words' to NOT capitalise (known connectives etc)
    words_to_exclude = ['in', 'on', 'the', 'de', 'le','la', 'en', 'by']

    for phone_details in phone_details_list:
        for key in ["Brand", "Model Family"]:
            if key in phone_details:
                def capitalise_words(s):
                    words = s.split(' ')
                    capitalised_words = []
                    for word in words:
                        if word.lower() in words_to_exclude:
                            # Don't capitalise words in the list
                            capitalised_words.append(word)
                        elif "'" in word:
                            apostrophe_index = word.index("'")
                            if word[apostrophe_index+1:].lower() == 's':
                                # Don't capitalise 's following an apostrophe
                                capitalised_words.append(word[:apostrophe_index+1] + word[apostrophe_index+1:])
                            else:
                                capitalised_words.append(word[:apostrophe_index+1] + word[apostrophe_index+1:].capitalize())
                        else:
                            # Capitalise each part of hyphenated words, but retain the hyphen
                            hyphenated_parts = word.split('-')
                            if all(part.lower() in words_to_exclude for part in hyphenated_parts):
                                # Don't capitalise any parts of hyphenated words if all parts are in the list
                                capitalised_parts = hyphenated_parts
                            elif any(part.lower() not in words_to_exclude for part in hyphenated_parts):
                                # Capitalise hyphenated parts that are not in the list
                                capitalised_parts = [part.capitalize() if part.lower() not in words_to_exclude else part for part in hyphenated_parts]
                            else:
                                capitalised_parts = hyphenated_parts
                            capitalised_words.append('-'.join(capitalised_parts))

                    return ' '.join(capitalised_words)

                phone_details[key] = capitalise_words(phone_details[key])

    return phone_details_list



def clean_phone_details_list(phone_details_list):
    """
    Cleans the values in phone_details_list by removing leading/trailing spaces and multiple spaces.
    
    Args:
    phone_details_list (list): A list of dictionaries containing phone details.
    
    Returns:
    list: The cleaned list of dictionaries.
    """
    cleaned_phone_details_list = []
    
    for phone_details in phone_details_list:
        cleaned_phone_details = {}
        for key, value in phone_details.items():

            # Need the none check as some such data coming / so .split() fails 
            if value is not None:
                # Remove leading/trailing spaces and multiple spaces from value
                cleaned_value = ' '.join(value.split())
                cleaned_phone_details[key] = cleaned_value
            else:
                cleaned_phone_details[key] = None
        cleaned_phone_details_list.append(cleaned_phone_details)
    
    return cleaned_phone_details_list



def lowercase_phone_details(phone_details_list):
    """
    This function takes a list of phone details dictionaries and converts
    all the string values to lowercase.
    """
    for phone_details in phone_details_list:
        for key, value in phone_details.items():
            if isinstance(value, str):
                phone_details[key] = value.lower()
    return phone_details_list



def remove_dodgy_chars(phone_details_list, unwanted_chars):
    """
    Removes unwanted characters from the 'Brand' and 'Model Family' key values of the input phone details list.
    
    Args:
    phone_details_list (list): A list of dictionaries containing phone details.
    unwanted_chars (list): A list of unwanted characters to be removed from the phone details.
    
    Returns:
    list: A list of dictionaries containing the cleaned phone details.
    """
    cleaned_phone_details_list = []
    
    for phone_details in phone_details_list:
        cleaned_phone_details = {}
        for key, value in phone_details.items():
            if isinstance(value, str):
                cleaned_value = value
                for unwanted_char in unwanted_chars:
                    cleaned_value = cleaned_value.replace(unwanted_char, '')
                cleaned_value = ' '.join(cleaned_value.split())
                cleaned_phone_details[key] = cleaned_value.lower()
            else:
                cleaned_phone_details[key] = value
        cleaned_phone_details_list.append(cleaned_phone_details)
    
    return cleaned_phone_details_list


def format_rating_values(phone_details_list):
    for phone_details in phone_details_list:
        rating_value = phone_details.get('Rating Value')
        if rating_value is not None:
            try:
                numeric_value = float(rating_value)
                formatted_value = "{:.2f}".format(numeric_value)
                phone_details['Rating Value'] = formatted_value
            except ValueError:
                # Ignore non-numeric values
                pass
    return phone_details_list



def convert_to_float(phone_details_list, key, dec_places):
    """
    Converts the values for the specified key in a list of phone details to a float data type.
    If the value cannot be converted to a float, it is set to None.
    If the key is not found in a particular phone details dictionary, it is set to None.

    Args:
    phone_details_list (list): A list of phone details dictionaries.
    key (str): The key to convert to float.

    Returns:
    list: The modified list of phone details dictionaries.
    """
    for phone_details in phone_details_list:
        try:
            # Try to convert the value for the specified key to a float
            value = float(phone_details[key])
            # If successful, round the value to two decimal places and assign it back to the dictionary
            phone_details[key] = round(value, dec_places)
        except (ValueError, KeyError):
            # If the value cannot be converted to a float or the key is not found, set it to None
            phone_details[key] = None
            # Add a comment explaining why the value was set to None
            if key not in phone_details:
                print("Key '{}' not found in phone details: {}".format(key, phone_details))
            else:
                print("Could not convert '{}' to float for key '{}': {}".format(phone_details[key], key, phone_details))  
    return phone_details_list


def clean_phone_details_list(phone_details_list, key_names):
    """
    Cleans the passed list of phone details by converting the values for each key in the passed list of key names
    to a float, removing any non-numeric characters, and returning the cleaned list.

    Args:
    phone_details_list (list): List of dictionaries, where each dictionary represents phone details.
    key_names (list): List of key names to be cleaned.

    Returns:
    List of dictionaries, where each dictionary represents cleaned phone details.

    """

    cleaned_list = []
    for phone in phone_details_list:
        cleaned_phone = {}

        for key, value in phone.items():
            if key in key_names:
                # Remove all non-numeric characters from the value
                value = ''.join(filter(lambda x: x.isdigit() or x == '.', value))

                # Convert the value to a float
                value = float(value)

                # Add the cleaned value to the cleaned phone dictionary
                cleaned_phone[key] = value

            else:
                # If the key is not in the key names to be cleaned, simply add it to the cleaned phone dictionary
                cleaned_phone[key] = value

        # Add the cleaned phone dictionary to the cleaned list
        cleaned_list.append(cleaned_phone)

    return cleaned_list




#### Transform

In [None]:
def combine_prices(phone_details_list):
    for details in phone_details_list:
        details['Total Monthly Price'] = details['Monthly Price'] + (details['Monthly Pence'] / 100)
        details['Total Upfront Price'] = details['Upfront Price'] + (details['Upfront Pence'] / 100)
        del details['Monthly Price']
        del details['Monthly Pence']
        del details['Upfront Price']
        del details['Upfront Pence']
    return phone_details_list


def combine_prices(phone_details_list, key_names):
    for phone_details in phone_details_list:
        for key in key_names:
            if key in phone_details:
                if 'Price' in key:
                    price_key = key.replace(' ', '')  # remove space in key names
                    price_total = float(str(phone_details[f'{key}']) + str(phone_details[f'{price_key}']))
                    phone_details[f'Total {key}'] = price_total
                    del phone_details[f'{key}']
                    del phone_details[f'{price_key}']
    return phone_details_list

### Represent

In [6]:

def save_data_to_file(filename, data, file_type='csv', file_path='./'):
    """
    Saves the data to a file in the specified file type and file path.
    
    Args:
    filename (str): The name of the file to save.
    data (list): A list of dictionaries containing the data to save.
    file_type (str): The file type to save the data in (defaults to csv).
    file_path (str): The path to save the file in (defaults to current directory).
    """
    print("Exporting data to file on path : {}{}{}".format(file_path,filename,file_type))
    if file_type == 'csv':
        df = pd.DataFrame(data)
        df.to_csv(f"{file_path}/{filename}.csv", index=False)
    elif file_type == 'json':
        with open(f"{file_path}/{filename}.json", 'w') as f:
            json.dump(data, f)
    else:
        print("Invalid file type. Only CSV and JSON are supported.")




def plot_average_rating_by_brand(phone_details_list):
    # convert phone_details_list to a pandas DataFrame
    df = pd.DataFrame(phone_details_list)
    df['Rating Value'] = pd.to_numeric(df['Rating Value'])

    # create a list of unique brands
    brands = df['Brand'].unique()

    # create a dictionary of random colors for each brand
    colors = {brand: sns.color_palette("Set2", len(brands))[i] for i, brand in enumerate(brands)}

    # create a bar plot with Seaborn
    fig, ax = plt.subplots(figsize=(10, 3))
    sns.barplot(x="Brand", y="Rating Value", data=df, errcolor="grey", capsize=0.05, palette=colors, ax=ax)

    # set plot title and axis labels
    ax.set_title("Average Rating Value by Brand")
    ax.set_xlabel("Brand")
    ax.set_ylabel("Rating Value")

    # add angled x-axis labels
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")

    # add value labels to the bars
    for p in ax.patches:
        ax.annotate("{:.2f}".format(p.get_height()), (p.get_x() + p.get_width() / 2., p.get_height()),
                    ha='center', va='center', xytext=(0, 10), textcoords='offset points')

    # adjust plot margins to ensure labels within each bar are displayed within the main plot area
    plt.margins(y=0.1)

    # display the plot
    plt.show()



def create_phone_details_table(conn):
    """
    Creates a phone_details table in an in-memory SQLite database.
    """
    c = conn.cursor()
    c.execute('''CREATE TABLE phone_details
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  brand TEXT,
                  model_family TEXT,
                  star_rating TEXT,
                  rating_value TEXT)''')
    conn.commit()



def insert_phone_details(conn, phone_details_list):
    """
    Inserts the phone details into the phone_details table in the in-memory SQLite database.
    
    Args:
    conn (sqlite3.Connection): SQLite database connection.
    phone_details_list (list): A list of dictionaries containing the extracted phone details.
    """
    c = conn.cursor()
    for phone_details in phone_details_list:
        c.execute("INSERT INTO phone_details (brand, model_family, star_rating, rating_value) VALUES (?, ?, ?, ?)",
                  (phone_details['Brand'], phone_details['Model Family'], phone_details['Star Rating'], phone_details['Rating Value']))
    conn.commit()