# 1. Web Scraping
This section scrapes phone specifications from GSMArena and saves them to a JSON file.

In [None]:
import aiohttp
import json
import os
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
from urllib.parse import urljoin
import sys

# Site root; relative links found on result pages are resolved against it.
BASE_URL = "https://www.gsmarena.com"
# File the scraper writes its results to (the processing step reads the same name).
JSON_FILE = "phone_specs.json"
# Headers attached to every request made through the shared aiohttp session.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}

# GSMArena phone-finder result pages to crawl. The queries are split into price
# bands (nPriceMin / nPriceMax), and the 150-200 band is further split by year.
# NOTE(review): presumably the split keeps each query under a per-page result
# limit on the site -- confirm before merging or widening bands.
search_urls = [
    "https://www.gsmarena.com/results.php3?nPriceMin=900&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=700&nPriceMax=900&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=600&nPriceMax=700&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=500&nPriceMax=600&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=400&nPriceMax=500&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=350&nPriceMax=400&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=300&nPriceMax=350&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=250&nPriceMax=300&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMin=200&nPriceMax=250&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nYearMin=2023&nPriceMin=150&nPriceMax=200&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nYearMax=2023&nPriceMin=150&nPriceMax=200&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1",
    "https://www.gsmarena.com/results.php3?nPriceMax=150&nDisplayResMin=2073600&chkReview=selected&sAvailabilities=1"
]

def extract_specs_data(specs_element):
    """Turn the spec tables of a phone page into a nested dict.

    Args:
        specs_element: Parsed element whose ``<table>`` descendants hold the
            specification rows.

    Returns:
        dict: ``{section: {spec_key: value}}``. A section starts at a ``<th>``
        carrying a ``rowspan`` attribute. The key of each row is the value
        cell's ``data-spec`` attribute, falling back to the text of the
        ``ttl`` cell. When a key repeats inside a section its values are
        collected in a list, in document order. Rows seen before the first
        section header, or rows without a usable key, are ignored.
    """
    specs_by_section = {}
    section_name = None

    all_rows = (
        tr
        for tbl in specs_element.find_all("table")
        for tr in tbl.find_all("tr")
    )
    for tr in all_rows:
        header = tr.find("th")
        if header and header.get("rowspan"):
            # A new section begins; later rows belong to it.
            section_name = header.text.strip()
            specs_by_section[section_name] = {}

        info_cell = tr.find("td", class_="nfo")
        if not info_cell:
            continue

        label_cell = tr.find("td", class_="ttl")
        spec_key = info_cell.get("data-spec")
        if not spec_key and label_cell:
            spec_key = label_cell.text.strip()
        if not (spec_key and section_name):
            continue

        # Collapse every run of whitespace (including newlines) to one space.
        text = " ".join(info_cell.text.split())

        section = specs_by_section[section_name]
        if spec_key not in section:
            section[spec_key] = text
        elif isinstance(section[spec_key], list):
            section[spec_key].append(text)
        else:
            section[spec_key] = [section[spec_key], text]

    return specs_by_section

async def get_phone_links(session, url):
    """Fetch one search-result page and return the phone page URLs on it.

    Args:
        session: HTTP client session used for the GET request.
        url: Address of the search-result page.

    Returns:
        list[str]: Absolute URLs (resolved against ``BASE_URL``) of every
        ``<a href=...>`` found inside a ``<div class="makers">``. An empty
        list is returned when the request or parsing raises an ``Exception``.

    Note:
        An HTTP 429 response calls ``sys.exit()``. ``SystemExit`` is not an
        ``Exception`` subclass, so the handler below does not intercept it.
    """
    try:
        async with session.get(url, timeout=10) as response:
            if response.status == 429:
                print(f"Rate limited at {url}. Exiting immediately.")
                sys.exit()
            page = BeautifulSoup(await response.text(), "html.parser")
            links = []
            for container in page.find_all("div", class_="makers"):
                for anchor in container.find_all("a", href=True):
                    links.append(urljoin(BASE_URL, anchor["href"]))
            return links
    except Exception as e:
        print(f"Error fetching links from {url}: {e}")
        return []

