## Scraping test

In [54]:
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

class Cricketer_Stats_Scraper:
    """Scrape a cricketer's profile and innings-level stats from ESPNcricinfo.

    Creating an instance starts a headless Chrome WebDriver and immediately
    resolves the player's profile URL and ID through the site search
    (see ``get_player_url``). Scraped tables are stored on the instance as
    pandas DataFrames: ``battingstats``, ``bowlingstats``, ``allroundstats``,
    ``fieldingstats`` and ``player_info``. Each stays ``None`` until fetched.

    Call ``close()`` when finished to release the browser; ``__del__`` does the
    same as a fallback when the object is garbage collected.
    """

    def __init__(self, player_name):
        """Start the WebDriver and look up ``player_name`` on the site search."""
        self.player_name = player_name
        self.player_id = None
        self.player_url = None

        # Initialize class variables for storing stats
        self.battingstats = None
        self.bowlingstats = None
        self.allroundstats = None
        self.fieldingstats = None
        self.player_info = None

        # Set up the WebDriver and open the search URL
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")  # Run in headless mode
        options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
        print("Setting up WebDriver...")
        self.driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

        # Call get_player_url() to fetch the player's URL and ID when the object is initialized
        self.get_player_url()

    @staticmethod
    def _build_search_url(player_name):
        """Return the player-search URL for ``player_name``.

        The name is lower-cased and fully percent-encoded, so apostrophes,
        ampersands and similar characters cannot corrupt the query string
        (spaces still become ``%20``).
        """
        from urllib.parse import quote  # local import: only this helper needs it

        return (
            "https://search.espncricinfo.com/ci/content/site/search.html"
            f"?search={quote(player_name.lower(), safe='')};type=player"
        )

    @staticmethod
    def _is_allrounder(player_info):
        """Return True when the first 'PLAYING ROLE' value mentions 'allround'.

        Returns False, instead of raising, when ``player_info`` is None, empty,
        lacks the column, or holds a non-string role.
        """
        if player_info is None or player_info.empty:
            return False
        if 'PLAYING ROLE' not in player_info.columns:
            return False
        role = player_info['PLAYING ROLE'].iloc[0]
        return isinstance(role, str) and 'allround' in role.lower()

    def get_player_url(self):
        """Resolve and store the player's profile URL and ID from the first search hit.

        Returns:
            tuple: ``(player_id, player_url)`` on success, ``(None, None)`` when
            the first search result could not be read.
        """
        start_time = time.time()
        print(f"Extracting {self.player_name}'s player URL and Player ID....")
        self.driver.get(self._build_search_url(self.player_name))

        try:
            player_link_element = self.driver.find_element(By.CSS_SELECTOR, "h3.name.link-cta a")
            player_url = player_link_element.get_attribute("href")
            # The ID is taken as whatever follows the last '-' in the profile URL.
            player_id = player_url.split('-')[-1]
        except Exception as e:
            print(f"Error in extracting {self.player_name}'s url:", e)
            return None, None

        # Assign both together so a half-resolved player is never stored.
        self.player_url = player_url
        self.player_id = player_id
        print(f"Extraction Successful for {self.player_name}.")
        print(f"Time taken to extract URL: {time.time() - start_time:.2f} seconds")
        return self.player_id, self.player_url

    def extract_inns_data(self, record_type):
        """Scrape the innings-by-innings table for one record type.

        Args:
            record_type: Value placed in the ``type=`` query parameter
                ('batting', 'bowling', 'allround' or 'fielding').

        Returns:
            pd.DataFrame: One row per table row, with the non-empty header
            texts plus a trailing 'Match id' column as column names.

        Raises:
            ValueError: Propagated from pandas when a row has more non-empty
                cells than there are column names.
        """
        start_time = time.time()
        print(f"Starting extraction of {self.player_name}'s {record_type} stats....")

        # Construct the search URL based on record_type (batting, bowling, etc.)
        search_url = f"https://stats.espncricinfo.com/ci/engine/player/{self.player_id}.html?class=11;template=results;type={record_type};view=innings"
        self.driver.get(search_url)

        # Step 1: header names. Blank header cells are skipped, and 'Match id' is
        # appended for the extra populated cell each row carries
        # (presumably the match identifier -- confirm against the page).
        header_texts = [header.text for header in self.driver.find_elements(By.CSS_SELECTOR, "thead tr.headlinks th")]
        header_names = [text for text in header_texts if text != ''] + ['Match id']

        # Step 2: rows of the 4th tbody on the page
        rows = self.driver.find_elements(By.XPATH, "(//tbody)[4]//tr")

        # Step 3: read each cell's text exactly once. Every `.text` access is a
        # WebDriver round-trip, so reading it twice doubles the scrape time.
        player_data = []
        for row in rows:
            cell_texts = [cell.text for cell in row.find_elements(By.TAG_NAME, "td")]
            player_data.append([text for text in cell_texts if text != ''])

        # Step 4: Create a DataFrame from the extracted data
        innings_data = pd.DataFrame(player_data, columns=header_names)

        print(f"Extracted {innings_data.shape[0]} records in {time.time() - start_time:.2f} seconds")
        return innings_data

    def extract_player_info(self):
        """Scrape the personal-details grid from the player's profile page.

        Returns:
            pd.DataFrame | None: A single-row frame whose columns are
            'Player ID', 'Player URL' and the grid's label texts, or ``None``
            when anything goes wrong (the error is printed).
        """
        try:
            start_time = time.time()
            print(f"Starting extraction of {self.player_name}'s personal info....")

            # Start by opening the player info URL
            self.driver.get(self.player_url)

            # Both lookups are scoped to the same details grid.
            grid = "//div[@class='ds-grid lg:ds-grid-cols-3 ds-grid-cols-2 ds-gap-4 ds-mb-8']"

            # Step 1: labels inside the grid become the column names
            headers = self.driver.find_elements(By.XPATH, f"{grid}//p[@class='ds-text-tight-m ds-font-regular ds-uppercase ds-text-typo-mid3']")
            header_names = ['Player ID', 'Player URL'] + [header.text for header in headers]

            # Step 2: values inside the grid become the single row
            values = self.driver.find_elements(By.XPATH, f"{grid}//span[@class='ds-text-title-s ds-font-bold ds-text-typo']")
            value_texts = [self.player_id, self.player_url] + [value.text for value in values]

            # Step 3: Create a DataFrame from the extracted data
            player_info = pd.DataFrame([value_texts], columns=header_names)

            print(f"Extracted player info in {time.time() - start_time:.2f} seconds")
            return player_info

        except Exception as e:
            print(f"Error in extracting {self.player_name}'s personal info:", e)
            return None

    def get_player_stats(self, stats_type="all"):
        """Fetch the requested stats and store them on the instance.

        Args:
            stats_type: 'all' (default), 'batting', 'bowling', 'allround',
                'fielding' or 'personal_info'. With 'all', personal info is
                fetched as part of the all-rounder check, and all-round stats
                are scraped only when the playing role mentions 'allround'.

        Each stat type is fetched independently: a failure is printed and the
        remaining types are still attempted.
        """
        # Ensure that player ID or player URL is available
        if not (self.player_id or self.player_url):
            print("Player ID is not available. Run get_player_url() first.")
            return

        # extract_player_info() handles its own errors, so no guard is needed here.
        if stats_type == "personal_info":
            self.player_info = self.extract_player_info()

        plan = (
            ("batting", "battingstats"),
            ("bowling", "bowlingstats"),
            ("allround", "allroundstats"),
            ("fielding", "fieldingstats"),
        )
        for record_type, attribute in plan:
            if stats_type not in ("all", record_type):
                continue
            try:
                if record_type == "allround":
                    # The playing role decides whether all-round stats exist at all.
                    self.player_info = self.extract_player_info()
                    if not self._is_allrounder(self.player_info):
                        continue
                setattr(self, attribute, self.extract_inns_data(record_type))
            except Exception as e:
                print(f"Error in extracting stats for {self.player_name}: ", e)

    def close(self):
        """Quit the WebDriver if one is open; safe to call more than once."""
        driver = getattr(self, "driver", None)
        if driver is None:
            return
        self.driver = None
        try:
            driver.quit()
            print("WebDriver closed successfully.")
        except Exception as e:
            print("Error while closing the WebDriver:", e)

    def __del__(self):
        # Fallback cleanup; prefer calling close() explicitly.
        self.close()



