In [48]:
import pandas as pd
import numpy as np
import polars as pl

def read_dataframe(input_value):
    """
    Convert a supported tabular input into a polars DataFrame.

    Supported inputs are a pandas DataFrame, a numpy ndarray, a polars
    DataFrame (returned unchanged), a Python dictionary and a Python list.
    Subclasses of these types (for example ``collections.OrderedDict``) are
    accepted as well.

    Parameters
    ----------
    input_value : pandas.DataFrame, numpy.ndarray, polars.DataFrame, dict or list
        The object to convert.

    Returns
    -------
    polars.DataFrame or str
        The converted frame on success. On failure (unsupported type, or an
        exception raised by the polars converter) nothing is raised; the
        string ``"Error: <message>"`` is returned instead, so callers have to
        check the type of the result.
    """
    # (accepted type, converter) pairs; none of the types subclasses another,
    # so the order of the pairs does not influence which converter is chosen.
    converters = (
        (pd.DataFrame, pl.from_pandas),
        (np.ndarray, pl.from_numpy),
        (pl.DataFrame, lambda frame: frame),
        (dict, pl.from_dict),
        (list, pl.from_records),
    )

    for supported_type, convert in converters:
        # isinstance (rather than an exact type lookup) also admits subclasses.
        if isinstance(input_value, supported_type):
            try:
                return convert(input_value)
            except Exception as e:
                # Conversion problems are reported through the return value.
                return "Error: {}".format(e)

    print("Supported types are: pandas DataFrame, numpy ndarray, polars DataFrame, Python dictionary, and Python list.")
    return "Error: Unsupported input type: {}".format(type(input_value))

In [35]:

from polars.testing import assert_frame_equal
def test_read_dataframe():
    """Check that read_dataframe turns each supported input into the expected polars frame."""
    # Every case pairs an input with the polars DataFrame it must convert to.
    cases = [
        # pandas DataFrame
        (
            pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}),
            pl.DataFrame({'col1': [1, 2], 'col2': [3, 4]}),
        ),
        # numpy ndarray
        (
            np.array([[1, 2, 3], [4, 5, 6]]),
            pl.DataFrame(np.array([[1, 2, 3], [4, 5, 6]])),
        ),
        # polars DataFrame
        (
            pl.DataFrame({'col1': [1, 2], 'col2': [3, 4]}),
            pl.DataFrame({'col1': [1, 2], 'col2': [3, 4]}),
        ),
        # Python dictionary
        (
            {'a': 1, 'b': 2},
            pl.DataFrame({'a': [1], 'b': [2]}),
        ),
        # Python list
        (
            [[1, 2, 3], [2, 3, 4]],
            pl.DataFrame([[1, 2, 3], [2, 3, 4]]),
        ),
    ]

    # Convert each input and compare it with its expected frame.
    for given, wanted in cases:
        assert_frame_equal(read_dataframe(given), wanted)
   