async def scrape_phone(session, url):
    """Download one phone page and return its specifications.

    Args:
        session: HTTP client session used for the GET request.
        url: Address of the phone's specification page.

    Returns:
        dict | None: ``{"Phone Name": ..., <section>: {...}, ...}`` built from
        the page title and ``extract_specs_data``; ``None`` when the title or
        the ``specs-list`` container is missing, or when any ``Exception`` is
        raised while fetching or parsing.

    Note:
        An HTTP 429 response calls ``sys.exit()``; ``SystemExit`` is not
        caught by the ``except Exception`` handler.
    """
    try:
        async with session.get(url, timeout=10) as response:
            if response.status == 429:
                print(f"Rate limited at {url}. Exiting immediately.")
                sys.exit()
            page = BeautifulSoup(await response.text(), "html.parser")
            title = page.find("h1", class_="specs-phone-name-title")
            specs_root = page.find("div", id="specs-list")
            if title and specs_root:
                return {
                    "Phone Name": title.text.strip(),
                    **extract_specs_data(specs_root),
                }
            return None
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        return None

def load_json():
    """Load previously scraped phones from ``JSON_FILE``.

    Returns:
        tuple[list, set]: The stored records and the set of their
        ``"Phone Name"`` values. ``([], set())`` is returned when the file
        does not exist, cannot be read, is not valid JSON, or does not have
        the expected list-of-records shape.
    """
    if not os.path.exists(JSON_FILE):
        return [], set()

    try:
        with open(JSON_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        existing = {item["Phone Name"] for item in data}
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError: unreadable file; ValueError: invalid JSON or encoding
        # (json.JSONDecodeError and UnicodeDecodeError are subclasses);
        # KeyError/TypeError: records without the expected shape.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        print(f"Could not load {JSON_FILE} ({e}); starting with an empty dataset.")
        return [], set()
    return data, existing

def save_json(data):
    """Write ``data`` to ``JSON_FILE`` as indented UTF-8 JSON and report it.

    Args:
        data: JSON-serializable object to store (the list of phone records).
    """
    with open(JSON_FILE, mode="w", encoding="utf-8") as out_file:
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        json.dump(data, out_file, indent=2, ensure_ascii=False)
    print(f"Saved to {JSON_FILE}")

async def scrape_search_url(session, search_url, all_data, seen_phones):
    """Scrape every phone listed on one search-result page.

    Args:
        session: HTTP client session shared by all requests.
        search_url: Search-result page to process.
        all_data: List of phone records; new records are appended in place.
        seen_phones: Set of phone names already stored; updated in place.

    Returns:
        int: Number of records appended to ``all_data`` by this call.
    """
    links = await get_phone_links(session, search_url)
    print(f"Found {len(links)} phone links in {search_url}.")

    # All phone pages of this result page are requested concurrently.
    scraped = await tqdm_asyncio.gather(
        *(scrape_phone(session, link) for link in links),
        desc="Scraping phones",
        unit="phone",
    )

    size_before = len(all_data)
    for record in scraped:
        if not record:
            continue
        name = record["Phone Name"]
        if name in seen_phones:
            continue
        seen_phones.add(name)
        all_data.append(record)
    return len(all_data) - size_before

async def run_scraper():
    """Run the whole scrape and store the result in ``JSON_FILE``.

    Does nothing (apart from printing a notice) when ``JSON_FILE`` already
    exists. Otherwise every URL in ``search_urls`` is processed in order with
    one shared HTTP session, and the collected records are written once, after
    the last search URL has been handled.

    Note:
        Because of the early return above, ``load_json()`` only runs when the
        file was just found to be absent, so it normally yields an empty
        dataset here.
    """
    if os.path.exists(JSON_FILE):
        print(f"{JSON_FILE} already exists. Exiting.")
        return

    all_data, seen_phones = load_json()
    print(f"Loaded {len(all_data)} existing phones.")

    added_per_url = []
    async with aiohttp.ClientSession(headers=HEADERS) as session:
        for search_url in search_urls:
            print(f"Processing {search_url}...")
            added = await scrape_search_url(session, search_url, all_data, seen_phones)
            added_per_url.append(added)
            print(f"New phones added from this search URL: {added}")

    save_json(all_data)
    print(f"Done. Total phones: {len(all_data)} | Newly added: {sum(added_per_url)}")

# Top-level `await` is only valid where an event loop is already running, as in
# a notebook cell; from a plain script use `asyncio.run(run_scraper())` instead.
await run_scraper()

# 2. Data Processing
This section processes the scraped phone data, including price conversion, feature extraction, and tier classification.


In [None]:
import os
import re
import requests
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import warnings

# Environment and warning settings
# NOTE(review): LOKY_MAX_CPU_COUNT presumably caps the core count detected by
# loky/joblib (used underneath scikit-learn) -- confirm it is still needed.
os.environ["LOKY_MAX_CPU_COUNT"] = "4"
# Hide UserWarning messages that originate from joblib.
warnings.filterwarnings("ignore", category=UserWarning, module="joblib")
# Disable pandas' SettingWithCopy check globally; assign_phone_tiers() assigns
# a column on a filtered frame, which would otherwise trigger the warning.
pd.options.mode.chained_assignment = None

def get_processed_phone_data():
    """
    Processes phone specification data from a JSON file, transforming and categorizing it.

    Reads ``phone_specs.json``, flattens each spec section into its own
    DataFrame, downloads current exchange rates and builds one row per phone
    with the columns ``phone_name``, ``chipset``, ``battery_capacity``,
    ``charging_speed``, ``price`` (USD), ``antutu_score``, ``display_type``
    ("OLED-Based" / "LCD-Based"), ``refresh_rate``, ``brightness`` and
    ``phone_tier``.

    Side effects:
        Performs an HTTP request to open.er-api.com and changes the global
        pandas option ``display.float_format``.

    Returns:
        pandas.DataFrame: Processed phone data with calculated metrics and categories

    Raises:
        requests.RequestException: If the exchange-rate request times out or
            returns an error status.
    """
    # Load and normalize raw data: one DataFrame per top-level JSON key,
    # addressed by the lower-cased, underscore-separated key name.
    df = pd.read_json("phone_specs.json")
    specs = {
        col.lower().replace(" ", "_"): 
        (pd.json_normalize(df[col]) if isinstance(df[col][0], (dict, list)) 
         else pd.DataFrame(df[col])) 
        for col in df.columns
    }
    
    # Setup currency conversion. A timeout keeps the notebook from hanging on
    # an unresponsive server, and raise_for_status() turns an HTTP error into
    # a clear exception instead of a confusing KeyError on "rates".
    response = requests.get("https://open.er-api.com/v6/latest/USD", timeout=10)
    response.raise_for_status()
    # The endpoint is queried with USD as base, so each rate is inverted to
    # get "USD per unit of currency".
    rates = {k: 1 / v for k, v in response.json()["rates"].items()}
    symbol_map = {'$': 'USD', '€': 'EUR', '£': 'GBP', '₹': 'INR'}
    # The amount must start with a digit: the previous `[\d,]+` could match a
    # lone comma in the text, after which float('') raised ValueError.
    price_pattern = re.compile(
        r'([\$€£₹])?\s?(\d[\d,]*(?:\.\d{2})?)\s?(USD|EUR|GBP|INR)?'
    )

    def to_usd(price_string):
        """Convert the first price found in ``price_string`` to USD.

        Returns ``None`` when no amount is found. An amount without a
        recognised currency symbol or code is treated as already being USD.
        """
        match = price_pattern.search(str(price_string))
        
        if not match:
            return None
            
        # Extract amount and convert based on currency symbol or code
        amount = float(match.group(2).replace(',', ''))
        symbol = match.group(1)
        code = match.group(3)
        currency = symbol_map.get(symbol, code)
        
        return round(amount * rates.get(currency, 1), 2)

    # Create structured dataframe with relevant fields
    data = pd.DataFrame({
        "phone_name": specs["phone_name"].squeeze(),
        "chipset": extract_chipset_info(specs["platform"]["chipset"]),
        "battery_capacity": extract_numeric_value(specs["battery"]["batdescription1"]),
        "charging_speed": extract_numeric_value(specs["battery"]["Charging"]),
        "price": specs["misc"]["price"].apply(to_usd),
        "antutu_score": extract_antutu_score(specs["tests"]["tbench"]),
        "display_type": extract_display_type(specs["display"]["displaytype"]),
        "refresh_rate": specs["display"]["displaytype"].str.extract(r"(\d+)\s*Hz")[0].astype("Int64"),
        "brightness": extract_brightness(specs)
    })

    # Format display and post-processing
    pd.set_option("display.float_format", "{:,.0f}".format)
    
    # Classify display types: anything mentioning OLED is OLED-based, every
    # other value (including missing ones) is counted as LCD-based.
    data["display_type"] = data["display_type"].apply(
        lambda x: "OLED-Based" if pd.notna(x) and "OLED" in str(x).upper() else "LCD-Based"
    )
    
    # Fill missing AnTuTu scores with chipset averages
    data["antutu_score"] = pd.to_numeric(data["antutu_score"], errors="coerce")
    data["antutu_score"] = data["antutu_score"].fillna(
        data.groupby("chipset")["antutu_score"].transform("mean")
    ).astype("Int64")
    
    # Assign phone tiers based on price clustering
    data = assign_phone_tiers(data)
    
    return data

def extract_chipset_info(chipset_series):
    """Strip the parenthesised details from chipset descriptions.

    Args:
        chipset_series: Series whose entries are a chipset string, a list of
            chipset strings, or a missing value.

    Returns:
        Series with, per entry, the text before the first ``" ("``. List
        entries are cleaned item by item and joined with ``", "``; entries
        that are neither a list nor a string are passed through unchanged.
    """
    def _strip_details(entry):
        if isinstance(entry, list):
            return ", ".join(part.split(" (")[0].strip() for part in entry)
        if isinstance(entry, str):
            return entry.split(" (")[0].strip()
        return entry

    return chipset_series.apply(_strip_details)

def extract_numeric_value(series):
    """Return the first run of digits of each string as an integer.

    Args:
        series: Series of text values (missing values allowed).

    Returns:
        Integer Series; entries without any digit, or missing entries,
        become 0.
    """
    first_digits = series.str.extract(r"(\d+)")[0]
    with_default = first_digits.fillna(0)
    return with_default.astype(int)

def extract_antutu_score(bench_series):
    """Pull the number that follows ``AnTuTu:`` out of benchmark text.

    Args:
        bench_series: Series of benchmark summary strings (missing allowed).

    Returns:
        Integer Series; 0 marks entries with no ``AnTuTu:`` figure.
    """
    score_text = bench_series.str.extract(r"AnTuTu:\s*(\d+)")[0]
    with_default = score_text.fillna(0)
    return with_default.astype(int)

def extract_display_type(display_series):
    """Extract and standardize display type information.

    Keeps only the text before the first comma of each entry, with
    surrounding whitespace removed (e.g. ``"LTPO AMOLED, 1B colors"`` becomes
    ``"LTPO AMOLED"``).

    Args:
        display_series: Series of display description strings; missing
            values are allowed.

    Returns:
        Series of the same length; missing entries stay missing.
    """
    # The first comma-separated field is all that is needed. Staying on the
    # .str accessor (instead of a split -> join -> split round trip through
    # apply) also keeps an entirely missing column working.
    return display_series.str.split(",").str[0].str.strip()

def extract_brightness(specs):
    """Collect a brightness figure (in nits) for every phone.

    Args:
        specs: Mapping of section name to DataFrame; the ``"tests"`` frame
            must have a ``Display`` column and the ``"display"`` frame a
            ``displaytype`` column.

    Returns:
        Nullable ``Int64`` Series: the first ``<number> nits`` found in the
        tests text, otherwise the one found in the display text, otherwise
        missing.
    """
    nits_pattern = r"(\d+)\s*nits"
    tests_nits = specs["tests"]["Display"].str.extract(nits_pattern)[0]
    display_nits = specs["display"]["displaytype"].str.extract(nits_pattern)[0]
    # Values from the tests section take precedence over the display text.
    merged = tests_nits.combine_first(display_nits)
    return merged.astype("Int64")

def assign_phone_tiers(data):
    """
    Label every phone with a price tier derived from KMeans clustering.

    Prices inside the 1.5 * IQR fences are standardized and grouped into
    three clusters, which are named 'Budget', 'Mid-Range' and 'Premium' in
    order of increasing mean price. Every row outside the fences is labelled
    'Flagship'.

    NOTE(review): rows with a missing price also fail the fence test and
    therefore receive 'Flagship' as well -- confirm this is intended.

    Args:
        data (pd.DataFrame): Phone data with a ``price`` column

    Returns:
        pd.DataFrame: New dataframe with an added ``phone_tier`` column
    """
    # Fences for outlier removal
    quartiles = data['price'].quantile([0.25, 0.75])
    lower_q = quartiles.loc[0.25]
    upper_q = quartiles.loc[0.75]
    spread = upper_q - lower_q
    inside_fences = data['price'].between(lower_q - 1.5 * spread, upper_q + 1.5 * spread)
    inliers = data[inside_fences]

    # Cluster the standardized inlier prices
    scaled = StandardScaler().fit_transform(inliers['price'].values.reshape(-1, 1))
    labels = KMeans(n_clusters=3, random_state=42).fit_predict(scaled)
    inliers = inliers.assign(phone_tier=labels)

    # Bring the cluster ids back by index; rows without one get -1
    data = data.merge(inliers[['phone_tier']], left_index=True, right_index=True, how='left')
    data['phone_tier'] = data['phone_tier'].fillna(-1).astype(int)

    # Name the clusters by ascending mean price
    clustered_rows = data[data['phone_tier'] != -1]
    mean_price = clustered_rows.groupby('phone_tier')['price'].mean().sort_values()
    tier_names = dict(zip(mean_price.index, ['Budget', 'Mid-Range', 'Premium']))
    tier_names[-1] = 'Flagship'
    data['phone_tier'] = data['phone_tier'].map(tier_names)

    return data

# Process the data
# `phone_data` is the frame the MongoDB and visualization cells below work on.
phone_data = get_processed_phone_data()
# Last expression of the cell: shows the first rows as the cell output.
phone_data.head()

# 3. MongoDB Storage
This section stores the processed phone data in a MongoDB Atlas database.


In [None]:
import os

from pymongo import MongoClient

# MongoDB connection
# The connection string contains the database username and password, so it is
# read from the MONGODB_URI environment variable instead of being written into
# the notebook. Any credential that was previously committed in this file
# should be considered leaked and rotated.
mongodb_uri = os.environ.get("MONGODB_URI")
if not mongodb_uri:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the MongoDB connection string."
    )
