In [1]:
from typing import Dict, Union, List, Any
from dataclasses import dataclass
import numpy as np
import pandas as pd

In [2]:
class Metric:
    def __call__(self, df: pd.DataFrame) -> Dict[str, float]:
        return {}


@dataclass
class CountTotal(Metric):
    """Total number of rows in DataFrame"""

    def __call__(self, df: pd.DataFrame) -> Dict[str, float]:
        return {"total": len(df)}


@dataclass
class CountZeros(Metric):
    """Number of zeros in choosen column"""
    column: str

    def __call__(self, df: pd.DataFrame) -> Dict[str, float]:
        n = len(df)
        k = sum(df[self.column] == 0)
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountNull(Metric):
    """Number of empty values in choosen columns"""

    columns: List[str]
    aggregation: str = "any"  # either "all", or "any"

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        df = df[self.columns].copy()
        if self.aggregation == 'all':
            k = sum(all(df.loc[i].isna())for i in range(n))
        else:
            k = sum(any(df.loc[i].isna())for i in range(n))
        return {"total": n, "count": k, "delta": k / n}
        
@dataclass
class CountDuplicates(Metric):
    """Number of duplicates in choosen columns"""

    columns: List[str]

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = len(df[df.duplicated(subset=self.columns)])
        return {"total": n, "count": k, "delta": k / n}
        
@dataclass
class CountValue(Metric):
    """Number of values in choosen column"""

    column: str
    value: Union[str, int, float]

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = (df[self.column] == self.value).sum()
        return {"total": n, "count": k, "delta": k / n}

In [3]:
@dataclass
class CountBelowValue(Metric):
    """Number of values below threshold"""

    column: str
    value: float
    strict: bool = False

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = (df[self.column] < self.value).sum()
        else:
            count_below = (df[self.column] <= self.value).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}


In [4]:
sales = pd.read_csv('ke_daily_sales.csv')

In [5]:
relevance = pd.read_csv('ke_visits.csv')

In [9]:
c_t = CountTotal()
c_z = CountZeros('qty')
c_n = CountNull(['price', 'revenue'], 'any')
c_d = CountDuplicates(['item_id'])
c_v = CountValue('item_id', 100)

In [12]:
c_z(sales)

{'total': 7, 'count': 1, 'delta': 0.14285714285714285}

In [13]:
c_n(nan_sales)

{'total': 9, 'count': 2, 'delta': 0.2222222222222222}

In [14]:
c_d(sales)

{'total': 7, 'count': 4, 'delta': 0.5714285714285714}

In [15]:
from metrics import Metric, CountTotal, CountZeros, CountNull, CountDuplicates, CountBelowValue, CountBelowColumn, CountCB, CountRatioBelow, CountLag
from checklist import CHECKLIST


In [162]:
from typing import Dict, List, Tuple, Union
from dataclasses import dataclass
from metrics import Metric
import pandas as pd

LimitType = Dict[str, Tuple[float, float]]
CheckType = Tuple[str, Metric, LimitType]