##### Extracting Ground Information

The following functions extract ground information. However, they are very resource-intensive, so we will revisit them later.

In [9]:
def extract_ground_links(player_id):
    """Scrape details for every distinct ground on a player's batting-innings page.

    The ground links are read from the player's batting innings table on
    stats.espncricinfo.com, de-duplicated, and each one is passed to
    ``extract_ground_info``.

    Args:
        player_id: Player ID placed in the stats page URL.

    Returns:
        pd.DataFrame: One row per successfully scraped ground, with columns
        "Ground ID", "Stadium Name", "Location", "Home Team", "Image URL" and
        "Ground Link". Grounds whose page could not be scraped are omitted
        (``extract_ground_info`` prints the error for them).
    """
    # Set up the WebDriver for scraping
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    search_url = f"https://stats.espncricinfo.com/ci/engine/player/{player_id}.html?class=11;template=results;type=batting;view=innings"

    # Step 1: collect distinct ground links. A dict keeps first-seen order and
    # gives O(1) duplicate checks.
    unique_links = {}
    try:
        driver.get(search_url)
        for row in driver.find_elements(By.XPATH, "(//tbody)[4]//tr"):
            try:
                ground_name_element = row.find_element(By.XPATH, ".//td[contains(@class, 'left')][2]/a")
                ground_name = ground_name_element.text
                ground_link = ground_name_element.get_attribute('href')
            except Exception as e:
                print(f"Error extracting ground data: {e}")
                continue
            if ground_link:
                unique_links.setdefault(ground_link, ground_name)
    finally:
        # Always release the browser, even when the page load fails. It is not
        # needed for step 2, where extract_ground_info opens its own driver.
        driver.quit()

    # Step 2: scrape each ground once. Rows are appended only on success, so the
    # result holds one complete row per ground instead of a placeholder row
    # plus a separate scraped row.
    ground_info_df = pd.DataFrame(columns=["Ground ID", "Stadium Name", "Location", "Home Team", "Image URL"])
    scraped_links = []
    for ground_link in unique_links:
        rows_before = len(ground_info_df)
        ground_info_df = extract_ground_info(ground_link, ground_info_df)
        if len(ground_info_df) > rows_before:
            scraped_links.append(ground_link)

    # One link per appended row, in the same order the rows were added.
    ground_info_df["Ground Link"] = scraped_links
    return ground_info_df

