### Library Imports

In [6]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlalchemy

### SCRAPER

In [7]:
def download_page(url: str, timeout: float = 30.0) -> str:
    """Download raw HTML from PakWheels.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` may block indefinitely on a host that
            never answers, which would stall the whole pipeline.

    Returns:
        The response body decoded as text.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    # A browser-like User-Agent is sent with every request.
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.text

### PARSER

In [8]:
def parse_used_cars(html: str) -> list:
    """Extract one record per featured listing found in the given HTML.

    Args:
        html: Markup of a search-results page.

    Returns:
        A list of dicts with the keys ``title``, ``price``, ``location``,
        ``imageLink``, ``adLink``, ``model``, ``kilometers``, ``fuel``,
        ``engine`` and ``transmission``. Listings that cannot be parsed are
        reported on stdout and left out of the result.
    """
    # The five entries of the secondary info list, in the order they are read.
    spec_keys = ('model', 'kilometers', 'fuel', 'engine', 'transmission')

    soup = BeautifulSoup(html, "html.parser")
    records = []

    for listing in soup.find_all("li", class_="classified-listing featured-listing"):
        try:
            specs = listing.find(
                'ul', class_='list-unstyled search-vehicle-info-2 fs13'
            ).find_all('li')

            title = listing.find('h3').get_text(strip=True)
            price = listing.find('strong', class_='generic-white fs18').get_text(strip=True)
            location = listing.find(
                'ul', class_='list-unstyled search-vehicle-info fs13'
            ).get_text(strip=True)
            image_link = listing.find('img', class_='pic')['src']
            ad_path = listing.find('a', class_='car-name ad-detail-path')['href']

            record = {
                'title': title,
                'price': price,
                'location': location,
                'imageLink': image_link,
                'adLink': 'https://www.pakwheels.com' + ad_path,
            }
            for position, key in enumerate(spec_keys):
                record[key] = specs[position].get_text(strip=True)
        except Exception as e:
            # A malformed listing is skipped; the remaining listings are still processed.
            print("Error parsing a car:", e)
        else:
            records.append(record)

    return records


### CLEANER

In [9]:
def clean_column(df, col, remove_strings):
    """Strip the given substrings (and commas) from a column and make it numeric.

    Args:
        df: DataFrame holding the column; it is modified in place.
        col: Name of the column to clean.
        remove_strings: Literal substrings to delete from every value.

    Returns:
        The same ``df`` object, with ``col`` replaced by its numeric version.
        Values that still cannot be parsed as numbers become NaN.
    """
    values = df[col].astype(str)

    # Commas are always removed, after the caller-supplied substrings.
    for token in [*remove_strings, ","]:
        values = values.str.replace(token, "", regex=False)

    df[col] = pd.to_numeric(values, errors="coerce")
    return df


def clean_dataframe(df):
    """Convert the price, engine and kilometers columns to numeric values.

    Each column is passed through ``clean_column`` with the substrings that
    should be stripped before the numeric conversion.

    NOTE(review): values that still contain other text after stripping (for
    example unit words) are coerced to NaN by ``clean_column`` - confirm this
    matches the scraped formats.
    """
    # Column name -> substrings to remove, processed in this order.
    cleanup_rules = (
        ("price", ["PKR", ",", " "]),
        ("engine", ["cc"]),
        ("kilometers", ["km", ","]),
    )
    for column, tokens in cleanup_rules:
        df = clean_column(df, column, tokens)
    return df

### LOAD to DB

In [10]:
def upload_to_sql(df, table_name, conn_string, if_exists="replace"):
    """Upload cleaned dataframe to SQL Server.

    Args:
        df: DataFrame to write.
        table_name: Name of the destination table.
        conn_string: SQLAlchemy connection URL used to create the engine.
        if_exists: Passed through to ``DataFrame.to_sql``; the default
            ``"replace"`` keeps the previous behaviour of overwriting the table.
    """
    engine = sqlalchemy.create_engine(conn_string)
    try:
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
    finally:
        # Release the engine even when the upload fails.
        engine.dispose()

### Logger

In [15]:
from datetime import datetime
def write_log(message, logfile="pipeline_log.txt"):
    """Append one timestamped line to the log file.

    Args:
        message: Text to record.
        logfile: Path of the log file; it is opened in append mode as UTF-8.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(logfile, "a", encoding="utf-8") as f:
        # The separator is an em dash, written as an escape so it cannot be
        # corrupted again by a wrong file encoding (the old literal was mojibake).
        f.write(f"{timestamp} \u2014 {message}\n")
        