cluster = MongoClient(mongodb_uri)
db = cluster["Web_Scraping"] # database name
collection = db["Phones"] # collection name

# Check null values
print(phone_data.isnull().sum())

# Dataframe to dictionary
# The nullable integer columns can hold pd.NA, which cannot be encoded as BSON;
# convert to plain Python objects and store missing values as None (null).
phone_data_dict = (
    phone_data.astype(object)
    .where(phone_data.notna(), None)
    .to_dict(orient="records")
)

# Insert data into MongoDB
try:
    collection.insert_many(phone_data_dict)
    print("Data inserted successfully into MongoDB!")
except Exception as e:
    print(f"An error occurred: {e}")

# 4. Results Visualization
![Phone Data Visualization](Web_Scraping.Phones.jpg)

# 5. Visualization
This section builds a Streamlit page that analyzes the processed phone data with interactive Plotly charts and a Seaborn correlation heatmap.


In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

import pandas as pd
import plotly.express as px
import seaborn as sns
import matplotlib.pyplot as plt
import streamlit as st
from sklearn.impute import KNNImputer
from sklearn.preprocessing import LabelEncoder

# Streamlit Setup (page configuration comes before any other Streamlit call)
st.set_page_config(layout="wide", page_title="Phone Data Analysis")
st.title("Phone Data Visualizations")