In [28]:
from collections import namedtuple
class compareDataFrames:
    """
    Compare a tested polars DataFrame against an expected one.

    The helper methods can be combined as follows:

    1. ``compareSchemaOfDataFrames`` records whether the schemas match and
       which shared columns differ in dtype.
    2. ``getCommonColumns`` returns the column names usable for a comparison.
    3. ``castNumericColumnsIntoSameType`` aligns numeric dtypes of the tested
       frame with the expected frame.
    4. ``getMismatchedColumns`` reports deviation statistics for every numeric
       column whose values differ by more than the tolerance.

    Parameters
    ----------
    tested : polars.DataFrame
        The frame under test.
    cast_numeric : bool, default True
        When True, columns whose dtypes differ are only kept for comparison
        if both dtypes are numeric (so they can be cast to a common type).
    tolerance : int, default 6
        Number of decimals deviations are rounded to before they are
        compared with zero.
    """

    # Record types for the results, created once instead of on every call.
    _TypePartitioned = namedtuple('type_partitioned', 'numeric non_numeric')
    _MismatchedColumn = namedtuple(
        'mismatched_column',
        [
            'column_name',
            'number_of_mismatches',
            'relative_number_of_mismatches',
            'maximum_deviation',
            'minimum_deviation',
            'mean_deviation',
            'median_deviation',
            'stdDev_deviation',
            'relative_mean_deviation',
        ],
    )

    def __init__(self, tested, cast_numeric=True, tolerance=6):
        self.tested = tested
        # Use the caller's tolerance (it used to be hard-coded to 6).
        self._tolerance = tolerance
        self.cast_numeric = cast_numeric
        self._numeric_types = [pl.Int64, pl.Float64, pl.UInt64, pl.Int32, pl.Float32, pl.UInt32, pl.Int16, pl.UInt16, pl.Int8, pl.UInt8]
        # Outcome flags, e.g. {'schemaTest': bool}.
        self._result = {}
        # {column name: [tested dtype, expected dtype]} for shared columns whose dtypes differ.
        self._mistmatched_schema = {}

    def _isNumeric (self, dtype) -> bool:
        """Return True when ``dtype`` is one of the supported numeric dtypes."""
        return dtype in self._numeric_types

    def _rounded (self, value):
        """Round a scalar to the configured tolerance; ``None`` stays ``None``."""
        return None if value is None else round(value, self._tolerance)

    def compareSchemaOfDataFrames (self, expected):
        """
        Compare the schema of the tested frame with ``expected``.

        Stores the outcome in ``self._result['schemaTest']`` and rebuilds
        ``self._mistmatched_schema`` with one entry per column that exists in
        both frames but has a different dtype.
        """
        tested_schema = self.tested.schema
        expected_schema = expected.schema
        # Keyed by column name so the other methods can look columns up directly.
        self._mistmatched_schema = {
            name: [tested_schema[name], expected_schema[name]]
            for name in tested_schema
            if name in expected_schema and tested_schema[name] != expected_schema[name]
        }
        if tested_schema == expected_schema:
            self._result['schemaTest'] = True
            print("Schema for the tested and the expected dataframes are the same.")
        else:
            self._result['schemaTest'] = False

    def partitionColumnsOnType (self, expected) -> namedtuple:
        """
        Split ``expected`` into its numeric and non-numeric columns.

        Returns a ``type_partitioned`` namedtuple with the fields ``numeric``
        and ``non_numeric``, each holding a frame selected from ``expected``.
        """
        return self._TypePartitioned(
            expected.select(pl.col(self._numeric_types)),
            expected.select(pl.col("*").exclude(self._numeric_types)),
        )

    def getCommonColumns (self, expected) -> set:
        """
        Return the set of column names present in both frames.

        Relies on ``compareSchemaOfDataFrames`` having been called before to
        know which columns differ in dtype. When ``cast_numeric`` is True,
        dtype-mismatched columns are kept only if both dtypes are numeric;
        the remaining mismatched columns are dropped from the result.
        """
        common_columns = set(self.tested.columns).intersection(expected.columns)
        type_mismatched_columns = common_columns.intersection(self._mistmatched_schema)
        if type_mismatched_columns:
            print("There are columns with the same name but different data types.")
            if self.cast_numeric:
                print("The numeric columns will be casted to the numeric type of the expected dataframes and non-numeric columns will be dropped.")
                # A column can only be cast when both of its dtypes are numeric.
                not_castable = {
                    name
                    for name in type_mismatched_columns
                    if not all(self._isNumeric(dtype) for dtype in self._mistmatched_schema[name])
                }
                common_columns = common_columns - not_castable
        return common_columns

    def castNumericColumnsIntoSameType (self, expected, common_columns) -> "pl.DataFrame":
        """
        Cast dtype-mismatched numeric columns of the tested frame.

        Every column listed in ``common_columns`` whose tested and expected
        dtypes are both numeric but different is cast to the expected dtype.
        ``self.tested`` is replaced by the cast frame, which is also returned.
        ``expected`` is accepted for interface compatibility; the target
        dtypes come from the recorded schema mismatches.
        """
        casts = [
            pl.col(name).cast(expected_type)
            for name, (tested_type, expected_type) in self._mistmatched_schema.items()
            if name in common_columns
            and self._isNumeric(tested_type)
            and self._isNumeric(expected_type)
        ]
        if casts:
            self.tested = self.tested.with_columns(casts)
        return self.tested

    def generateDifferencedDataFrameForNumericColumns (self, expected) -> "pl.DataFrame":
        """
        Return ``tested - expected`` for the numeric columns both frames share.

        Columns are matched by name and kept in the order of the tested
        frame; columns that are missing from one frame or non-numeric in
        either frame are left out.
        """
        tested_schema = self.tested.schema
        expected_schema = expected.schema
        shared_numeric = [
            name
            for name in self.tested.columns
            if name in expected_schema
            and self._isNumeric(tested_schema[name])
            and self._isNumeric(expected_schema[name])
        ]
        tested_numeric = self.tested.select(shared_numeric)
        if not shared_numeric:
            # Nothing to subtract: return the empty selection.
            return tested_numeric
        return tested_numeric - expected.select(shared_numeric)

    def getMismatchedColumns (self, expected) -> list:
        """
        Report deviation statistics for every numeric column that differs.

        A column counts as mismatched when its largest or smallest deviation,
        rounded to the tolerance, is not zero. Returns a list with one
        ``mismatched_column`` namedtuple per mismatched column; statistics
        that cannot be computed are reported as ``None``.
        """
        differenced = self.generateDifferencedDataFrameForNumericColumns(expected)
        mismatched_columns = []
        for column in differenced.columns:
            print(f"Testing column: {column}")
            # Work on floats so rounding behaves the same for every numeric dtype.
            deviations = differenced[column].cast(pl.Float64)
            max_diff = self._rounded(deviations.max())
            min_diff = self._rounded(deviations.min())
            if max_diff is None or min_diff is None:
                # Empty or all-null column: there is nothing to compare.
                continue
            if max_diff == min_diff == 0:
                continue
            number_of_mismatches = int((deviations.round(self._tolerance) != 0).sum())
            relative_number_of_mismatches = number_of_mismatches / deviations.len()
            mean_deviation = self._rounded(deviations.mean())
            expected_mean = self._rounded(expected[column].mean())
            # Avoid dividing by a missing or zero mean.
            if mean_deviation is None or not expected_mean:
                relative_mean_deviation = None
            else:
                relative_mean_deviation = mean_deviation / expected_mean
            mismatched_columns.append(
                self._MismatchedColumn(
                    column,
                    number_of_mismatches,
                    relative_number_of_mismatches,
                    max_diff,
                    min_diff,
                    mean_deviation,
                    self._rounded(deviations.median()),
                    self._rounded(deviations.std()),
                    relative_mean_deviation,
                )
            )
        print(f"{len(mismatched_columns)} columns have mismatched values between the tested and the expected dataframes.")
        return mismatched_columns

    def __call__(self, func):
        """
        Use the instance as a decorator.

        The wrapper calls ``func`` and returns its result unchanged; no
        comparison is run at this point.
        """
        def wrapper(*args, **kwargs):
            expected = func(*args, **kwargs)
            return expected
        return wrapper
    
    