def extract_ground_info(ground_url, ground_info_df):
    """Scrape one ground page and append its details to ``ground_info_df``.

    Args:
        ground_url: URL of the ground page to open.
        ground_info_df: DataFrame to which the new row is appended.

    Returns:
        pd.DataFrame: ``ground_info_df`` with one extra row (Ground ID,
        Stadium Name, Location, Home Team, Image URL) on success. On a scraping
        error the message is printed and the frame is returned unchanged.

    Raises:
        Exception: Whatever ``driver.get`` raises when the page cannot be
            loaded. The browser is still closed in that case.
    """
    import re  # local import: only the Ground ID parsing needs it

    start_time = time.time()

    # Set up the WebDriver for scraping ground info
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    try:
        driver.get(ground_url)

        try:
            # 1. Ground ID: the trailing digits of the last path segment, so both
            #    ".../59368.html" and ".../some-stadium-name-59368" give "59368".
            #    Falls back to the whole segment when it has no trailing digits.
            last_segment = ground_url.split('/')[-1].split('.')[0]
            id_match = re.search(r'(\d+)$', last_segment)
            ground_id = id_match.group(1) if id_match else last_segment

            # 2. Ground image URL
            img_element = driver.find_element(By.XPATH, "//div[@class='ds-p-0']//img[1]")
            image_url = img_element.get_attribute("src")

            # 3. Stadium Name
            stadium_name = driver.find_element(By.XPATH, "//span[contains(@class, 'ds-text-title-m') and contains(@class, 'ds-font-bold')]").text

            # 4. Location (City); line breaks in the element text become ", "
            location = driver.find_element(By.XPATH, "//span[contains(@class, 'ds-text-compact-s') and contains(@class, 'ds-font-bold')]").text.strip().replace("\n", ", ")

            # 5. Home Team (Country): whatever follows "Grounds in"
            home_team_text = driver.find_element(By.XPATH, "//span[contains(text(), 'Grounds in')]").text
            home_team = home_team_text.split("Grounds in")[-1].strip()

            # Single-row frame holding the scraped values
            ground_info = pd.DataFrame({
                "Ground ID": [ground_id],
                "Stadium Name": [stadium_name],
                "Location": [location],
                "Home Team": [home_team],
                "Image URL": [image_url]
            })

            # Append the ground info to the DataFrame
            ground_info_df = pd.concat([ground_info_df, ground_info], ignore_index=True)
            print(f"Extracted info for ground {stadium_name} in {time.time() - start_time:.2f} seconds.")

        except Exception as e:
            print(f"Error while extracting info for {ground_url}: {e}")
    finally:
        # Quit on every path so a failed page load cannot leak a Chrome process.
        driver.quit()

    return ground_info_df