# Display initial data: a preview followed by a few summary facts
st.write(phone_data.head())
overview_items = (
    ("Data shape:", phone_data.shape),
    ("Columns:", phone_data.columns),
    ("Data types:", phone_data.dtypes),
    ("Missing values:", phone_data.isnull().sum()),
)
for item_label, item_value in overview_items:
    st.write(item_label, item_value)

# Data cleaning
# Missing prices take the mean price of the phone's own tier.
phone_data['price'] = phone_data.groupby('phone_tier')['price'].transform(lambda x: x.fillna(x.mean()))
# Missing refresh rates take the most frequent value in the column.
phone_data['refresh_rate'] = phone_data['refresh_rate'].fillna(phone_data['refresh_rate'].mode()[0])
# Cast to float before filling with per-display-type means
# (presumably because those means are generally not whole numbers).
phone_data['brightness'] = phone_data['brightness'].astype('float')
phone_data['brightness'] = phone_data.groupby('display_type')['brightness'].transform(lambda x: x.fillna(x.mean()))

# KNN Imputation for Antutu Score
# A score of 0 is treated as "missing" here, so it is turned into NA first.
phone_data["antutu_score"] = phone_data["antutu_score"].replace(0, pd.NA)
# The tier label is encoded as an integer so it can serve as a numeric feature.
encoder = LabelEncoder()
phone_data["phone_tier_encoded"] = encoder.fit_transform(phone_data["phone_tier"])
features = ["price", "phone_tier_encoded", "antutu_score"]
knn_imputer = KNNImputer(n_neighbors=5)
imputed_data = knn_imputer.fit_transform(phone_data[features])
# Column 2 of the imputed array corresponds to "antutu_score" in `features`.
phone_data["antutu_score"] = imputed_data[:, 2]
# Back to whole numbers (the fractional part of imputed values is dropped).
phone_data["antutu_score"] = phone_data["antutu_score"].astype(int)
# The encoded helper column is only needed for the imputation.
phone_data = phone_data.drop(columns=["phone_tier_encoded"])
st.write("After Cleaning")
st.write("Missing values:", phone_data.isnull().sum())