In [110]:
from collections import namedtuple
def partition_columns_on_type (df) -> namedtuple:
    """
    Split a frame into its numeric and non-numeric columns.

    Returns a ``type_partitioned`` namedtuple whose ``numeric`` field holds
    the columns selected by the numeric dtypes below and whose
    ``non_numeric`` field holds all remaining columns.
    """
    numeric_dtypes = [
        pl.Int64, pl.Float64, pl.UInt64,
        pl.Int32, pl.Float32, pl.UInt32,
        pl.Int16, pl.UInt16,
        pl.Int8, pl.UInt8,
    ]
    partition_type = namedtuple('type_partitioned', ['numeric', 'non_numeric'])
    # Select by dtype first, then take everything except those dtypes.
    numeric_part = df.select(pl.col(numeric_dtypes))
    remaining_part = df.select(pl.col("*").exclude(numeric_dtypes))
    return partition_type(numeric=numeric_part, non_numeric=remaining_part)

In [114]:
# Read the same parquet file into two separate frames, so the frames compared
# in the following cells hold identical data.
df = pl.read_parquet('glassdoor.parquet')
df1 = pl.read_parquet('glassdoor.parquet')

In [117]:
# Keep only the numeric columns of each frame; the names df and df1 are
# rebound to the numeric-only frames.
df = partition_columns_on_type(df).numeric
df1 = partition_columns_on_type(df1).numeric

In [119]:
# Difference between the two numeric frames.
diff = df - df1

In [138]:
# Sum the differences per column and show the first row of the result as a dict.
diff.sum().to_dicts()[0]

{'Jobs': 0, 'Salaries': 0, 'Reviews': 0, 'Rating': 0.0, 'Employees': 0.0}

In [144]:
def get_mismatched_columns (diff):
    """
    Return the names of the columns of ``diff`` that contain a non-zero value.

    ``diff`` is expected to be a frame of numeric differences (tested minus
    expected). The per-column minimum and maximum are inspected instead of
    the sum, so deviations of opposite sign cannot cancel each other out.
    Columns whose minimum and maximum are both missing are treated as
    matching. Returns an empty list when ``diff`` has no columns.
    """
    minima = diff.min().to_dicts()
    maxima = diff.max().to_dicts()
    if not minima or not maxima:
        return []
    lowest, highest = minima[0], maxima[0]

    def _is_nonzero(value):
        # A missing value (None) carries no evidence of a deviation.
        return value is not None and value != 0

    return [
        column
        for column in lowest
        if _is_nonzero(lowest[column]) or _is_nonzero(highest.get(column))
    ]

2.067050999999992