<a href="https://colab.research.google.com/github/shomerituparna-pixel/data-quality-project/blob/main/Data_Quality_Check.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required libraries. Run this cell first.
!pip install -q pandas numpy plotly jinja2 reportlab

In [None]:
import os
import pandas as pd
import numpy as np
import plotly.express as px
from jinja2 import Environment, FileSystemLoader
from IPython.display import IFrame, HTML, display
from reportlab.platypus import SimpleDocTemplate, Paragraph
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.lib.units import cm

# Working folders used by the later cells: raw data, generated reports,
# and the Jinja2 template directory.
for folder in ("data", "results", "templates"):
    os.makedirs(folder, exist_ok=True)

In [None]:
# Load the dataset into `df_raw`

import pandas as pd

# NOTE(review): despite its name, `url` holds a local file path
# (presumably the Colab upload location) — confirm.
url = "/content/DQ_Wholesale.csv"
df_raw = pd.read_csv(url)

# Keep a copy under data/ so the generated reports can link to it.
df_raw.to_csv("data/wholesale_raw.csv", index=False)

print("Dataset Loaded!", f"Shape: {df_raw.shape}", sep="\n")
df_raw.head()


Dataset Loaded!
Shape: (55, 8)


Unnamed: 0,Channel,Region,Fresh,Milk,Grocery,Frozen,Detergents_Paper,Delicassen
0,2.0,3.0,12669.0,9656.0,7561.0,214.0,2674.0,1338.0
1,2.0,3.0,7057.0,9810.0,,,3293.0,1776.0
2,2.0,3.0,6353.0,,7684.0,,3516.0,7844.0
3,1.0,3.0,13265.0,1196.0,4221.0,6404.0,507.0,
4,2.0,3.0,,5410.0,7198.0,,1777.0,5185.0


In [None]:
# Print the column labels of the raw frame as loaded from the CSV.
print(df_raw.columns)

Index(['Channel', 'Region', 'Fresh', 'Milk', 'Grocery', 'Frozen',
       'Detergents_Paper', 'Delicassen'],
      dtype='object')


In [None]:
class DataQualityChecker:
    """Run basic data-quality checks on a DataFrame and collect the findings.

    Every check returns its finding and also stores a plain-dict/number
    version in ``self.results`` under one of the keys ``missing_count``,
    ``missing_pct``, ``duplicate_rows``, ``dtypes``, ``nunique``,
    ``outliers_iqr`` and ``quality_score``.
    """

    def __init__(self, df):
        """Keep a reference to ``df`` (not a copy) and start with empty results."""
        self.df = df
        self.results = {}

    def check_missing_values(self):
        """Count nulls per column and return the per-column missing percentage.

        Stores ``missing_count`` and ``missing_pct`` (rounded to 2 decimals)
        in ``self.results``. A zero-row frame reports 0.0 % for every column
        instead of NaN.
        """
        missing = self.df.isnull().sum()
        n_rows = len(self.df)
        if n_rows:
            missing_pct = (missing / n_rows) * 100
        else:
            # 0 / 0 would give NaN; with no rows nothing is missing.
            missing_pct = missing.astype(float)
        self.results["missing_count"] = missing.to_dict()
        self.results["missing_pct"] = missing_pct.round(2).to_dict()
        return missing_pct

    def check_duplicates(self):
        """Return the number of fully duplicated rows (first occurrence not counted)."""
        dups = self.df.duplicated().sum()
        self.results["duplicate_rows"] = int(dups)
        return dups

    def check_dtypes(self):
        """Return the dtype of each column as a string Series."""
        dtypes = self.df.dtypes.astype(str)
        self.results["dtypes"] = dtypes.to_dict()
        return dtypes

    def uniqueness_stats(self):
        """Return the number of distinct values per column, counting NaN as a value."""
        uniq = self.df.nunique(dropna=False)
        self.results["nunique"] = uniq.to_dict()
        return uniq

    def detect_outliers_iqr(self):
        """Count, per numeric column, the values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].

        Returns a dict ``{column: count}``; the same dict is stored under
        ``outliers_iqr``.
        """
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        out = {}

        for col in numeric_cols:
            Q1 = self.df[col].quantile(0.25)
            Q3 = self.df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower = Q1 - 1.5 * IQR
            upper = Q3 + 1.5 * IQR
            # Comparisons against NaN are False, so missing values are never
            # counted as outliers.
            count = ((self.df[col] < lower) | (self.df[col] > upper)).sum()
            out[col] = int(count)

        self.results["outliers_iqr"] = out
        return out

    def compute_score(self):
        """Combine the findings into a single score, floored at 0 and rounded to 2 decimals.

        score = 100 - 0.4 * mean(missing %) - 0.2 * duplicate % - 0.004 * total outliers

        Any prerequisite check that has not been run yet is run here, so the
        method can be called on a fresh instance. A zero-row frame gets no
        duplicate penalty instead of raising ZeroDivisionError.
        """
        # Run the checks the formula depends on if the caller skipped them.
        if "missing_pct" not in self.results:
            self.check_missing_values()
        if "duplicate_rows" not in self.results:
            self.check_duplicates()
        if "outliers_iqr" not in self.results:
            self.detect_outliers_iqr()

        n_rows = len(self.df)

        # Missing penalty: average missing percentage across columns.
        pct = pd.Series(self.results["missing_pct"], dtype=float)
        missing_pen = pct.mean() if not pct.empty else 0.0

        # Duplicate penalty: share of duplicated rows, in percent.
        dup_pen = (self.results["duplicate_rows"] / n_rows) * 100 if n_rows else 0.0

        # Outlier penalty (scaled): 0.004 points per flagged value.
        out_pen = sum(self.results["outliers_iqr"].values()) * 0.004

        score = 100 - (missing_pen * 0.4) - (dup_pen * 0.2) - out_pen
        score = float(max(0, round(score, 2)))

        self.results["quality_score"] = score
        return score