# Fixed tier order, cheapest to most expensive, reused by the charts below.
tier_order = ["Budget", "Mid-Range", "Premium", "Flagship"]
phone_data["phone_tier"] = pd.Categorical(phone_data["phone_tier"], categories=tier_order, ordered=True)

# Scatter Plot for Performance vs Price
# One fixed colour per tier, shared by the regular points and the highlights.
color_map = {
    'Budget': 'rgba(0, 200, 81, 0.7)',
    'Mid-Range': 'rgba(0, 122, 255, 0.7)',
    'Premium': 'rgba(138, 43, 226, 0.7)',
    'Flagship': 'rgba(255, 0, 0, 0.7)'
}

phone_data['text'] = phone_data['phone_name']

# Best value per tier = highest AnTuTu score per unit of price.
# observed=True restricts the grouping to tiers that actually contain phones:
# phone_tier is a Categorical, and without it a tier with no rows can be
# passed to the lambda as an empty group, on which idxmax() raises.
best_phones_idx = phone_data.groupby('phone_tier', observed=True).apply(
    lambda group: (group['antutu_score'] / group['price']).idxmax()
)
best_phones_data = phone_data.loc[best_phones_idx]

fig = px.scatter(
    phone_data,
    x='price',
    y='antutu_score',
    color='phone_tier',
    category_orders={'phone_tier': tier_order},
    color_discrete_map=color_map,
    hover_data={'text': True, 'price': True, 'antutu_score': True, 'phone_tier': False},
    title="Price vs Antutu Score",
    labels={'price': 'Price (USD)', 'antutu_score': 'Antutu Score (K)'}
)

