In [None]:
# Import python packages
import pandas as pd
import numpy as np
import wheel_loader
import os

# Add custom wheels (if any)
# NOTE(review): presumably this has to run before the evidently imports below so
# that wheel-provided packages are importable — confirm against wheel_loader.
wheel_loader.add_wheels()

from evidently import ColumnMapping

from evidently.report import Report
from evidently.metrics.base_metric import generate_column_metrics
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# Wildcard import: the metric classes used in later cells (ColumnSummaryMetric,
# ColumnQuantileMetric, ColumnDriftMetric) are not imported by name anywhere,
# so they presumably come from here.
from evidently.metrics import *

from evidently.test_suite import TestSuite
from evidently.tests.base_test import generate_column_tests
from evidently.test_preset import DataStabilityTestPreset, NoTargetPerformanceTestPreset
# Wildcard import: presumably the source of the Test* classes used by the
# TestSuite cell later in the notebook.
from evidently.tests import *

# We can also use Snowpark for our analyses!
# Get an active Snowpark session
from snowflake.snowpark.context import get_active_session
# `session` is a notebook-wide global: later cells and helper functions read it
# directly instead of receiving it as an argument.
session = get_active_session()

In [None]:
# Function to fetch data from Snowflake and convert to pandas DataFrame
def get_snowflake_data(session, table_name):
    """Load an entire Snowflake table into a pandas DataFrame.

    Args:
        session: Active Snowpark session used to reach Snowflake.
        table_name: Table identifier, optionally fully qualified
            (``DATABASE.SCHEMA.TABLE``).

    Returns:
        pandas.DataFrame: Every row and column of the table.
    """
    # session.table() resolves the identifier through Snowpark instead of
    # splicing it into a hand-written SQL string.
    return session.table(table_name).to_pandas()


In [None]:
# Pull the raw housing table into pandas; every later cell works from this frame.
housing_data = get_snowflake_data(
    session=session,
    table_name='DG_RAW_TST_DB.DG.HOUSING_DATA',
)

In [None]:
# Quick look at the first five rows to sanity-check the load.
housing_data.head(n=5)

In [None]:
# Treat the house value as the target and fabricate a "prediction" by adding
# Gaussian noise (mean 0, std 5) to it. The generator is seeded so that a
# Restart & Run All produces the same PREDICTION column, and therefore the same
# reports, every time.
housing_data = housing_data.rename(columns={'MEDHOUSEVAL': 'TARGET'})
prediction_noise = np.random.default_rng(42).normal(0, 5, len(housing_data))
housing_data['PREDICTION'] = housing_data['TARGET'].to_numpy() + prediction_noise

In [None]:
# Draw two 5000-row samples to play the roles of the reference and current
# datasets. Fixed but distinct random_state values make the split reproducible
# while still yielding two different samples.
reference = housing_data.sample(n=5000, replace=False, random_state=0)
current = housing_data.sample(n=5000, replace=False, random_state=1)

In [None]:
# Data-drift preset report comparing `current` against `reference`.
drift_metrics = [DataDriftPreset()]
report = Report(metrics=drift_metrics)

report.run(current_data=current, reference_data=reference)
report

In [None]:
# Display the report built in the previous cell.
report.show()

In [None]:
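# NOTE: save_html() is given a local file path elsewhere in this notebook; a stage
# reference like the one below is not a local path. Use upload_report_to_stage()
# (defined further down) to save the report locally and PUT it into the stage.
#report.save_html('@DG_RAW_TST_DB.DG.HTML')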

In [None]:
# Column-level view of AVEROOMS: summary, 0.25 quantile and drift.
averooms_metrics = [
    ColumnSummaryMetric(column_name='AVEROOMS'),
    ColumnQuantileMetric(column_name='AVEROOMS', quantile=0.25),
    ColumnDriftMetric(column_name='AVEROOMS'),
]
report = Report(metrics=averooms_metrics)

report.run(current_data=current, reference_data=reference)
report

In [None]:
# Same 0.25-quantile metric generated for an explicit list of columns.
quantile_metrics = generate_column_metrics(
    ColumnQuantileMetric,
    columns=['AVEROOMS', 'AVEBEDRMS'],
    parameters={'quantile': 0.25},
)
report = Report(metrics=[quantile_metrics])

report.run(current_data=current, reference_data=reference)
report

In [None]:
# Mix of a single-column summary, a generated quantile metric for the 'num'
# column selector, and the data-drift preset in one report.
combined_metrics = [
    ColumnSummaryMetric(column_name='AVEROOMS'),
    generate_column_metrics(
        ColumnQuantileMetric,
        columns='num',
        parameters={'quantile': 0.25},
    ),
    DataDriftPreset(),
]
report = Report(metrics=combined_metrics)

report.run(current_data=current, reference_data=reference)
report

In [None]:
-- Create the stage only when it is missing, so that re-running the notebook does
-- not drop reports uploaded earlier (CREATE OR REPLACE would wipe the stage).
-- Server-side encryption lets files fetched through GET_PRESIGNED_URL from this
-- internal stage be read as-is. NOTE: if the stage already exists without
-- SNOWFLAKE_SSE, this statement leaves it unchanged; recreate it deliberately.
CREATE STAGE IF NOT EXISTS HTML
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

In [None]:
from datetime import datetime
import pytz

def upload_report_to_stage(report, stage_name='HTML'):
    """Save a report as HTML and upload it to a Snowflake stage.

    The report is written to a temporary local file named after the current
    timestamp, uploaded to ``@<stage_name>/report/`` and then looked up with
    ``LIST`` to confirm it arrived. Problems are printed rather than raised so
    the notebook keeps running.

    Args:
        report: Object exposing ``save_html(path)``.
        stage_name: Target stage name without the leading ``@``.

    Returns:
        tuple: ``(stage_filename, download_url)``. ``stage_filename`` is the
        stage path the file is uploaded to; ``download_url`` is the pre-signed
        URL for that file, or ``None`` when it could not be generated.
    """
    # Local imports keep this function usable without touching the import cell.
    import os
    import tempfile

    # Define the timezone
    tz = pytz.timezone('UTC')  # Change 'UTC' to your desired timezone if necessary

    # The timestamp doubles as the file name, so each upload gets its own file.
    timestamp_str = datetime.now(tz).strftime('%Y_%m_%d_%H_%M_%S')
    file_name = f"{timestamp_str}.html"

    # PUT stores the file under its local name inside stage_path, i.e. at
    # <stage_path>/<file_name>. The pre-signed URL and the LIST check below are
    # derived from that same location so all three refer to the same file.
    stage_path = f"@{stage_name}/report"
    relative_path = f"report/{file_name}"
    stage_filename = f"{stage_path}/{file_name}"

    # Defined up front so the return statement works even if a step fails.
    download_url = None

    try:
        # The temporary directory (and the HTML file in it) is removed as soon
        # as the upload call has returned.
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_filename = os.path.join(tmp_dir, file_name)

            # Save the report locally
            report.save_html(local_filename)

            # Upload the file to Snowflake stage
            result = session.file.put(local_filename, stage_path, auto_compress=False, overwrite=True)

        # Check the result of the put operation
        for file_result in result:
            if file_result.status != 'UPLOADED':
                print(f"Upload failed: {file_result.message}")
            else:
                print(f"Successfully uploaded to {stage_filename}")

        # Get the download URL
        download_url = session.sql(
            f"SELECT GET_PRESIGNED_URL('@{stage_name}', '{relative_path}') AS DOWNLOAD_LINK"
        ).collect()[0]['DOWNLOAD_LINK']
        print(f"Download URL: {download_url}")

        # Verify the file exists in the stage
        list_result = session.sql(f"LIST {stage_filename}").collect()
        if not list_result:
            print(f"Warning: File not found in stage after upload: {stage_filename}")
        else:
            print(f"File found in stage: {list_result[0]}")

    except Exception as e:
        # Best-effort by design: report the failure and still return what is known.
        print(f"An error occurred: {str(e)}")

    return stage_filename, download_url

# Usage: upload whichever report the `report` variable currently holds (the most
# recently executed report cell) and echo the two values the helper returns.
stage_filename, download_url = upload_report_to_stage(report)
print(f"Stage filename: {stage_filename}")
print(f"Download URL: {download_url}")

In [None]:
# # CLEAN UP
# def cleanup_incorrect_files(stage_name='HTML'):
#     # List all files in the stage
#     all_files = session.sql(f"LIST @{stage_name}").collect()
    
#     for file in all_files:
#         file_path = file['name']
#         if file_path.count('.html') > 1:  # Check for duplicated .html in the path
#             # Construct the removal command
#             remove_command = f"REMOVE @{stage_name}/{file_path}"
#             print(f"Removing incorrect file: {file_path}")
#             session.sql(remove_command).collect()

# # Run the cleanup
# cleanup_incorrect_files()

# # Verify the cleanup
# print("Remaining files in stage:")
# remaining_files = session.sql("LIST @HTML").collect()
# for file in remaining_files:
#     print(file['name'])

# TROUBLESHOOTING

In [None]:
# Print the stage listing twice: once for the whole HTML stage, then for one
# specific dated folder inside it.
specific_path = "LIST @HTML/report/2024/08/06/"
for list_command in ("LIST @HTML", specific_path):
    result = session.sql(list_command).collect()
    for row in result:
        print(row)

# STREAMLIT

In [None]:
import streamlit as st
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf, lit
from snowflake.snowpark.types import StringType
import base64

# Ensure you have an active Snowpark session


@st.cache_data
def get_html_content(stage_path):
    """Read an HTML report from the DG_RAW_TST_DB.DG.HTML stage.

    Args:
        stage_path: Path of the file relative to the stage root,
            e.g. ``report/<name>.html``.

    Returns:
        tuple: ``(content, error)``. On success ``content`` is the file text and
        ``error`` is ``None``; on failure ``content`` is ``None`` and ``error``
        is a message suitable for showing to the user.
    """
    stage_location = f"@DG_RAW_TST_DB.DG.HTML/{stage_path}"
    try:
        # First, check if the file exists
        file_exists = session.sql(f"LIST {stage_location}").collect()
        if not file_exists:
            return None, f"File not found: {stage_path}"

        # Stream the staged file straight through the Snowpark session. This
        # replaces the earlier UDF that fetched a pre-signed URL with `requests`:
        # that UDF needed outbound network access, was re-registered on every
        # uncached call, and handed back its own error text as if it were the
        # file content.
        stream = session.file.get_stream(stage_location)
        try:
            raw_bytes = stream.read()
        finally:
            stream.close()

        return raw_bytes.decode("utf-8"), None  # Return content and no error
    except Exception as e:
        return None, f"An error occurred: {str(e)}"

# Imported here so this viewer script is self-contained.
import streamlit.components.v1 as components

st.title("HTML Report Viewer")

# List available reports
try:
    reports = session.sql("LIST @DG_RAW_TST_DB.DG.HTML/report/").collect()
    # Keep only the file name; get_html_content() is called with report/<file name>.
    report_files = [row['name'].split('/')[-1] for row in reports if row['name'].endswith('.html')]
except Exception as e:
    st.error(f"Failed to list reports: {str(e)}")
    report_files = []

if report_files:
    # Dropdown to select a report
    selected_report = st.selectbox("Select a report to view", report_files)

    if selected_report:
        # Get the HTML content
        html_content, error = get_html_content(f"report/{selected_report}")

        if error:
            st.error(error)
        elif html_content:
            # Display options
            display_option = st.radio("Choose display option:", ["View as text", "View rendered HTML"])

            if display_option == "View as text":
                st.text_area("HTML Content", html_content, height=400)
            else:
                # st.markdown(..., unsafe_allow_html=True) does not execute the
                # scripts inside a full HTML document, so a script-driven report
                # shows up blank or broken. An HTML component renders the
                # document in its own frame instead.
                components.html(html_content, height=800, scrolling=True)

            # Provide a download button
            b64 = base64.b64encode(html_content.encode()).decode()
            href = f'<a href="data:text/html;base64,{b64}" download="{selected_report}">Download HTML file</a>'
            st.markdown(href, unsafe_allow_html=True)
        else:
            st.warning("No content found in the selected report.")
else:
    st.warning("No reports found in the specified stage.")

# Display Streamlit version
st.sidebar.text(f"Streamlit version: {st.__version__}")

In [None]:
# Dataset-level checks run on `current` with `reference` as the baseline:
# missing values, constant/duplicated columns and rows, column types, and the
# number of drifted columns.
data_quality_checks = [
    TestNumberOfColumnsWithMissingValues(),
    TestNumberOfRowsWithMissingValues(),
    TestNumberOfConstantColumns(),
    TestNumberOfDuplicatedRows(),
    TestNumberOfDuplicatedColumns(),
    TestColumnsType(),
    TestNumberOfDriftedColumns(),
]
tests = TestSuite(tests=data_quality_checks)

tests.run(current_data=current, reference_data=reference)
tests