In [None]:
def clean_dataset(df, drop_thresh=0.5, numeric_impute='median'):
    """
    Cleans a DataFrame in a reproducible/simple way:
    - Drops columns whose missing fraction is > drop_thresh
    - Imputes numeric columns according to `numeric_impute`
      ('median', 'mean' or 'zero') and object/category columns with the mode
    - Drops exact duplicate rows
    - Resets index

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is copied, never modified.
    drop_thresh : float
        Fraction (0-1) of missing values above which a column is dropped.
    numeric_impute : str
        'median' (default), 'mean' or 'zero'.

    Returns
    -------
    (cleaned_df, actions)
        `actions` is a dict with the keys 'dropped_columns',
        'numeric_imputation', 'duplicates_removed', 'rows_before',
        'rows_after' and 'missing_pct_after' (for the PDF).

    Raises
    ------
    ValueError
        If `numeric_impute` is not one of the supported strategies.
    """
    if numeric_impute not in ('median', 'mean', 'zero'):
        raise ValueError(
            f"numeric_impute must be 'median', 'mean' or 'zero', got {numeric_impute!r}"
        )

    actions = {}
    df_work = df.copy()

    # 1) Drop columns with too many missing values
    missing_frac = df_work.isnull().mean()
    to_drop = missing_frac[missing_frac > drop_thresh].index.tolist()
    actions['dropped_columns'] = to_drop
    df_work = df_work.drop(columns=to_drop)

    # 2) Impute numeric columns with the requested statistic
    num_cols = df_work.select_dtypes(include=[np.number]).columns.tolist()
    for col in num_cols:
        # only impute if there are missing values
        if not df_work[col].isnull().any():
            continue
        if numeric_impute == 'median':
            fill_value = df_work[col].median()
        elif numeric_impute == 'mean':
            fill_value = df_work[col].mean()
        else:
            fill_value = 0
        # A column with no observed values has no median/mean; use 0 so the
        # column still ends up without nulls.
        if pd.isna(fill_value):
            fill_value = 0
        df_work[col] = df_work[col].fillna(fill_value)
    actions['numeric_imputation'] = numeric_impute

    # 3) Impute categorical columns with mode
    cat_cols = df_work.select_dtypes(include=['object', 'category']).columns.tolist()
    for col in cat_cols:
        if df_work[col].isnull().any():
            modes = df_work[col].mode()
            # mode() is empty when every value is missing; fall back to "".
            mode_val = modes.iloc[0] if not modes.empty else ""
            df_work[col] = df_work[col].fillna(mode_val)

    # 4) Drop exact duplicate rows
    dup_before = int(df_work.duplicated().sum())
    df_work = df_work.drop_duplicates().reset_index(drop=True)
    actions['duplicates_removed'] = dup_before

    # 5) Final stats
    actions['rows_before'] = int(len(df))
    actions['rows_after'] = int(len(df_work))
    actions['missing_pct_after'] = round(float(df_work.isnull().mean().mean() * 100), 3)

    return df_work, actions