In [5]:
# Ad-hoc check of the ground-page selectors against a single ground URL.
ground_url = 'https://www.espncricinfo.com/cricket-grounds/rangiri-dambulla-international-stadium-59368'

# Set up the WebDriver for scraping ground info
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# Defined up front so the final display line cannot raise NameError when scraping fails.
ground_info = None

try:
    driver.get(ground_url)

    # 1. Ground ID: last path segment up to the first '.'. For this slug-style
    #    URL that is the whole slug, not just the number.
    ground_id = ground_url.split('/')[-1].split('.')[0]

    # 2. Ground image URL
    img_element = driver.find_element(By.XPATH, "//div[@class='ds-p-0']//img")
    image_url = img_element.get_attribute("src")

    # 3. Stadium Name
    stadium_name = driver.find_element(By.XPATH, "//span[contains(@class, 'ds-text-title-m') and contains(@class, 'ds-font-bold')]").text

    # 4. Location (City)
    location = driver.find_element(By.XPATH, "//span[contains(@class, 'ds-text-compact-s') and contains(@class, 'ds-font-bold')]").text.strip().replace("\n", ", ")

    # 5. Home Team (Country)
    home_team_text = driver.find_element(By.XPATH, "//span[contains(text(), 'Grounds in')]").text
    home_team = home_team_text.split("Grounds in")[-1].strip()

    # Single-row DataFrame holding the scraped values
    ground_info = pd.DataFrame({
        "Ground ID": [ground_id],
        "Stadium Name": [stadium_name],
        "Location": [location],
        "Home Team": [home_team],
        "Image URL": [image_url]
    })

except Exception as e:
    print(e)
finally:
    # Release the headless browser whether or not scraping succeeded.
    driver.quit()

ground_info

Unnamed: 0,Ground ID,Stadium Name,Location,Home Team,Image URL
0,rangiri-dambulla-international-stadium-59368,Rangiri Dambulla International Stadium,Dambulla,Sri Lanka,"https://img1.hscicdn.com/image/upload/f_auto,t..."


In [None]:
# Scrape ground details for every ground linked from this player's batting-innings
# page on stats.espncricinfo.com.
# NOTE(review): 253802 is presumably Virat Kohli's player ID -- confirm.
player_id = 253802
grounds = extract_ground_links(player_id)

# Show the resulting DataFrame as the cell output
grounds

## Transformation

In [68]:
import time
import pandas as pd
import numpy as np

