In [0]:
%pip install beautifulsoup4
%pip install lxml
%pip install html5lib 
%restart_python 

In [0]:
import os
import requests
from difflib import unified_diff
import pandas as pd
from bs4 import BeautifulSoup
from email.mime.text import MIMEText
import smtplib
from pyspark.sql.functions import current_timestamp

class ClubRoyaleOffers:
    def __init__(self, unity_catalog_volume, sender_email, sender_password,uc_catalog,uc_schema,uc_table):
        """
        Initializes the ClubRoyaleOffers class with necessary configurations.

        Args:
            unity_catalog_volume (str): The base path to the Unity Catalog volume.
            sender_email (str): The sender's email address.
            sender_password (str): The sender's email password (use app password).
        """
        self.uc_catalog = uc_catalog
        self.uc_schema = uc_schema
        self.uc_table = uc_table
        self.unity_catalog_volume = unity_catalog_volume
        self.sender_email = sender_email
        self.sender_password = sender_password
        self.real_column_names = "Name","RewardNumber","OfferName","OfferType","Expires","Certificate","OfferCode","TradeInValue"
        self.include_diff_in_email=False
        self.testing=False
    def read_file_from_unity_catalog(self, filepath):
        """
        Reads the content of a file from Unity Catalog volume if it exists.

        Args:
            filepath (str): The full path to the file in Unity Catalog volume.

        Returns:
            str or None: The content of the file if it exists, otherwise None.
        """
        try:
            with open(filepath, 'r') as f:
                return f.read()
        except Exception:
            print(f"File not found or inaccessible: {filepath}")
            return None

    def write_file_to_unity_catalog_volume(self, filepath, content):
        """
        Writes content to a file in Unity Catalog volume.

        Args:
            filepath (str): The full path to the file in Unity Catalog volume.
            content (str): The content to write to the file.
        """
        directory = os.path.dirname(filepath)
        if not os.path.exists(directory):
            os.makedirs(directory)

        with open(filepath, 'w') as f:
            f.write(content)
        print(f"File written to: {filepath}")

    def send_email(self, subject, body, receiver_email):
        """
        Sends an email using Gmail.

        Args:
            subject (str): The subject of the email.
            body (str): The body of the email.
            receiver_email (str): The recipient's email address.
        """
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.sender_email
        msg['To'] = receiver_email

        try:
            # Use Google's SMTP server with TLS
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(self.sender_email, self.sender_password)
            server.sendmail(self.sender_email, receiver_email, msg.as_string())
            server.quit()
            print("Email sent successfully!")
        except Exception as e:
            print(f"Failed to send email: {e}")

    def html_to_dataframes(self, html_string):
        """
        Takes an HTML string as input and returns a list of pandas DataFrames,
        where each DataFrame represents a table found in the HTML.

        Args:
            html_string (str): The HTML content as a string.

        Returns:
            list: A list of pandas DataFrames, or an empty list if no tables are found
                  or if the input is invalid.
        """
        if not isinstance(html_string, str):
            print("Error: Input is not a string.")
            return []

        soup = BeautifulSoup(html_string, 'html.parser')
        tables = soup.find_all('table')

        dataframes = []
        for table in tables:
            try:
                df = pd.read_html(str(table))[0]
                dataframes.append(df)
            except Exception as e:
                print(f"Could not convert table to DataFrame: {e}")
                continue

        return dataframes

    def transform_dataframe(self, df, new_column_names):
        """
        Transforms the dataframe by removing specific columns and renaming the remaining columns.

        Args:
            df (pd.DataFrame): The input dataframe.
            new_column_names (list): The new column names.

        Returns:
            pd.DataFrame: The transformed dataframe.
        """
        if df.empty:
            print("Input dataframe is empty, no reason to cleanse")
            return df
        # Remove the last column
        df = df.iloc[:, :-1]
        # Remove the first two columns
        df = df.iloc[:, 2:]
        # Rename the remaining columns
        df.columns = new_column_names
        # Remove the first two rows if the row count is greater than zero
        if len(df) > 0:
            df = df.iloc[2:]
        # Convert EXPIRES column from string to date
        df['Expires'] = pd.to_datetime(df['Expires'], errors='coerce')

        # Remove the leading $ from the column "TRADE IN VALUE" and convert to a number
        df['TradeInValue'] = df['TradeInValue'].replace('[\$,]', '', regex=True).astype(float)
        
        # Remove the 6th column
        df = df.drop(df.columns[5], axis=1)
        return df

    def check_html_and_update_file(self, url, post_data, unity_catalog_filepath, receiver_email, player_id_to_lookup):
        """
        Retrieves HTML from a URL with POST data, extracts tables as dataframes,
        compares the last dataframe to a stored version (derived from a stored HTML file),
        updates the file if the last dataframe changed, displays the differences,
        and sends an alert notification if the last dataframe changed.

        Args:
            url (str): The URL to fetch HTML from.
            post_data (dict): The data to send in the POST request.
            unity_catalog_filepath (str): The full path to the file in Unity Catalog to store the HTML.
            receiver_email (str): The recipient's email address.
            player_id_to_lookup (str): The player ID to lookup.
        """
        print(f"Checking for changes for URL: {url}")

        try:
            response = requests.post(url, data=post_data)
            response.raise_for_status()
            current_html = response.text
        except requests.exceptions.RequestException as e:
            print(f"Failed to retrieve current HTML: {e}")
            return

        # Extract dataframes from current HTML
        current_dataframes = self.html_to_dataframes(current_html)

        if not current_dataframes:
            print("No tables found in the current HTML content.")
            return

        current_last_df = current_dataframes[-1]  # Get the last dataframe

        # Read stored HTML and extract dataframes
        stored_html = self.read_file_from_unity_catalog(unity_catalog_filepath)
        stored_last_df = None

        if stored_html:
            stored_dataframes = self.html_to_dataframes(stored_html)
            if stored_dataframes:
                stored_last_df = stored_dataframes[-1]  # Get the last dataframe from stored HTML

        # Compare the last dataframes
        if stored_last_df is None or not stored_last_df.equals(current_last_df):
            print("The content of the last table has changed or the file did not exist. Updating file.")
            clean_df = self.transform_dataframe(current_last_df, self.real_column_names)
            if not clean_df.empty:
                clean_spark_df = spark.createDataFrame(clean_df)
                clean_spark_df = clean_spark_df.withColumn("DateIngested", current_timestamp())
                full_table_name = f"{self.uc_catalog}.{self.uc_schema}.{self.uc_table}"
                display(clean_spark_df)
                if not spark.catalog.tableExists(full_table_name):
                    print("Creating table")
                    clean_spark_df.createOrReplaceTempView("clean_spark_df")
                    spark.sql(f"""
                        CREATE TABLE {full_table_name} 
                        USING DELTA 
                        TBLPROPERTIES (delta.enableChangeDataFeed = true)
                        AS SELECT * FROM clean_spark_df WHERE 1=0
                    """)
                print("appending to table")
                clean_spark_df.write.format("delta").mode("append").saveAsTable(full_table_name)
                print("saved as table")
                #display(spark.sql(f"SELECT * from {full_table_name} ORDER BY DateIngested DESC LIMIT 10"))
            else:
                print("The last table is empty.")

            # Display differences if the file existed and the last dataframe changed
            diff_body = ""
            if stored_last_df is not None:
                print("\n--- Differences in Last DataFrame ---")
                stored_df_str = stored_last_df.to_string()
                current_df_str = current_last_df.to_string()

                diff = list(unified_diff(stored_df_str.splitlines(keepends=True),
                                         current_df_str.splitlines(keepends=True),
                                         fromfile='stored_dataframe',
                                         tofile='current_dataframe'))
                for line in diff:
                    print(line.strip())
                    if self.include_diff_in_email:
                        diff_body += line
                print("-----------------------------------")

            # Send alert notification if the last dataframe changed or file didn't exist
            subject = f"Change in offers for {player_id_to_lookup} at {url}"
            body = f"The content of the last table for {player_id_to_lookup} at {url} has changed.\n\nFile updated: {unity_catalog_filepath}"

            if self.include_diff_in_email and diff_body:
                body += "\n\n--- Differences in Last DataFrame ---\n" + diff_body

            self.send_email(subject, body, receiver_email)

            # Always write the new HTML content if the last dataframe changed or file didn't exist
            if self.testing:
                print("Not saving file to volume, will send emails each time it runs")
            else:
                self.write_file_to_unity_catalog_volume(unity_catalog_filepath, current_html)
        else:
            print("The content of the last table has not changed.")

    def lookup_player_data(self, last_name_to_lookup, player_id_to_lookup, receiver_email):
        """
        Looks up player data by sending a POST request with the player's last name and ID,
        then checks for HTML changes, updates the file, and sends an email notification if changes are found.

        Args:
            last_name_to_lookup (str): The last name of the player to lookup.
            player_id_to_lookup (str): The player ID to lookup.
            receiver_email (str, optional): The recipient's email address. Defaults to "alan.dennis@gmail.com".
        """
        url = "https://www.clubroyaleoffers.com/PlayerLookup.asp"
        payload = {
            "tbxLNameLookup": last_name_to_lookup,
            "tbxPlayerLookup": player_id_to_lookup
        }

        # Define the path in Unity Catalog where you want to store the HTML file
        unity_catalog_filepath = f'{self.unity_catalog_volume}/ClubRoyaleHTML/{last_name_to_lookup}_{player_id_to_lookup}_html.txt'

        # Check for HTML changes, update file, and send email

        self.check_html_and_update_file(url, payload, unity_catalog_filepath, receiver_email, player_id_to_lookup)