def generate_pdf_quality_summary(results, df_original, cleaned_df, cleaning_actions,
                                 output_path="results/data_quality_summary.pdf"):
    """
    Create a structured PDF summary that includes, in order:
      - Overall metrics (row/column counts, duplicates removed, average
        missing % before and after cleaning, quality score)
      - Missing-values table (the ten columns with the highest missing %)
      - Outlier summary (IQR counts by numeric column)
      - Cleaning actions performed
      - Recommendations

    Parameters
    ----------
    results : dict
        Findings dict; the keys read here are 'missing_pct', 'missing_count',
        'outliers_iqr' and 'quality_score'.
    df_original, cleaned_df : pandas.DataFrame
        Frames before and after cleaning (only their shapes are reported;
        df_original is also used to compute the missing % when `results`
        has no 'missing_pct').
    cleaning_actions : dict
        Keys read here: 'duplicates_removed', 'missing_pct_after',
        'dropped_columns' and, if present, 'numeric_imputation'.
    output_path : str
        Destination of the PDF; its parent folder is created if needed.

    Returns
    -------
    str
        `output_path`.
    """
    styles = getSampleStyleSheet()
    title_style = styles['Title']
    h2 = styles['Heading2']
    normal = styles['Normal']

    def header_grid_style(*extra):
        # Shared look of every table: full grid, grey bold header row.
        return TableStyle([
            ('GRID', (0,0), (-1,-1), 0.5, colors.grey),
            ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
            ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
            *extra,
        ])

    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    doc = SimpleDocTemplate(output_path, pagesize=A4,
                            rightMargin=2*cm, leftMargin=2*cm,
                            topMargin=2*cm, bottomMargin=2*cm)
    story = []

    # Title
    story.append(Paragraph("Data Quality Summary", title_style))
    story.append(Spacer(1, 8))

    # Overall missing % before cleaning: prefer the stored per-column figures,
    # otherwise derive it from the original frame.
    # (The previous pd.DataFrame(<dict of scalars>) call here raised
    # ValueError on every run and its result was never used.)
    missing_pct_map = results.get('missing_pct')
    if missing_pct_map:
        overall_missing_before = round(float(pd.Series(missing_pct_map).mean()), 3)
    else:
        overall_missing_before = round(float(df_original.isnull().mean().mean() * 100), 3)

    # High-level metrics table
    rows = [
        ["Metric", "Value"],
        ["Rows (original)", str(len(df_original))],
        ["Columns (original)", str(df_original.shape[1])],
        ["Rows (cleaned)", str(len(cleaned_df))],
        ["Columns (cleaned)", str(cleaned_df.shape[1])],
        ["Duplicate rows removed", str(cleaning_actions.get('duplicates_removed', 0))],
        ["Overall missing % (before)", f"{overall_missing_before} %"],
        ["Overall missing % (after)", f"{cleaning_actions.get('missing_pct_after', 'N/A')} %"],
        ["Quality score", str(results.get('quality_score', 'N/A'))],
    ]
    t = Table(rows, hAlign='LEFT', colWidths=[8*cm, 6*cm])
    t.setStyle(header_grid_style(('VALIGN', (0,0), (-1,-1), 'MIDDLE')))
    story.append(t)
    story.append(Spacer(1, 12))

    # Top missing columns (table)
    story.append(Paragraph("Top Missing Values by Column", h2))
    missing_counts = pd.Series(results.get('missing_count'))
    missing_pct = pd.Series(results.get('missing_pct'))
    miss_df = pd.DataFrame({
        'column': missing_counts.index,
        'missing_count': missing_counts.values,
        'missing_pct': missing_pct.values
    }).sort_values('missing_pct', ascending=False).reset_index(drop=True)
    # show top 10
    topN = miss_df.head(10)
    data = [list(topN.columns)]
    for col, cnt, pct in zip(topN['column'], topN['missing_count'], topN['missing_pct']):
        data.append([str(col), str(int(cnt)), f"{pct} %"])
    tbl = Table(data, colWidths=[7*cm, 3*cm, 4*cm])
    tbl.setStyle(header_grid_style())
    story.append(tbl)
    story.append(Spacer(1, 12))

    # Outlier summary
    story.append(Paragraph("Outlier Summary (IQR counts by numeric column)", h2))
    out = results.get('outliers_iqr', {})
    if not out:
        story.append(Paragraph("No outlier summary available.", normal))
    else:
        out_rows = [["Column", "Outlier Count"]]
        out_rows.extend([str(k), str(v)] for k, v in out.items())
        out_tbl = Table(out_rows, colWidths=[9*cm, 5*cm])
        out_tbl.setStyle(header_grid_style())
        story.append(out_tbl)
    story.append(Spacer(1, 12))

    # Cleaning actions
    story.append(Paragraph("Cleaning Actions Performed", h2))
    # Column labels are not guaranteed to be strings, so convert before joining.
    dropped = ", ".join(map(str, cleaning_actions.get('dropped_columns', []))) or "None"
    ca_rows = [
        ["Action", "Details"],
        ["Dropped columns (high missing%)", dropped],
        ["Duplicates removed", str(cleaning_actions.get('duplicates_removed', 0))],
        # Report the strategy recorded by the cleaning step when it provides one.
        ["Imputation method (numeric)", str(cleaning_actions.get('numeric_imputation', 'median'))],
        ["Imputation method (categorical)", "mode"],
    ]
    ca_tbl = Table(ca_rows, colWidths=[7*cm, 7*cm])
    ca_tbl.setStyle(header_grid_style())
    story.append(ca_tbl)
    story.append(Spacer(1, 12))

    # Recommendations
    story.append(Paragraph("Recommendations", h2))
    recs = [
        "Investigate columns with > 20% missing values; consider dropping or collecting more data.",
        "For columns with 5-20% missingness consider model-based imputation (KNN/MICE).",
        "Validate business-rule violations by checking raw ingestion logs or source system.",
        "For outliers, check data entry or measurement units before removing.",
        "Add data-quality checks earlier in the ETL pipeline (schema enforcement, null checks)."
    ]
    for r in recs:
        story.append(Paragraph("- " + r, normal))
        story.append(Spacer(1, 4))

    # Footer / end
    story.append(PageBreak())
    doc.build(story)
    return output_path