class Cricketer_Stats_Transformer:
    """Clean raw innings tables and reorder them into a common column layout.

    The caller assigns the raw DataFrames (``battingstats``, ``bowlingstats``,
    ``fieldingstats``, ``allroundstats``, ``player_info``) and then calls
    ``process_data``, which replaces each processed attribute with its
    transformed version.
    """

    # Whole-cell ground names mapped to the location name used downstream.
    _GROUND_MAPPING = {
        "Colombo (SSC)": "Colombo",
        "Colombo (PSS)": "Colombo",
        "Colombo (RPS)": "Colombo",
        "Eden Gardens": "Kolkata",
        "Wankhede": "Mumbai",
        "Brabourne": "Mumbai",
        "Kingston": "Kingston Jamaica",
        "The Oval": "London",
        "Lord's": "London",
        "W.A.C.A": "Perth",
        "Dharamsala": "Dharamshala",
        "Hamilton": "Hamilton Waikato",
        "Fatullah": "Fatullah Dhaka",
        "Providence": "Providence Guyana",
        "Dubai (DICS)": "Dubai",
        "Chattogram": "Chattogram Chittagong",
    }

    # Whole-cell placeholders that mean "no value".
    _MISSING_MARKERS = {
        'DNB': np.nan,
        'TDNB': np.nan,
        'DNF': np.nan,
        'TDNF': np.nan,
        '-': np.nan,
    }

    def __init__(self, player_name):
        self.player_name = player_name
        self.player_info = None
        self.battingstats = None
        self.bowlingstats = None
        self.allroundstats = None
        self.fieldingstats = None
        self.player_id = None
        self.player_url = None

    def transform_data(self, df):
        """Return a cleaned copy of a raw innings table.

        Steps: strip '*' from string cells and turn placeholder cells into NaN;
        split 'Opposition' ("<format> v <team>") into 'Format' and
        'Opposition'; map 'Ground' to 'Location'; parse 'Start Date'; reduce
        'Match id' to '#<trailing digits>' and rename it 'Match ID'.

        Args:
            df: Raw table with 'Opposition', 'Ground', 'Start Date' and
                'Match id' columns. The input frame itself is not modified.

        Raises:
            KeyError: If one of the required columns is missing.
        """
        # STEP 1: Replacing incorrect values.
        # The asterisk is a suffix inside values such as "52*". A whole-cell
        # replace never matches those, so it is stripped with a regex first.
        df = df.replace(r'\*', '', regex=True)
        df = df.replace(self._MISSING_MARKERS)

        # STEP 2: Opposition column
        opposition = df['Opposition']
        df['Format'] = opposition.str.extract(r'(^.*?)\sv\s', expand=False)
        df['Opposition'] = opposition.str.extract(r'\sv\s(.*?$)', expand=False)

        # STEP 3: Ground column
        df['Ground'] = df['Ground'].replace(self._GROUND_MAPPING)
        df = df.rename(columns={'Ground': 'Location'})

        # STEP 4: START DATE
        df['Start Date'] = df['Start Date'].astype('datetime64[ns]')

        # STEP 5: MATCH ID -- keep only the trailing number, prefixed with '#'
        df['Match id'] = '#' + df['Match id'].str.extract(r'(\d+$)', expand=False)
        df = df.rename(columns={'Match id': 'Match ID'})

        return df

    def final_df(self, df, common_cols, custom_cols):
        """Transform ``df`` and return its columns in the final order.

        The order is: all but the last two ``common_cols``, then
        ``custom_cols``, then the last two ``common_cols``.

        Returns:
            pd.DataFrame | None: ``None`` when ``df`` is ``None``.
        """
        if df is None:
            return None
        print("Starting transformation...")
        df = self.transform_data(df)
        print("Transformation complete.")
        return df[common_cols[:-2] + custom_cols + common_cols[-2:]]

    def _is_allrounder(self):
        """Return True when ``player_info``'s first 'PLAYING ROLE' mentions 'allround'."""
        info = self.player_info
        if info is None or info.empty or 'PLAYING ROLE' not in info.columns:
            return False
        role = info['PLAYING ROLE'].iloc[0]
        return isinstance(role, str) and 'allround' in role.lower()

    def process_data(self, type="all"):
        """Transform the stored tables in place.

        Args:
            type: 'all' (default), 'batting', 'bowling', 'fielding' or
                'allround'. All-round stats are processed only when
                ``player_info`` marks the player as an all-rounder.

        Each table is processed independently: an error is printed, that table
        is left untouched, and the remaining tables are still processed.
        """
        common = ['Match ID', 'Start Date', 'Format', 'Inns', 'Opposition', 'Location']
        plan = (
            ('batting', 'battingstats', ['Pos', 'Runs', 'BF', '4s', '6s', 'SR', 'Mins', 'Dismissal']),
            ('bowling', 'bowlingstats', ['Pos', 'Overs', 'Mdns', 'Runs', 'Wkts', 'Econ']),
            ('fielding', 'fieldingstats', ['Dis', 'Ct']),
            ('allround', 'allroundstats', ['Score', 'Overs', 'Conc', 'Wkts', 'Ct', 'St']),
        )

        for kind, attribute, custom in plan:
            if type not in ('all', kind):
                continue
            try:
                if kind == 'allround' and not self._is_allrounder():
                    continue
                setattr(self, attribute, self.final_df(getattr(self, attribute), common, custom))
            except Exception as e:
                print(f"Error in processing data for {self.player_name}: ", e)