# Overlay the best-value phone of each tier as a gold-outlined diamond.
for _, row in best_phones_data.iterrows():
    fig.add_scatter(
        x=[row['price']],
        y=[row['antutu_score']],
        mode='markers',
        marker=dict(symbol='diamond', size=10, color=color_map[row['phone_tier']], line=dict(width=1.2, color='gold')),
        name=f"Best Phone ({row['phone_tier']})",
        hovertemplate=(
            f"<b>{row['phone_name']}</b><br>"
            f"Price: {row['price']}<br>"
            f"Phone Tier: {row['phone_tier']}<br>"
            f"Antutu Score: {row['antutu_score']}<extra></extra>"
        )
    )

fig.update_layout(
    xaxis_title="Price (USD)",
    yaxis_title="Antutu Score (K)",
    legend_title="Phone Tier",
    hovermode='closest',
    template="plotly_white"
)

st.plotly_chart(fig)

# Bubble Chart for Battery vs Charging Speed
# Bubble size encodes the price; colour encodes the tier.
bubble_axis_labels = {
    'charging_speed': 'Charging Speed (W)',
    'battery_capacity': 'Battery Capacity (mAh)',
}
bubble_tier_order = ["Budget", "Mid-Range", "Premium", "Flagship"]

fig = px.scatter(
    phone_data,
    x='charging_speed',
    y='battery_capacity',
    color='phone_tier',
    size='price',
    hover_name='phone_name',
    labels=bubble_axis_labels,
    category_orders={"phone_tier": bubble_tier_order},
    title="Battery Capacity vs Charging Speed",
)

fig.update_layout(
    template="plotly_white",
    xaxis_title=bubble_axis_labels['charging_speed'],
    yaxis_title=bubble_axis_labels['battery_capacity'],
)

st.plotly_chart(fig)

# Pie Chart for Display Types
# Count the phones per display type and give the two columns explicit names.
display_counts = (
    phone_data['display_type']
    .value_counts()
    .reset_index()
    .set_axis(['display_type', 'count'], axis=1)
)