def clear_log(logfile="pipeline_log.txt"):
    """Empty the log file, creating it if it does not exist yet."""
    # Opening in "w" mode truncates the file; nothing needs to be written.
    with open(logfile, "w", encoding="utf-8"):
        pass

### PIPELINE ORCHESTRATOR

In [16]:
def run_pipeline(url, raw_csv_path, cleaned_csv_path, sql_conn_string=None, logfile="pipeline_log.txt"):
    """Run the scrape -> parse -> clean -> save (-> upload) pipeline.

    Args:
        url: Page to download and parse.
        raw_csv_path: Where the unmodified parsed records are written.
        cleaned_csv_path: Where the cleaned records are written.
        sql_conn_string: Optional SQLAlchemy connection URL. When it is falsy
            the upload step is skipped.
        logfile: Log file that is cleared at the start and appended to at
            every step.

    Returns:
        The cleaned DataFrame.

    Raises:
        pandas.errors.EmptyDataError: If no listing could be parsed from the
            page. Raised before any CSV is written, so an earlier raw file is
            not overwritten with an empty one.
        Exception: Any error from a step is logged and re-raised unchanged.
    """
    # Clear previous logs
    clear_log(logfile)
    write_log("Pipeline started", logfile)
    print('Pipeline Started')
    try:
        write_log("Step 1: Downloading page...", logfile)
        html = download_page(url)

        write_log("Step 2: Parsing raw data...", logfile)
        raw_data = parse_used_cars(html)

        # An empty result would otherwise be written out and then fail inside
        # pd.read_csv with an unhelpful "No columns to parse" message.
        if not raw_data:
            raise pd.errors.EmptyDataError(
                f"No car listings could be parsed from {url}"
            )

        write_log(f"Step 3: Saving RAW data to {raw_csv_path}", logfile)
        pd.DataFrame(raw_data).to_csv(raw_csv_path, index=False)

        # The raw file is read back so cleaning starts from what is on disk.
        write_log("Step 4: Loading RAW CSV...", logfile)
        df_raw = pd.read_csv(raw_csv_path)

        write_log("Step 5: Cleaning data...", logfile)
        df = clean_dataframe(df_raw)

        write_log(f"Step 6: Saving CLEANED data to {cleaned_csv_path}", logfile)
        df.to_csv(cleaned_csv_path, index=False)

        if sql_conn_string:
            write_log("Step 7: Uploading to SQL Server...", logfile)
            upload_to_sql(df, "used_cars", sql_conn_string)

        write_log("Pipeline completed successfully.", logfile)
        print('Pipeline completed successfully')
    except Exception as e:
        # Record the failure in the log, then let the caller see the original error.
        write_log(f"ERROR: {e}", logfile)
        raise

    return df


### MAIN

In [17]:
if __name__ == "__main__":
    import os

    URL = "https://www.pakwheels.com/used-cars/search/-/featured_1/"

    RAW_CSV = "used_cars_raw_data.csv"
    CLEANED_CSV = "used_cars_data.csv"

    # Database credentials must not be stored in the notebook. Provide the full
    # SQLAlchemy URL through the environment instead, in the form
    #   mssql+pyodbc://<user>:<password>@localhost/PakWheels?driver=ODBC+Driver+17+for+SQL+Server
    # When the variable is not set, the value is None and the upload step is skipped.
    SQL_CONNECTION = os.environ.get("PAKWHEELS_SQL_CONNECTION")

    run_pipeline(URL, RAW_CSV, CLEANED_CSV, sql_conn_string=SQL_CONNECTION)

Pipeline Started
Pipeline completed successfully