## Loader

In [65]:
import pandas as pd
from google.cloud import storage
from io import BytesIO

class CricketerStatsLoader:
    """Move a player's stats DataFrames to and from Google Cloud Storage as CSV.

    Objects are stored under ``<player_name>/<data_type>/<file>.csv``, where
    ``player_name`` is lower-cased with spaces replaced by underscores and
    ``data_type`` is 'raw' or 'tf'.
    """

    # (stat_type, attribute holding the DataFrame, CSV file name), in upload order.
    _STAT_TABLE = (
        ("batting", "battingstats", "batting_stats.csv"),
        ("bowling", "bowlingstats", "bowling_stats.csv"),
        ("fielding", "fieldingstats", "fielding_stats.csv"),
        ("allround", "allroundstats", "allround_stats.csv"),
        ("personal_info", "player_info", "personal_info.csv"),
    )

    def __init__(self, player_name, data_type="raw"):
        self.player_name = player_name.replace(" ", "_").lower()
        self.data_type = data_type  # 'raw' or 'tf'
        # DataFrames to upload; the caller assigns them before load_data().
        self.battingstats = None
        self.bowlingstats = None
        self.fieldingstats = None
        self.allroundstats = None
        self.player_info = None

    def ensure_bucket_exists(self, bucket_name):
        """Create the bucket named ``bucket_name`` unless a lookup finds it."""
        gcs = storage.Client()

        if gcs.lookup_bucket(bucket_name):
            print(f"✅ Bucket '{bucket_name}' already exists.")
            return

        print(f"🛠️ Bucket '{bucket_name}' does not exist. Creating it...")
        gcs.create_bucket(bucket_name)
        print(f"✅ Bucket '{bucket_name}' created successfully.")

    def upload_df_to_gcs(self, bucket_name, destination_blob_name, df):
        """Upload ``df`` as a CSV object; a ``None`` or empty frame is skipped with a warning."""
        nothing_to_send = df is None or df.empty
        if nothing_to_send:
            print(f"⚠️ Warning: {destination_blob_name} is empty, skipping upload.")
            return

        target = storage.Client().bucket(bucket_name).blob(destination_blob_name)

        # Serialise into an in-memory binary buffer, then rewind it for reading
        payload = BytesIO()
        df.to_csv(payload, index=False)
        payload.seek(0)

        # Stream the buffer straight to GCS
        target.upload_from_file(payload, content_type="text/csv")
        print(f"✅ File uploaded to GCS: gs://{bucket_name}/{destination_blob_name}")

    def download_df_from_gcs(self, bucket_name, stat_type):
        """Fetch one stats CSV from GCS and parse it into a DataFrame.

        Args:
            bucket_name (str): Google Cloud Storage bucket name.
            stat_type (str): One of "batting", "bowling", "fielding",
                "allround", "personal_info".

        Returns:
            pd.DataFrame: The parsed CSV, or None when ``stat_type`` is
            invalid or the download/parse fails.
        """
        file_names = {stat: csv_name for stat, _, csv_name in self._STAT_TABLE}

        csv_name = file_names.get(stat_type)
        if csv_name is None:
            print(f"❌ Error: Invalid stat_type '{stat_type}'. Choose from {list(file_names)}.")
            return None

        # Object path follows <player>/<data_type>/<file>
        source_blob_name = f"{self.player_name}/{self.data_type}/{csv_name}"
        blob = storage.Client().bucket(bucket_name).blob(source_blob_name)

        try:
            csv_text = blob.download_as_text()
            frame = pd.read_csv(BytesIO(csv_text.encode()))
            print(f"✅ Successfully downloaded {stat_type} data from gs://{bucket_name}/{source_blob_name}")
            return frame
        except Exception as e:
            print(f"⚠️ Error downloading {stat_type} stats: {e}")
            return None

    def load_data(self, bucket_name):
        """Upload every assigned DataFrame under ``<player_name>/<data_type>/``.

        Args:
            bucket_name: Name of the GCS bucket (created first if missing).
        """
        print(f"🚀 Uploading {self.player_name}'s {self.data_type} data to GCS...")

        # Make sure the destination bucket is there before any upload
        self.ensure_bucket_exists(bucket_name)

        # `player_name/raw/` or `player_name/tf/`
        base_folder = f"{self.player_name}/{self.data_type}/"

        for _, attribute, csv_name in self._STAT_TABLE:
            frame = getattr(self, attribute)
            if frame is not None:
                self.upload_df_to_gcs(bucket_name, base_folder + csv_name, frame)

        print(f"✅ All {self.data_type} data successfully uploaded to GCS in gs://{bucket_name}/{base_folder}")