# Donut chart (hole=0.3) with percentage and label on every slice.
fig = px.pie(
    display_counts,
    names='display_type',
    values='count',
    hole=0.3,
    title="Distribution of Display Types",
)
fig.update_traces(textinfo='percent+label')

st.plotly_chart(fig)

# Bar Chart for Number of Phones by Tier
# Count phones per tier, then sort the rows in tier order (cheapest first).
phone_count_by_tier = (
    phone_data["phone_tier"]
    .value_counts()
    .reset_index()
    .set_axis(["phone_tier", "count"], axis=1)
    .assign(
        phone_tier=lambda counts: pd.Categorical(
            counts["phone_tier"], categories=tier_order, ordered=True
        )
    )
    .sort_values("phone_tier")
)

tier_bar_labels = {"phone_tier": "Phone Tier", "count": "Number of Phones"}
fig = px.bar(
    phone_count_by_tier,
    x="phone_tier",
    y="count",
    color="phone_tier",
    text="count",
    labels=tier_bar_labels,
    color_discrete_sequence=px.colors.qualitative.Set2,
    title="Number of Phones in Each Tier",
)
# Print each count above its bar.
fig.update_traces(textposition='outside', texttemplate='%{text}')
fig.update_layout(
    template="plotly_white",
    xaxis_title=tier_bar_labels["phone_tier"],
    yaxis_title=tier_bar_labels["count"],
)

st.plotly_chart(fig)

# Correlation Matrix
# Pairwise correlation between the numeric phone attributes.
columns_of_interest = ["price", "antutu_score", "battery_capacity", "charging_speed", "refresh_rate", "brightness"]
correlation_data = phone_data[columns_of_interest]
correlation_matrix = correlation_data.corr()

# Draw on an explicit Figure/Axes pair and hand that figure to Streamlit,
# rather than relying on pyplot's implicit "current figure" global state.
corr_fig, corr_ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    correlation_matrix,
    annot=True,
    fmt=".2f",
    cmap="coolwarm",
    cbar=True,
    square=True,
    ax=corr_ax
)
corr_ax.set_title("Correlation Matrix", fontsize=16)
plt.setp(corr_ax.get_xticklabels(), rotation=45, ha='right', fontsize=12)
plt.setp(corr_ax.get_yticklabels(), fontsize=12)
st.pyplot(corr_fig)
# Release the figure so repeated script runs do not accumulate open figures.
plt.close(corr_fig)

# Top 10 Chipsets by Antutu Score
# Keep phones with a positive score, take the first row per chipset after
# sorting by score (its best-scoring phone), and keep the ten best chipsets.
filtered_data = phone_data.loc[phone_data["antutu_score"] > 0]
ranked_by_score = filtered_data.sort_values(by="antutu_score", ascending=False)
best_per_chipset = ranked_by_score.groupby("chipset").first().reset_index()
top_10_data = best_per_chipset.sort_values(by="antutu_score", ascending=False).head(10)

chipset_bar_labels = {"chipset": "Chipset", "antutu_score": "Antutu Score"}
fig = px.bar(
    top_10_data,
    x="chipset",
    y="antutu_score",
    color="antutu_score",
    color_continuous_scale="Viridis",
    text="phone_name",
    labels=chipset_bar_labels,
    title="Top 10 Chipsets by Antutu Score with Phones",
)

# The phone name is written above each chipset's bar.
fig.update_traces(textposition="outside")
fig.update_layout(
    template="plotly_white",
    showlegend=False,
    xaxis_tickangle=45,
    xaxis_title=chipset_bar_labels["chipset"],
    yaxis_title=chipset_bar_labels["antutu_score"],
)
st.plotly_chart(fig)

# Box Plot for Brightness by Display Type
# One box per display type; hovering a point reveals the phone name.
box_labels = {"display_type": "Display Type", "brightness": "Brightness (nits)"}
fig = px.box(
    phone_data,
    x="display_type",
    y="brightness",
    color="display_type",
    hover_data=["phone_name"],
    labels=box_labels,
    template="plotly_white",
    title="Comparison of Brightness by Display Type",
)
fig.update_layout(
    showlegend=False,
    xaxis_tickangle=45,
    xaxis_title=box_labels["display_type"],
    yaxis_title=box_labels["brightness"],
)
st.plotly_chart(fig)

In [None]:
!streamlit run src/visualization.py