@dataclass
class Report:
    """DQ report class."""

    checklist: List[CheckType]
    engine: str = "pandas"

    
    _results = {}

    def fit(self, tables: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Calculate DQ metrics and build report."""
        self.report_ = {}
        report = self.report_
        results = []
        # Check if engine supported
        if self.engine != "pandas":
            raise NotImplementedError("Only pandas API currently supported!")
            
        report['title'] = f"DQ Report for tables {list(tables.keys())}"
        
        for table_name, metric, limits in self.checklist:
            key = hash(str(tables[table_name].values.tobytes()))  # Create a hash key based on table data
            if key in self._results:
                result = self._results[key]
            else:
                result = {}
                result['table_name'] = table_name
                result['metric'] = str(metric)
                result['limits'] = str(limits)
                try:
                    values = metric(tables[table_name])
                    result['values'] = values
                    passed = True
                    for key, (lower, upper) in limits.items():
                        if key not in values:
                            passed = False
                            break
                        value = values[key]
                        if not (lower <= value <= upper):
                            passed = False
                            break
                    if passed:
                        result['status'] = '.'
                        result['error'] = ''
                    else:
                        result['status'] = 'F'
                        result['error'] = ''
                except Exception as e:
                    result['status'] = 'E'
                    result['error'] = type(e).__name__
                
                results.append(result)
        report['result'] = pd.DataFrame.from_records(results)

        total_checks = len(report['result'])
        passed_checks = report['result']['status'].eq('.').sum()
        failed_checks = report['result']['status'].eq('F').sum()
        errors = report['result']['status'].eq('E').sum()

        report['passed'] = passed_checks
        report['failed'] = failed_checks
        report['errors'] = errors
        report['total'] = total_checks
        report['passed_pct'] = (passed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['failed_pct'] = (failed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['errors_pct'] = (errors / total_checks) * 100 if total_checks != 0 else 0

        return report

    def to_str(self) -> None:
        """Convert report to string format."""
        report = self.report_

        msg = (
            "This Report instance is not fitted yet. "
            "Call 'fit' before usong this method."
        )

        assert isinstance(report, dict), msg

        pd.set_option("display.max_rows", 500)
        pd.set_option("display.max_columns", 500)
        pd.set_option("display.max_colwidth", 20)
        pd.set_option("display.width", 1000)

        return (
            f"{report['title']}\n\n"
            f"{report['result']}\n\n"
            f"Passed: {report['passed']} ({report['passed_pct']}%)\n"
            f"Failed: {report['failed']} ({report['failed_pct']}%)\n"
            f"Errors: {report['errors']} ({report['errors_pct']}%)\n"
            "\n"
            f"Total: {report['total']}"
        )


In [163]:
tables = {'sales': sales, 'relevance':relevance}

In [164]:
dq_report = Report(CHECKLIST)

In [165]:
report_result = dq_report.fit(tables)

In [166]:
print(dq_report.to_str())

DQ Report for tables ['sales', 'relevance']

   table_name               metric               limits               values status     error
0       sales         CountTotal()  {'total': (1, 10...         {'total': 7}      .          
1       sales  CountLag(column=...      {'lag': (0, 3)}  {'today': '2024-...      F          
2       sales  CountDuplicates(...    {'total': (0, 0)}  {'total': 7, 'co...      F          
3       sales  CountNull(column...    {'total': (0, 0)}  {'total': 7, 'co...      F          
4       sales  CountRatioBelow(...  {'delta': (0, 0....  {'total': 7, 'co...      F          
5       sales  CountCB(column='...                   {}  {'lcb': 49.50000...      .          
6       sales  CountZeros(colum...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
7       sales  CountBelowValue(...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
8   relevance         CountTotal()  {'total': (1, 10...         {'total': 9}      .          
9   relevance  

In [156]:
report_result

{'title': "DQ Report for tables ['sales', 'relevance']",
 'results':    table_name               metric               limits               values status     error
 0       sales         CountTotal()  {'total': (1, 10...         {'total': 7}      .          
 1       sales  CountLag(column=...      {'lag': (0, 3)}  {'today': '2024-...      F          
 2       sales  CountDuplicates(...    {'total': (0, 0)}  {'total': 7, 'co...      F          
 3       sales  CountNull(column...    {'total': (0, 0)}  {'total': 7, 'co...      F          
 4       sales  CountRatioBelow(...  {'delta': (0, 0....  {'total': 7, 'co...      F          
 5       sales  CountCB(column='...                   {}  {'lcb': 49.50000...      .          
 6       sales  CountZeros(colum...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
 7       sales  CountBelowValue(...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
 8   relevance         CountTotal()  {'total': (1, 10...         {'total': 9}

# Адаптация под PySpark


In [3]:
"""Metrics."""

from typing import Any, Dict, Union, List
from dataclasses import dataclass
import datetime

import pandas as pd
import pyspark.sql as ps


@dataclass
class Metric:
    """Base class for Metric"""

    def __call__(self, df: Union[pd.DataFrame, ps.DataFrame]) -> Dict[str, Any]:
        if isinstance(df, pd.DataFrame):
            return self._call_pandas(df)

        if isinstance(df, ps.DataFrame):
            return self._call_pyspark(df)

        msg = (
            f"Not supported type of arg 'df': {type(df)}. "
            "Supported types: pandas.DataFrame, "
            "pyspark.sql.dataframe.DataFrame"
        )
        raise NotImplementedError(msg)

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        return {}


@dataclass
class CountTotal(Metric):
    """Total number of rows in DataFrame"""

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {"total": len(df)}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        return {"total": df.count()}


@dataclass
class CountZeros(Metric):
    """Number of zeros in choosen column"""

    column: str

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column] == 0)
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col, count

        n = df.count()
        k = df.filter(col(self.column) == 0).count()
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountNull(Metric):
     """Number of empty values in chosen columns"""

    columns: List[str]
    aggregation: str = "any"

    def _call__pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.aggregation == 'all':
            k = sum(df[self.columns].isna().all(axis=1))
        else:
            k = sum(df[self.columns].isna().any(axis=1))
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col
        n = df.count()
        if self.aggregation == 'all':
            k = df.filter(col[self.columns].isNull()).count()
        else:
            k = df.filter(col[self.columns].isNull()).count()
        return {"total": n, "count": k, "delta": k / n}
    

@dataclass
class CountDuplicates(Metric):
    """Number of duplicates in choosen columns"""

    columns: List[str]

    def _call__pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = len(df[df.duplicated(subset=self.columns)])
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col
        n = df.count()
        k = df.groupBy(self.columns).count().filter(col('count') > 1).count()
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountValue(Metric):
    """Number of values in chosen column"""

    column: str
    value: Union[str, int, float]

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = (df[self.column] == self.value).sum()
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col
        n = df.count()
        k = df.filter(col(self.column) == self.value).count()
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountBelowValue(Metric):
    """Number of values below threshold"""

    column: str
    value: float
    strict: bool = False

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = (df[self.column] < self.value).sum()
        else:
            count_below = (df[self.column] <= self.value).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        if self.strict:
            count_below = df.filter(col(column) < self.value).count()
        else:
            count_below = df.filter(col(column) <= self.value).count()
            
        return {"total": n, "count": count_below, "delta": count_below / n}

@dataclass
class CountBelowColumn(Metric):
    """Count how often column X is below column Y"""

    column_x: str
    column_y: str
    strict: bool = False

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = (df[self.column_x] < df[self.column_y]).sum()
        else:
            count_below = (df[self.column_x] <= df[self.column_y]).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}

    def _call_pyspark(self, df: pd.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        if self.strict:
            count_below = df.filter(col(self.column_x) < col(self.column_y)).count()
        else:
            count_below = df.filter(col(self.column_x) <= col(self.column_y)).count()
        return {"total": n, "count": count_below, "delta": count_below / n}


@dataclass
class CountCB(Metric):
    """Calculate lower/upper bounds for N%-confidence interval"""

    column: str
    conf: float = 0.95

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        q_lower = (1 - self.conf) / 2
        q_upper = 1 - q_lower
        lcb = df[self.column].quantile(q_lower)
        ucb = df[self.column].quantile(q_upper)
        return {"lcb": lcb, "ucb": ucb}
        
    def _call_pyspark(self, df: pd.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        quantiles = df.approxQuantile(self.column, [0.5 - self.conf/2, 0.5 + self.conf/2], 0.0)
















In [1]:
# Import the PySpark libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

# Create a SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Create a SQLContext
sqlContext = SQLContext(spark)

# Load a CSV file
df = sqlContext.read.csv("./ke_daily_sales.csv", header=True)

# Count the number of rows in the DataFrame
rowCount = df.count()

# Print the row count
print("Number of rows:", rowCount)

# Print the schema of the DataFrame
df.printSchema()

# Show the first 10 rows of the DataFrame
df.show(10)




Number of rows: 7
root
 |-- day: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty: string (nullable = true)
 |-- price: string (nullable = true)
 |-- revenue: string (nullable = true)

+----------+-------+---+-----+-------+
|       day|item_id|qty|price|revenue|
+----------+-------+---+-----+-------+
|2022-10-24|    100|  5|120.0|  500.0|
|2022-10-24|    100|  6|120.0|  720.0|
|2022-10-24|    200|  2|200.0|  400.0|
|2022-10-24|    300| 10| 85.0|  850.0|
|2022-10-23|    100|  3|110.0|  330.0|
|2022-10-23|    200|  8|200.0| 1600.0|
|2022-10-23|    300|  0| 90.0|    0.0|
+----------+-------+---+-----+-------+



In [3]:
from pyspark.sql.functions import col, isnull, window

In [30]:
"""DQ Report."""

from typing import Dict, List, Tuple, Union
from dataclasses import dataclass
from metrics import Metric

import pandas as pd
import pyspark.sql as ps

LimitType = Dict[str, Tuple[float, float]]
CheckType = Tuple[str, Metric, LimitType]


@dataclass
class Report:
    """DQ report class."""

    checklist: List[CheckType]
    engine: str = "pandas"
    _results = {}
    def fit(self, tables: Dict[str, Union[pd.DataFrame, ps.DataFrame]]) -> Dict:
        """Calculate DQ metrics and build report."""

        if self.engine == "pandas":
            return self._fit_pandas(tables)

        if self.engine == "pyspark":
            return self._fit_pyspark(tables)

        raise NotImplementedError("Only pandas and pyspark APIs currently supported!")

    def _fit_pandas(self, tables: Dict[str, pd.DataFrame]) -> Dict:
        """Calculate DQ metrics and build report.  Engine: Pandas"""
        self.report_ = {}
        report = self.report_

        results = []
        report['title'] = f"DQ Report for tables {sorted(list(tables.keys()))}"
        for table_name, metric, limits in self.checklist:
            key = hash(str(tables[table_name].values.tobytes()))
            if key in self._results:
                result = self._results[key]
            else:
                result = {}
                result['table_name'] = table_name
                result['metric'] = str(metric)
                result['limits'] = str(limits)
                try:
                    values = metric(tables[table_name])
                    result['values'] = values
                    passed = True
                    for key, (lower, upper) in limits.items():
                        if key not in values:
                            passed = False
                            break
                        value = values[key]
                        if not (lower <= value <= upper):
                            passed = False
                            break
                    if passed:
                        result['status'] = '.'
                        result['error'] = ''
                    else:
                        result['status'] = 'F'
                        result['error'] = ''
                except Exception as e:
                    result['status'] = 'E'
                    result['error'] = type(e).__name__
                
                results.append(result)

        report['result'] = pd.DataFrame.from_records(results)

        total_checks = len(report['result'])
        passed_checks = report['result']['status'].eq('.').sum()
        failed_checks = report['result']['status'].eq('F').sum()
        errors = report['result']['status'].eq('E').sum()

        report['passed'] = passed_checks
        report['failed'] = failed_checks
        report['errors'] = errors
        report['total'] = total_checks
        report['passed_pct'] = (passed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['failed_pct'] = (failed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['errors_pct'] = (errors / total_checks) * 100 if total_checks != 0 else 0

        return report

    def _fit_pyspark(self, tables: Dict[str, pd.DataFrame]) -> Dict:
        """Calculate DQ metrics and build report.  Engine: PySpark"""
        self.report_ = {}
        report = self.report_

        results = []
        report['title'] = f"DQ Report for tables {sorted(list(tables.keys()))}"
        for table_name, metric, limits in self.checklist:
            key = hash(str(tables[table_name].collect()))
            if key in self._results:
                result = self._results[key]
            else:
                result = {}
                result['table_name'] = table_name
                result['metric'] = str(metric)
                result['limits'] = str(limits)
                try:
                    values = metric(tables[table_name])
                    result['values'] = values
                    passed = True
                    for key, (lower, upper) in limits.items():
                        if key not in values:
                            passed = False
                            break
                        value = values[key]
                        if not (lower <= value <= upper):
                            passed = False
                            break
                    if passed:
                        result['status'] = '.'
                        result['error'] = ''
                    else:
                        result['status'] = 'F'
                        result['error'] = ''
                except Exception as e:
                    result['status'] = 'E'
                    result['error'] = type(e).__name__
                
                results.append(result)

        report['result'] = pd.DataFrame.from_records(results)

        total_checks = len(report['result'])
        passed_checks = report['result']['status'].eq('.').sum()
        failed_checks = report['result']['status'].eq('F').sum()
        errors = report['result']['status'].eq('E').sum()

        report['passed'] = passed_checks
        report['failed'] = failed_checks
        report['errors'] = errors
        report['total'] = total_checks
        report['passed_pct'] = (passed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['failed_pct'] = (failed_checks / total_checks) * 100 if total_checks != 0 else 0
        report['errors_pct'] = (errors / total_checks) * 100 if total_checks != 0 else 0

        return report

    def to_str(self) -> str:
        """Convert report to string format."""
        report = self.report_

        msg = (
            "This Report instance is not fitted yet. "
            "Call 'fit' before using this method."
        )

        assert isinstance(report, dict), msg

        pd.set_option("display.max_rows", 500)
        pd.set_option("display.max_columns", 500)
        pd.set_option("display.max_colwidth", 20)
        pd.set_option("display.width", 1000)

        return (
            f"{report['title']}\n\n"
            f"{report['result']}\n\n"
            f"Passed: {report['passed']} ({report['passed_pct']}%)\n"
            f"Failed: {report['failed']} ({report['failed_pct']}%)\n"
            f"Errors: {report['errors']} ({report['errors_pct']}%)\n"
            "\n"
            f"Total: {report['total']}"
        )

"""Metrics."""

from typing import Any, Dict, Union, List
from dataclasses import dataclass
import datetime

import pandas as pd
import pyspark.sql as ps


@dataclass
class Metric:
    """Base class for Metric"""

    def __call__(self, df: Union[pd.DataFrame, ps.DataFrame]) -> Dict[str, Any]:
        if isinstance(df, pd.DataFrame):
            return self._call_pandas(df)

        if isinstance(df, ps.DataFrame):
            return self._call_pyspark(df)

        msg = (
            f"Not supported type of arg 'df': {type(df)}. "
            "Supported types: pandas.DataFrame, "
            "pyspark.sql.dataframe.DataFrame"
        )
        raise NotImplementedError(msg)

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        return {}


@dataclass
class CountTotal(Metric):
    """Total number of rows in DataFrame"""

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {"total": len(df)}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        return {"total": df.count()}


@dataclass
class CountZeros(Metric):
    """Number of zeros in choosen column"""

    column: str

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column] == 0)
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col, count

        n = df.count()
        k = df.filter(col(self.column) == 0).count()
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountNull(Metric):
    columns: List[str]
    aggregation: str = "any"

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.aggregation == 'all':
            k = sum(df[self.columns].isna().all(axis=1))
        else:
            k = sum(df[self.columns].isna().any(axis=1))
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col, isnull
        
        n = df.count()
        
        # Проверка наличия всех NULL значений в указанных колонках
        if self.aggregation == 'all':
            condition = isnull(col(self.columns[0]))
            for column in self.columns[1:]:
                condition &= isnull(col(column))
        else:
            # Проверка наличия хотя бы одного NULL значения в указанных колонках
            condition = isnull(col(self.columns[0]))
            for column in self.columns[1:]:
                condition |= isnull(col(column))

        count_null = df.filter(condition).count()
        
        return {"total": n, "count": count_null, "delta": count_null / n}

    

@dataclass
class CountDuplicates(Metric):
    """Number of duplicates in chosen columns"""

    columns: List[str]

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = len(df[df.duplicated(subset=self.columns)])
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import count, col, window

        window_spec = window.partitionBy(self.columns).orderBy(col(self.columns[0]))
        duplicates_count = df.select(self.columns).withColumn("row_num", row_number().over(window_spec)).groupBy(*self.columns).agg(count("*").alias("count")).filter(col("count") > 1).count()
        
        n = df.count()
        return {"total": n, "count": duplicates_count, "delta": duplicates_count / n}


@dataclass
class CountValue(Metric):
    """Number of values in chosen column"""

    column: str
    value: Union[str, int, float]

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = (df[self.column] == self.value).sum()
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col
        n = df.count()
        k = df.filter(col(self.column) == self.value).count()
        return {"total": n, "count": k, "delta": k / n}

@dataclass
class CountBelowValue(Metric):
    """Number of values below threshold"""

    column: str
    value: float
    strict: bool = False

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = (df[self.column] < self.value).sum()
        else:
            count_below = (df[self.column] <= self.value).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        if self.strict:
            count_below = df.filter(col(self.column) < self.value).count()
        else:
            count_below = df.filter(col(self.column) <= self.value).count()
            
        return {"total": n, "count": count_below, "delta": count_below / n}

@dataclass
class CountBelowColumn(Metric):
    """Count how often column X is below column Y"""

    column_x: str
    column_y: str
    strict: bool = False

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = (df[self.column_x] < df[self.column_y]).sum()
        else:
            count_below = (df[self.column_x] <= df[self.column_y]).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}

    def _call_pyspark(self, df: pd.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        if self.strict:
            count_below = df.filter(col(self.column_x) < col(self.column_y)).count()
        else:
            count_below = df.filter(col(self.column_x) <= col(self.column_y)).count()
        return {"total": n, "count": count_below, "delta": count_below / n}


@dataclass
class CountCB(Metric):
    """Calculate lower/upper bounds for N%-confidence interval"""

    column: str
    conf: float = 0.95

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        q_lower = (1 - self.conf) / 2
        q_upper = 1 - q_lower
        lcb = df[self.column].quantile(q_lower)
        ucb = df[self.column].quantile(q_upper)
        return {"lcb": lcb, "ucb": ucb}
        
    def _call_pyspark(self, df: pd.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        quantiles = df.approxQuantile(self.column, [0.5 - self.conf/2, 0.5 + self.conf/2], 0.0)
        lcb, ucb = quantiles[0], quantiles[1]
        return {"lcb": lcb, "ucb": ucb}


@dataclass
class CountRatioBelow(Metric):
    """Count how often X / Y is below Z"""

    column_x: str
    column_y: str
    column_z: str
    strict: bool = False

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        if self.strict:
            count_below = ((df[self.column_x] / df[self.column_y]) < df[self.column_z]).sum()
        else:
            count_below = ((df[self.column_x] / df[self.column_y]) <= df[self.column_z]).sum()
        return {"total": n, "count": count_below, "delta": count_below / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import col

        n = df.count()
        if self.strict:
            count_below = df.filter(col(self.column_x) / col(self.column_y) < col(self.column_z)).count()
        else:
            count_below = df.filter(col(self.column_x) / col(self.column_y) <= col(self.column_z)).count()
        return {"total": n, "count": count_below, "delta": count_below / n}

@dataclass
class CountLag(Metric):
    """A lag between latest date and today"""

    column: str
    fmt: str = "%Y-%m-%d"

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        today = datetime.datetime.now().strftime(self.fmt)
        df[self.column] = pd.to_datetime(df[self.column], format=self.fmt)
        last_day = df[self.column].max().strftime(self.fmt)
        last_day_datetime = df[self.column].max()
        lag = (datetime.datetime.strptime(today, self.fmt) - last_day_datetime).days
        return {"today": today, "last_day": last_day, "lag": lag}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import max, to_date, col

        today = datetime.datetime.now().strftime(self.fmt)

        # Преобразование столбца к типу данных DateType()
        df = df.withColumn(self.column, to_date(col(self.column), self.fmt))

        last_day = df.agg(max(col(self.column))).first()[0].strftime(self.fmt)
        last_day_datetime = datetime.datetime.strptime(last_day, self.fmt)

        # Исправление имени переменной
        today_datetime = datetime.datetime.strptime(today, self.fmt)

        lag = (today_datetime - last_day_datetime).days
        return {"today": today, "last_day": last_day, "lag": lag}











In [164]:
@dataclass
class CountDuplicates(Metric):
    """Number of duplicates in chosen columns"""

    columns: List[str]

    def _call_pandas(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = len(df[df.duplicated(subset=self.columns)])
        return {"total": n, "count": k, "delta": k / n}

    def _call_pyspark(self, df: ps.DataFrame) -> Dict[str, Any]:
        from pyspark.sql.functions import count, col, window
        
        n = df.count()
        duplicates_count = n - df.select(self.columns).count()
        return {"total": n, "count": duplicates_count, "delta": duplicates_count / n}

In [161]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

from pyspark.sql.functions import count, col, window, row_number


In [124]:
sales = sqlContext.read.csv('ke_daily_sales.csv', header=True)
relevance = sqlContext.read.csv('ke_visits.csv', header=True)

In [125]:
tables = {'sales': sales, 'relevance': relevance}

In [159]:
relevance.select('item_id').dropDuplicates().show()

+-------+
|item_id|
+-------+
|    200|
|    300|
|    100|
+-------+



In [111]:
relevance.show()

+----------+-------+-----+------+--------+
|       day|item_id|views|clicks|payments|
+----------+-------+-----+------+--------+
|2022-09-24|    100| 1000|   219|      56|
|2022-09-24|    200| 1248|   343|       1|
|2022-09-24|    300|  993|   102|      71|
|2022-09-23|    100| 3244|   730|      18|
|2022-09-23|    200|  830|   203|       9|
|2022-09-23|    300|    0|     0|       2|
|2022-09-22|    100| 2130|   123|      20|
|2022-09-22|    200| 5320|   500|      13|
|2022-09-22|    300|  777|    68|       2|
+----------+-------+-----+------+--------+



In [165]:
c_n = CountNull(['views', 'payments'], 'any')
c_d = CountDuplicates(['item_id'])

In [166]:
c_d(relevance)

{'total': 9, 'count': 0, 'delta': 0.0}

In [148]:
relevance.select(['views']).withColumn('row_num', row_number().over(window()))

PySparkTypeError: [NOT_WINDOWSPEC] Argument `window` should be a WindowSpec, got function.