## Testing code

In [None]:
# Scrape all stats for one player
player_name = "Virat Kohli"
virat_raw = Cricketer_Stats_Scraper(player_name)
virat_raw.get_player_stats()

# Save the raw tables to the bucket
bucket_name = "cricketer_stats"

virat_raw_loader = CricketerStatsLoader(player_name)
virat_raw_loader.battingstats = virat_raw.battingstats
virat_raw_loader.bowlingstats = virat_raw.bowlingstats
virat_raw_loader.fieldingstats = virat_raw.fieldingstats
# Hand over the all-round table too. It stays None for players who are not
# all-rounders, and load_data() skips None tables.
virat_raw_loader.allroundstats = virat_raw.allroundstats
virat_raw_loader.player_info = virat_raw.player_info

virat_raw_loader.load_data(bucket_name)


Setting up WebDriver...
Extracting Virat Kohli's player URL and Player ID....
Extraction Successful for Virat Kohli.
Time taken to extract URL: 9.47 seconds
WebDriver closed successfully.
Starting extraction of Virat Kohli's batting stats....
Extracted 646 records in 134.75 seconds
Starting extraction of Virat Kohli's bowling stats....
Extracted 663 records in 100.42 seconds
Starting extraction of Virat Kohli's personal info....
Extracted player info in 2.47 seconds
Starting extraction of Virat Kohli's fielding stats....
Extracted 663 records in 116.01 seconds


In [None]:
# Pull the raw CSVs back from the bucket into a fresh transformer object
virat_raw_downloader = CricketerStatsLoader(player_name, data_type="raw")
virat_tf = Cricketer_Stats_Transformer(player_name)

# stat_type understood by the loader -> attribute that holds the table
_stat_attributes = {
    "batting": "battingstats",
    "bowling": "bowlingstats",
    "fielding": "fieldingstats",
    "allround": "allroundstats",
    "personal_info": "player_info",
}

for _stat_type, _attribute in _stat_attributes.items():
    setattr(virat_tf, _attribute, virat_raw_downloader.download_df_from_gcs(bucket_name, _stat_type))

# Clean and reorder the downloaded tables
virat_tf.process_data()

# Push the transformed tables to the bucket under the 'tf' folder
virat_tf_loader = CricketerStatsLoader(player_name, data_type="tf")

for _attribute in _stat_attributes.values():
    setattr(virat_tf_loader, _attribute, getattr(virat_tf, _attribute))

virat_tf_loader.load_data(bucket_name)