In [25]:
import time
import pandas as pd

In [26]:
# Benchmark configuration: how many timed repetitions to average, and the dataset scale factor.
nval = 5  # number of validation runs to build the runtime average over
sf = "3M"
csv_path = f"../data/hotel_search_logs_{sf}_small.csv"
df = pd.read_csv(csv_path)

## DuckDQ & DuckDB

In [3]:
import duckdb
import duckdq
import multiprocessing

In [4]:
# In-memory DuckDB database; the pandas DataFrame is exposed to SQL under the name "hotel_search_logs".
con = duckdb.connect(database=":memory:")
con.register("hotel_search_logs", df)

<duckdb.DuckDBPyConnection at 0x7ff7d9b265b0>

In [5]:
# Time DuckDQ (five completeness + five uniqueness checks) on the in-memory DuckDB table,
# averaged over `nval` runs. time.perf_counter() is monotonic and high-resolution, so it
# is the right clock for benchmarks (time.time() can jump with system clock changes).
# `con` is rebound to the MySQL connection further down, so fail fast if this cell is
# re-run after that point instead of silently benchmarking MySQL under a DDB label.
assert isinstance(con, duckdb.DuckDBPyConnection), "`con` is not the DuckDB connection; re-run the DuckDB setup cell"
runtimes = 0
for _ in range(nval):
    start = time.perf_counter()
    checkResult = (
        duckdq.VerificationSuite()
        .on_table(con, "hotel_search_logs")
        .add_check(
            duckdq.Check(duckdq.CheckLevel.EXCEPTION, "Check Error")
            .is_complete("date_time")
            .is_complete("site_name")
            .is_complete("posa_continent")
            .is_complete("user_location_country")
            .is_complete("user_location_city")
            .is_unique("orig_destination_distance")
            .is_unique("user_id")
            .is_unique("is_mobile")
            .is_unique("is_package")
            .is_unique("channel")
        )
        .run()
    )
    runtimes += time.perf_counter() - start
print(f"DDQ+DDB (Validation, {sf} rows): {runtimes/nval}")

DDQ+DDB (Validation, 3M rows): 0.634633207321167


## great_expectations & DuckDB

In [6]:
import great_expectations as ge
import sqlalchemy
import duckdb
from sqlalchemy import create_engine
from sqlalchemy.engine.url import registry
import duckdb_engine

def execute_patch(self, statement, parameters, context):
        new_statement = ""
        i = 0
        for c in statement:
            if c == "?":
                new_statement += str(parameters[i])
                i += 1
            else:
                new_statement += c
        self.c.execute(new_statement)
duckdb_engine.ConnectionWrapper.execute = execute_patch


def connect_patch(self, *args, **kwargs):
    """Replacement for duckdb_engine.Dialect.connect.

    Opens a DuckDB connection with the given arguments, registers the notebook's
    global ``df`` on it as "hotel_search_logs", and returns it wrapped in
    duckdb_engine.ConnectionWrapper.
    """
    duck_con = duckdb.connect(*args, **kwargs)
    duck_con.register("hotel_search_logs", df)
    wrapped = duckdb_engine.ConnectionWrapper(duck_con)
    return wrapped


duckdb_engine.Dialect.connect = connect_patch

# Keep the real implementation so it can be restored before the MySQL benchmarks.
get_columns_original = sqlalchemy.engine.reflection.Inspector.get_columns


def get_columns_patch(self, table_name, schema=None, **kw):
    """Stub for Inspector.get_columns that always raises KeyError.

    NOTE(review): presumably this makes great_expectations fall back from
    SQLAlchemy column reflection on the registered DuckDB table — confirm.
    """
    raise KeyError()


sqlalchemy.engine.reflection.Inspector.get_columns = get_columns_patch

In [7]:
# Map the "duckdb" URL scheme to duckdb_engine's dialect, then build an in-memory engine.
duckdb_url = "duckdb:///:memory:"
registry.register("duckdb", "duckdb_engine", "dialect")
eng = create_engine(duckdb_url)

In [8]:
# Wrap the DuckDB-backed table as a great_expectations SqlAlchemyDataset.
ds = ge.dataset.SqlAlchemyDataset(
    table_name="hotel_search_logs",
    engine=eng,
    connection_string="duckdb:///:memory:",
)

In [9]:
# Time great_expectations (five not-null + five uniqueness expectations) on the
# DuckDB-backed dataset, averaged over `nval` runs. time.perf_counter() is monotonic
# and high-resolution, unlike time.time(), so it is the right clock for benchmarks.
not_null_columns = ["date_time", "site_name", "posa_continent",
                    "user_location_country", "user_location_city"]
unique_columns = ["orig_destination_distance", "user_id", "is_mobile",
                  "is_package", "channel"]
runtimes = 0
for _ in range(nval):
    start = time.perf_counter()
    for column in not_null_columns:
        ds.expect_column_values_to_not_be_null(column)
    for column in unique_columns:
        ds.expect_column_values_to_be_unique(column)
    runtimes += time.perf_counter() - start
print(f"GE+DDB (Validation, {sf} rows): {runtimes/nval}")

GE+DDB (Validation, 3M rows): 1.7348392963409425


## Data Transfer to MySQL

In [10]:
import sqlalchemy
import pymysql
from sqlalchemy import create_engine
# Put SQLAlchemy's original Inspector.get_columns back (it was stubbed out for the
# DuckDB benchmark) before working with MySQL.
inspector_cls = sqlalchemy.engine.reflection.Inspector
inspector_cls.get_columns = get_columns_original

In [9]:
import os