In [None]:
# Jinja2 template for the HTML report. It expects `score` and a `results`
# dict with the keys "missing_pct", "duplicate_rows" and "outliers_iqr";
# "business_rules" is optional. The only embedded chart is missing_plot.html,
# which the report generator writes next to the report.
html_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Data Quality Report</title>
<style>
body { font-family: Arial; padding: 20px; }
.section { padding: 15px; background: #f8f8f8; border-radius: 8px; margin-bottom: 20px; }
pre { background: #eee; padding: 10px; border-radius: 5px; }
</style>
</head>
<body>

<h1>Data Quality Report</h1>
<h2>Data Quality Score: {{ score }}</h2>

<div class="section">
<h2>Missing Values (%)</h2>
<pre>{{ results["missing_pct"] }}</pre>
<iframe src="missing_plot.html" width="900" height="400"></iframe>
</div>

<div class="section">
<h2>Duplicate Rows</h2>
<pre>{{ results["duplicate_rows"] }}</pre>
</div>

<div class="section">
<h2>Outliers (IQR)</h2>
<pre>{{ results["outliers_iqr"] }}</pre>
</div>

<div class="section">
<h2>Business Rule Violations</h2>
<pre>{{ results.get("business_rules", "Not evaluated") }}</pre>
</div>

</body>
</html>
"""

# The page declares charset utf-8, so write it as UTF-8 regardless of the
# platform's default encoding.
with open("templates/report_template.html", "w", encoding="utf-8") as f:
    f.write(html_template)


In [None]:
def generate_html_report(df, results, outfolder="results"):
    """Render the HTML data-quality report and return its path.

    Writes two files into `outfolder` (created if it does not exist):
    `missing_plot.html`, a bar chart of the missing percentage per column,
    and `report.html`, rendered from `templates/report_template.html`.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame the missing-value chart is computed from.
    results : dict
        Findings passed to the template; must contain "quality_score".
    outfolder : str
        Destination folder.

    Raises
    ------
    KeyError
        If `results` has no "quality_score" entry.
    """
    os.makedirs(outfolder, exist_ok=True)

    # Missing values chart
    missing = df.isnull().mean() * 100
    px.bar(missing, title="Missing Values (%)").write_html(
        f"{outfolder}/missing_plot.html", include_plotlyjs="cdn"
    )

    # Render HTML
    env = Environment(loader=FileSystemLoader("templates"))
    template = env.get_template("report_template.html")
    html = template.render(results=results, score=results["quality_score"])

    report_path = f"{outfolder}/report.html"
    # The template declares charset utf-8; write with the matching encoding.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(html)

    return report_path


In [None]:
def generate_missing_values_report(df):
    """Write a per-column missing-percentage table as HTML and return its path.

    The table has the columns "Column" and "Missing %" and is saved to
    results/missing_table.html.
    """
    destination = "results/missing_table.html"
    null_counts = df.isnull().sum()
    null_share = (null_counts / len(df)) * 100
    pd.DataFrame(
        {"Column": null_counts.index, "Missing %": null_share.values}
    ).to_html(destination)
    return destination


def generate_missing_heatmap(df):
    """Save a heatmap of the null mask of `df` as a PNG and return its path.

    The image is written to results/missing_heatmap.png.
    """
    # Imported here because the notebook's import cell does not bring in
    # matplotlib or seaborn, so the bare `plt` / `sns` names were undefined
    # and the function raised NameError when called.
    import matplotlib.pyplot as plt
    import seaborn as sns

    output_path = "results/missing_heatmap.png"
    plt.figure(figsize=(10, 6))
    sns.heatmap(df.isnull(), cbar=False)
    plt.savefig(output_path)
    return output_path


def generate_top10_missing(df):
    """Write the ten columns with the highest missing percentage as an HTML table.

    The table has a single "Missing %" column, sorted descending, and is
    saved to results/top10_missing.html; that path is returned.
    """
    destination = "results/top10_missing.html"
    pct_missing = (df.isnull().sum() / len(df)) * 100
    worst_first = pct_missing.sort_values(ascending=False)
    worst_first.head(10).to_frame("Missing %").to_html(destination)
    return destination


def generate_imputation_report(df):
    """Write a before/after null-count comparison for a simple imputation.

    A copy of `df` is imputed column by column: numeric and datetime columns
    with the median, every other column with the mode. Columns that have no
    observed value at all are left untouched (they keep their nulls in the
    "After" column) instead of raising. The comparison table is saved to
    results/imputation_compare.html and that path is returned.
    """
    before = df.isnull().sum()
    df_imputed = df.copy()
    for col in df_imputed.columns:
        series = df_imputed[col]
        if not series.isnull().any():
            continue
        # median() is only defined for numeric/datetime data; category and
        # string dtypes previously raised TypeError here.
        if (pd.api.types.is_numeric_dtype(series)
                or pd.api.types.is_datetime64_any_dtype(series)):
            fill_value = series.median()
        else:
            modes = series.mode()
            # mode() is empty when every value is missing.
            fill_value = modes.iloc[0] if not modes.empty else None
        if fill_value is None or pd.isna(fill_value):
            continue
        df_imputed[col] = series.fillna(fill_value)
    after = df_imputed.isnull().sum()
    comp = pd.DataFrame({"Before": before, "After": after})
    comp.to_html("results/imputation_compare.html")
    return "results/imputation_compare.html"


In [None]:
def clean_dataset(df, drop_thresh=0.5, numeric_impute='median'):
    """Clean a copy of `df` and report what was done.

    Steps: drop columns whose missing fraction is > `drop_thresh`, fill
    numeric nulls with 0, fill object-column nulls with the column mode
    ("" when the column has no observed value), drop exact duplicate rows
    and reset the index.

    NOTE(review): `numeric_impute` is accepted but not applied — numeric
    columns are always filled with 0. Confirm whether the default 'median'
    or the zero-fill is the intended behaviour.

    Returns
    -------
    (cleaned_df, actions)
        `actions` has the keys 'dropped_columns', 'numeric_imputation',
        'duplicates_removed', 'rows_before', 'rows_after' and
        'missing_pct_after'.
    """
    actions = {}
    df_clean = df.copy()

    # Drop high-missing columns
    missing_frac = df_clean.isnull().mean()
    to_drop = missing_frac[missing_frac > drop_thresh].index.tolist()
    actions['dropped_columns'] = to_drop
    df_clean = df_clean.drop(columns=to_drop)

    # Numeric imputations (always fill with 0)
    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        df_clean[col] = df_clean[col].fillna(0)
    actions['numeric_imputation'] = 'zero'

    # Categorical imputations (mode)
    cat_cols = df_clean.select_dtypes(include=['object']).columns
    for col in cat_cols:
        modes = df_clean[col].mode()
        # mode() is empty when every value is missing; indexing it with [0]
        # used to raise, so fall back to an empty string instead.
        mode_val = modes.iloc[0] if not modes.empty else ""
        df_clean[col] = df_clean[col].fillna(mode_val)

    # Drop duplicates
    dup_before = int(df_clean.duplicated().sum())
    df_clean = df_clean.drop_duplicates().reset_index(drop=True)
    actions["duplicates_removed"] = dup_before

    # Final stats
    actions["rows_before"] = int(len(df))
    actions["rows_after"] = int(len(df_clean))
    actions["missing_pct_after"] = round(float(df_clean.isnull().mean().mean() * 100), 3)

    return df_clean, actions


In [None]:
def compute_dq_after_cleaning(cleaned_df):
    """Run every DataQualityChecker check on `cleaned_df` and return the results dict."""
    checker = DataQualityChecker(cleaned_df)
    # compute_score comes last, after the checks whose results it reads.
    for run_check in (
        checker.check_missing_values,
        checker.check_duplicates,
        checker.check_dtypes,
        checker.uniqueness_stats,
        checker.detect_outliers_iqr,
        checker.compute_score,
    ):
        run_check()
    return checker.results


In [None]:
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.units import cm

def generate_pdf_quality_summary(results_before, results_after,
                                 df_original, cleaned_df, cleaning_actions,
                                 output_path="results/data_quality_summary.pdf",
                                 title="Data Quality Summary"):
    """Write a one-page before/after data-quality PDF and return its path.

    Parameters
    ----------
    results_before, results_after : dict
        Findings before and after cleaning; both must contain
        "quality_score", and `results_before` must contain "missing_pct".
    df_original, cleaned_df : pandas.DataFrame
        Frames before and after cleaning (only their shapes are reported).
    cleaning_actions : dict
        Must contain 'missing_pct_after', 'dropped_columns' and
        'duplicates_removed'.
    output_path : str
        Destination of the PDF; its parent folder is created if needed.
    title : str
        Heading printed at the top of the document. The previous hard-coded
        heading named the Titanic dataset regardless of the data analysed.
    """
    styles = getSampleStyleSheet()
    normal = styles["Normal"]
    title_style = styles["Title"]
    h2 = styles["Heading2"]

    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    doc = SimpleDocTemplate(output_path, pagesize=A4)
    story = []

    story.append(Paragraph(title, title_style))
    story.append(Spacer(1, 12))

    # BEFORE vs AFTER table (every cell converted to text explicitly)
    table_data = [
        ["Metric", "Before", "After"],
        ["DQ Score", str(results_before["quality_score"]), str(results_after["quality_score"])],
        ["Rows", str(len(df_original)), str(len(cleaned_df))],
        ["Columns", str(df_original.shape[1]), str(cleaned_df.shape[1])],
        ["Missing % (avg)",
         f"{pd.Series(results_before['missing_pct']).mean():.2f}%",
         f"{cleaning_actions['missing_pct_after']:.2f}%"]
    ]

    t = Table(table_data, colWidths=[5*cm, 4*cm, 4*cm])
    t.setStyle(TableStyle([
        ("BACKGROUND", (0,0), (-1,0), colors.lightgrey),
        ("GRID", (0,0), (-1,-1), 0.5, colors.grey)
    ]))

    story.append(t)
    story.append(Spacer(1, 12))

    # Cleaning actions
    story.append(Paragraph("Cleaning Actions", h2))
    # Show a readable list instead of the Python list repr.
    dropped = ", ".join(map(str, cleaning_actions['dropped_columns'])) or "None"
    story.append(Paragraph(f"Dropped columns: {dropped}", normal))
    story.append(Paragraph(f"Duplicates removed: {cleaning_actions['duplicates_removed']}", normal))

    doc.build(story)
    return output_path


In [None]:
import os
import pandas as pd
import numpy as np
import plotly.express as px
from jinja2 import Environment, FileSystemLoader
from IPython.display import IFrame, HTML, display

# Redefine html_template for the Wholesale Data.
# The template expects `score` and a `results` dict with the keys
# "missing_pct", "duplicate_rows" and "outliers_iqr"; "business_rules" is
# optional. It embeds missing_plot.html and fresh_hist.html, which must sit
# next to the rendered report.
html_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Wholesale Data Quality Report</title>
<style>
body { font-family: Arial; padding: 20px; }
.section { padding: 15px; background: #f8f8f8; border-radius: 8px; margin-bottom: 20px; }
pre { background: #eee; padding: 10px; border-radius: 5px; }
</style>
</head>
<body>

<h1>&#x1F4E6; Wholesale Data Quality Report</h1>
<h2>Data Quality Score: {{ score }}</h2>

<div class="section">
<h2>Missing Values (%)</h2>
<pre>{{ results["missing_pct"] }}</pre>
<iframe src="missing_plot.html" width="900" height="400"></iframe>
</div>

<div class="section">
<h2>Duplicate Rows</h2>
<pre>{{ results["duplicate_rows"] }}</pre>
</div>

<div class="section">
<h2>Outliers (IQR)</h2>
<pre>{{ results["outliers_iqr"] }}</pre>
</div>

<div class="section">
<h2>Business Rule Violations</h2>
<pre>{{ results.get("business_rules", "Not evaluated") }}</pre>
</div>

<div class="section">
<h2>Fresh Product Distribution</h2>
<iframe src="fresh_hist.html" width="900" height="400"></iframe>
</div>

</body>
</html>
"""

# The page declares charset utf-8, so write it as UTF-8 regardless of the
# platform's default encoding.
with open("templates/report_template.html", "w", encoding="utf-8") as f:
    f.write(html_template)

# Redefine generate_html_report to use 'Fresh' column
def generate_html_report(df, results, outfolder="results"):
    """Render the HTML data-quality report and return its path.

    Writes three files into `outfolder` (created if it does not exist):
    `missing_plot.html` (missing percentage per column), `fresh_hist.html`
    (30-bin histogram of the "Fresh" column) and `report.html`, rendered
    from `templates/report_template.html`.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to chart; must contain a "Fresh" column.
    results : dict
        Findings passed to the template; must contain "quality_score".
    outfolder : str
        Destination folder.

    Raises
    ------
    KeyError
        If `results` has no "quality_score" entry.
    """
    os.makedirs(outfolder, exist_ok=True)

    # Missing values chart
    missing = df.isnull().mean() * 100
    px.bar(missing, title="Missing Values (%)").write_html(
        f"{outfolder}/missing_plot.html", include_plotlyjs="cdn"
    )

    # Fresh product histogram
    px.histogram(df, x="Fresh", nbins=30, title="Fresh Product Distribution").write_html(
        f"{outfolder}/fresh_hist.html", include_plotlyjs="cdn"
    )

    # Render HTML
    env = Environment(loader=FileSystemLoader("templates"))
    template = env.get_template("report_template.html")
    html = template.render(results=results, score=results["quality_score"])

    report_path = f"{outfolder}/report.html"
    # The rendered page can contain non-ASCII text and declares charset
    # utf-8; write it with the matching encoding.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(html)

    return report_path

# Run every quality check on a working copy of the raw data, then build the
# HTML report from the collected results.
df = df_raw.copy()

dq = DataQualityChecker(df)
# compute_score comes last, after the checks whose results it reads.
for run_check in (
    dq.check_missing_values,
    dq.check_duplicates,
    dq.check_dtypes,
    dq.uniqueness_stats,
    dq.detect_outliers_iqr,
    dq.compute_score,
):
    run_check()

html_report = generate_html_report(df, dq.results)

html_report

'results/report.html'

In [None]:
# google.colab only exists inside Colab; without it the files are still
# written to results/ and only the browser download is skipped.
try:
    from google.colab import files
except ImportError:
    files = None

# Clean the working copy and persist the cleaned data.
cleaned_df, cleaning_actions = clean_dataset(df)
cleaned_path = "results/cleaned_data.csv"
cleaned_df.to_csv(cleaned_path, index=False)

# Re-run the quality checks on the cleaned frame for the before/after table.
results_after = compute_dq_after_cleaning(cleaned_df)

pdf_path = generate_pdf_quality_summary(
    results_before=dq.results,
    results_after=results_after,
    df_original=df,
    cleaned_df=cleaned_df,
    cleaning_actions=cleaning_actions,
    output_path="results/data_quality_summary.pdf"
)

if files is not None:
    files.download(cleaned_path)
    files.download(pdf_path)

pdf_path


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

'results/data_quality_summary.pdf'