# MySQL connection URL. Set the MYSQL_URL environment variable to keep credentials out
# of the notebook; the default is the previously hard-coded URL.
# NOTE(review): the default looks like a throwaway local test container — confirm
# before sharing this notebook.
mysql_url = os.environ.get("MYSQL_URL", "mysql+pymysql://root:root@172.28.0.2/test")
engine = create_engine(mysql_url)
con = engine.connect()  # rebinds `con`: from here on it is the MySQL connection, not DuckDB

In [None]:
# Copy the DataFrame into MySQL in fixed-size chunks: the first chunk (re)creates the
# table, later chunks append. Looping over offsets works for any scale factor `sf`
# instead of hard-coding three slices sized for the 3M dataset.
assert getattr(con, "engine", None) is engine, "`con` is not the MySQL connection; run the engine.connect() cell first"
chunk_rows = 1_000_000
start = time.perf_counter()
# max(..., 1) guarantees at least one to_sql call, so even an empty frame replaces the table.
for offset in range(0, max(len(df), 1), chunk_rows):
    df.iloc[offset:offset + chunk_rows].to_sql(
        con=con,
        name='hotel_search_logs',
        if_exists='replace' if offset == 0 else 'append',
    )
end = time.perf_counter()
print(f"DDQ/GE+MYSQL (Data Transfer, {sf} rows): {end-start}")

## DuckDQ & MySQL

In [None]:
import sqlalchemy
import duckdq
import pymysql
from sqlalchemy import create_engine

In [None]:
# Time DuckDQ (five completeness + five uniqueness checks) against the MySQL table,
# averaged over `nval` runs, using the monotonic high-resolution time.perf_counter().
# `con` was first bound to the DuckDB connection; if the MySQL connection cell was not
# run, this would silently benchmark DuckDB under a MYSQL label, so fail fast instead.
assert getattr(con, "engine", None) is engine, "`con` is not the MySQL connection; run the engine.connect() cell first"
runtimes = 0
for _ in range(nval):
    start = time.perf_counter()
    checkResult = (
        duckdq.VerificationSuite()
        .on_table(con, "hotel_search_logs")
        .add_check(
            duckdq.Check(duckdq.CheckLevel.EXCEPTION, "Check Error")
            .is_complete("date_time")
            .is_complete("site_name")
            .is_complete("posa_continent")
            .is_complete("user_location_country")
            .is_complete("user_location_city")
            .is_unique("orig_destination_distance")
            .is_unique("user_id")
            .is_unique("is_mobile")
            .is_unique("is_package")
            .is_unique("channel")
        )
        .run()
    )
    runtimes += time.perf_counter() - start
print(f"DDQ+MYSQL (Validation, {sf} rows): {runtimes/nval}")

## great_expectations & MySQL

In [None]:
import sqlalchemy
import pymysql
from sqlalchemy import create_engine
import great_expectations as ge

In [None]:
# Wrap the MySQL table as a great_expectations SqlAlchemyDataset, reusing the engine.
ds = ge.dataset.SqlAlchemyDataset(
    table_name="hotel_search_logs",
    engine=engine,
    connection_string="mysql+pymysql://root:root@172.28.0.2/test",
)

In [None]:
# Time great_expectations (five not-null + five uniqueness expectations) against MySQL.
# Only a single run is made here. A local run count is used instead of reassigning the
# shared `nval` setting, so re-running other benchmark cells afterwards still uses the
# configured value. time.perf_counter() is the monotonic, high-resolution benchmark clock.
ge_mysql_runs = 1
not_null_columns = ["date_time", "site_name", "posa_continent",
                    "user_location_country", "user_location_city"]
unique_columns = ["orig_destination_distance", "user_id", "is_mobile",
                  "is_package", "channel"]
runtimes = 0
for _ in range(ge_mysql_runs):
    start = time.perf_counter()
    for column in not_null_columns:
        ds.expect_column_values_to_not_be_null(column)
    for column in unique_columns:
        ds.expect_column_values_to_be_unique(column)
    runtimes += time.perf_counter() - start
print(f"GE+MYSQL (Validation, {sf} rows): {runtimes/ge_mysql_runs}")

## Data Transfer: pandas Round Trip via Dask, datatable, and Vaex

In [39]:
from dask import dataframe as dd 
# Time a pandas -> Dask (3 partitions) -> pandas round trip with the monotonic,
# high-resolution time.perf_counter(). The label now reads "Roundtrip", matching the
# datatable and Vaex cells.
start = time.perf_counter()
sd = dd.from_pandas(df, npartitions=3)
sd.compute()
end = time.perf_counter()
print(f"Dask (Pandas Roundtrip, {sf} rows): {end-start}")

Dask (Pandas Rountrip, 3M rows): 0.7147440910339355


In [40]:
import datatable as dt
# Time a pandas -> datatable Frame -> pandas round trip with the monotonic,
# high-resolution time.perf_counter() (time.time() can jump with clock adjustments).
start = time.perf_counter()
dtt = dt.Frame(df)
dtt.to_pandas()
end = time.perf_counter()
print(f"datatable (Pandas Roundtrip, {sf} rows): {end-start}")

datatable (Pandas Roundtrip, 3M rows): 1.2152934074401855


In [41]:
import vaex
# Time a pandas -> Vaex -> pandas round trip with the monotonic,
# high-resolution time.perf_counter() (time.time() can jump with clock adjustments).
start = time.perf_counter()
vxx = vaex.from_pandas(df)
vxx.to_pandas_df()
end = time.perf_counter()
print(f"Vaex (Pandas Roundtrip, {sf} rows): {end-start}")

Vaex (Pandas Roundtrip, 3M rows): 1.1421279907226562
