<a href="https://colab.research.google.com/github/kyleco/decision/blob/master/abtest-simulator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
%pip install jupysql --quiet

%pip install pyarrow pandas --quiet
%pip install duckdb --pre --upgrade --quiet
%pip install jupysql duckdb duckdb-engine --quiet

%load_ext sql
%config SqlMagic.displaycon = True

from typing import TypedDict
import pyarrow.parquet as pq
import pandas
import glob
import duckdb
import uuid
from sqlalchemy import create_engine, insert, values, table, column
import time 

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [2]:
%config SqlMagic.displaycon = True
engine = create_engine("duckdb:///:memory:")
%sql engine

In [8]:
%%sql 
show tables

*  duckdb:///:memory:
Done.


name
analytics_events


In [71]:
class AnalyticsEvent(TypedDict):
    created: float
    event: str
    user_id: str
    id: str


class AnalyticsDB:
  def __init__(self, dbengine):
    self._events_cache: list[AnalyticsEvent] = []
    self.dbengine = dbengine
    self.dbengine.execute("""
      CREATE TABLE IF NOT EXISTS analytics_events (
        created double,
        event varchar,
        user_id varchar,
        id varchar GENERATED ALWAYS AS (uuid())
      )
      """)
  
  def log(self, event: AnalyticsEvent):
    self._events_cache.append(event)
  
  def flush(self):
    try:
      events_cache_df = pd.DataFrame(self._events_cache)
      self.dbengine.execute("INSERT INTO analytics_events SELECT * FROM events_cache_df")
    except Exception as e:
      print(e)
    else:
      self._events = []
      print("Flushed cache to DB")

class User: 
  def __init__(self, id: str):
    self.user_id = id
  
  def get_success_event(self) -> AnalyticsEvent:
    return AnalyticsEvent(created=time.time(), event='success', user_id=self.user_id)

class UserFactory: 
  def __init__(self, dbengine):
    self.cumulative_users = 0
    self.dbengine = dbengine
    self.dbengine.execute("""
      CREATE TABLE IF NOT EXISTS users (
        created double,
        user_id varchar
      )
      """)
  
  def create(self):
    try:
      self.cumulative_users += 1
      id = f"u{self.cumulative_users}"
      user = User(id)
      stmt = (
          insert(table("users", column("user_id"), column("created"))).
          values(user_id=id, created=time.time())
      )
      print(stmt)
      self.dbengine.execute(stmt)
    except Exception as e:
      print("Exception:" , e)
    else:
      return user


INSERT INTO users (user_id, created) VALUES (:user_id, :created)


<__main__.User at 0x7f405618d040>

In [64]:
user1

None


In [69]:
engine.execute("DROP TABLE IF EXISTS analytics_events;")
analytics = AnalyticsDB(engine)  
user_factory = UserFactory(engine)
user1 = user_factory.create()
user2 = user_factory.create()

analytics.log(user1.get_success_event())
analytics.log(user2.get_success_event())
analytics.flush()


INSERT INTO users (user_id, created) VALUES (:user_id, :created)
INSERT INTO users (user_id, created) VALUES (:user_id, :created)
Flushed cache to DB


In [70]:
%%sql
select created, event, user_id from analytics_events


*  duckdb:///:memory:
Done.


created,event,user_id
1676146578.6551409,success,u1
1676146578.6551807,success,u2


In [40]:
class User: 
  def __init__(self):
    self.user_id = str(uuid.uuid1())
  
  def get_success_event(self) -> AnalyticsEvent:
    return AnalyticsEvent(created=time.time(), event='success', user_id=self.user_id)

user = User()
user.get_success_event()
#engine.execute("PRAGMA database_list;").fetchall()
#con.execute("SHOW TABLES;").fetchall()

{'created': 1676145308.8883953,
 'event': 'success',
 'user_id': 'fcc9814c-aa45-11ed-90bd-0242ac1c000c'}

In [None]:
#%sql -l
#%%sql duckdb:///:memory:
#SHOW TABLES
# PRAGMA database_list;

In [48]:
import pyarrow.parquet as pq
import pandas
import glob
import duckdb
%load_ext sql

# some DuckDB setup 
#con = duckdb.connect(database='my-db.duckdb')
con = duckdb.connect(database=':memory:')
# enable automatic query parallelization
con.execute("PRAGMA threads=2")
# enable caching of parquet metadata
con.execute("PRAGMA enable_object_cache")


#%%sql duckdb:///:memory:
#select * from analytics_events

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


<duckdb.DuckDBPyConnection at 0x7fa402112470>

In [7]:
%%sql duckdb:///:memory:
SHOW TABLES

UsageError: An error happened while creating the connection: Can't load plugin: sqlalchemy.dialects:duckdb.

To fix it:

Pass a valid connection string:
    Example: %sql postgresql://username:password@hostname/dbname

For technical support: https://ploomber.io/community
Documentation: https://jupysql.ploomber.io/en/latest/connecting.html


In [1]:
import pyarrow.parquet as pq
import pandas
import glob
import duckdb

# some DuckDB setup 
#con = duckdb.connect()
con = duckdb.connect(database=':memory:')
# enable automatic query parallelization
con.execute("PRAGMA threads=2")
# enable caching of parquet metadata
con.execute("PRAGMA enable_object_cache")

#from fugue_notebook import setup
#import fugue_duckdb
#{"fugue.sql.compile.ignore_case": True}
#setup(fsql_ignore_case=True) #conf={"fugue.sql.compile.ignore_case": True})

ModuleNotFoundError: ignored

In [3]:
%%time
con.execute("SELECT * FROM 'taxi/*.parquet' LIMIT 5").df()

CPU times: user 32 ms, sys: 6.62 ms, total: 38.6 ms
Wall time: 45.7 ms


Unnamed: 0,vendor_id,pickup_at,dropoff_at,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-04-01 00:04:09,2019-04-01 00:06:35,1,0.5,1,N,239,239,1,4.0,3.0,0.5,1.0,0.0,0.3,8.8,2.5
1,1,2019-04-01 00:22:45,2019-04-01 00:25:43,1,0.7,1,N,230,100,2,4.5,3.0,0.5,0.0,0.0,0.3,8.3,2.5
2,1,2019-04-01 00:39:48,2019-04-01 01:19:39,1,10.9,1,N,68,127,1,36.0,3.0,0.5,7.95,0.0,0.3,47.75,2.5
3,1,2019-04-01 00:35:32,2019-04-01 00:37:11,1,0.2,1,N,68,68,2,3.5,3.0,0.5,0.0,0.0,0.3,7.3,2.5
4,1,2019-04-01 00:44:05,2019-04-01 00:57:58,1,4.8,1,N,50,42,1,15.5,3.0,0.5,3.85,0.0,0.3,23.15,2.5


In [4]:
pq.write_table(pq.ParquetDataset('taxi/').read(), 'alltaxi.parquet', row_group_size=100000)

In [5]:
%%time
con.execute("SELECT * FROM 'alltaxi.parquet' LIMIT 5").df()

CPU times: user 27.4 ms, sys: 5.58 ms, total: 32.9 ms
Wall time: 28 ms


Unnamed: 0,vendor_id,pickup_at,dropoff_at,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-04-01 00:04:09,2019-04-01 00:06:35,1,0.5,1,N,239,239,1,4.0,3.0,0.5,1.0,0.0,0.3,8.8,2.5
1,1,2019-04-01 00:22:45,2019-04-01 00:25:43,1,0.7,1,N,230,100,2,4.5,3.0,0.5,0.0,0.0,0.3,8.3,2.5
2,1,2019-04-01 00:39:48,2019-04-01 01:19:39,1,10.9,1,N,68,127,1,36.0,3.0,0.5,7.95,0.0,0.3,47.75,2.5
3,1,2019-04-01 00:35:32,2019-04-01 00:37:11,1,0.2,1,N,68,68,2,3.5,3.0,0.5,0.0,0.0,0.3,7.3,2.5
4,1,2019-04-01 00:44:05,2019-04-01 00:57:58,1,4.8,1,N,50,42,1,15.5,3.0,0.5,3.85,0.0,0.3,23.15,2.5


In [19]:
from fugue_notebook import setup
import fugue_duckdb
#{"fugue.sql.compile.ignore_case": True}
setup(fsql_ignore_case=True) #conf={"fugue.sql.compile.ignore_case": True})



<IPython.core.display.Javascript object>

In [73]:
%%fsql duck
df = LOAD "/content/alltaxi.parquet"
SELECT * FROM df
YIELD DATAFRAME AS result


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

In [60]:
%%fsql duck
select * from result
print

Unnamed: 0,vendor_id:str,pickup_at:datetime,dropoff_at:datetime,passenger_count:byte,trip_distance:float,rate_code_id:str,store_and_fwd_flag:str,pickup_location_id:int,dropoff_location_id:int,payment_type:str,fare_amount:float,extra:float,mta_tax:float,tip_amount:float,tolls_amount:float,improvement_surcharge:float,total_amount:float,congestion_surcharge:float
0,1,2019-04-01 00:04:09,2019-04-01 00:06:35,1,0.5,1,N,239,239,1,4.0,3.0,0.5,1.0,0.0,0.3,8.8,2.5
1,1,2019-04-01 00:22:45,2019-04-01 00:25:43,1,0.7,1,N,230,100,2,4.5,3.0,0.5,0.0,0.0,0.3,8.3,2.5
2,1,2019-04-01 00:39:48,2019-04-01 01:19:39,1,10.9,1,N,68,127,1,36.0,3.0,0.5,7.95,0.0,0.3,47.75,2.5
3,1,2019-04-01 00:35:32,2019-04-01 00:37:11,1,0.2,1,N,68,68,2,3.5,3.0,0.5,0.0,0.0,0.3,7.3,2.5
4,1,2019-04-01 00:44:05,2019-04-01 00:57:58,1,4.8,1,N,50,42,1,15.5,3.0,0.5,3.85,0.0,0.3,23.15,2.5
5,1,2019-04-01 00:29:16,2019-04-01 00:38:00,1,1.7,1,N,95,196,2,8.5,0.5,0.5,0.0,0.0,0.3,9.8,0.0
6,1,2019-04-01 00:06:47,2019-04-01 00:08:15,1,0.0,1,N,211,211,3,3.0,3.0,0.5,0.0,0.0,0.3,6.8,2.5
7,1,2019-04-01 00:52:16,2019-04-01 00:55:10,1,0.2,1,N,237,162,1,4.0,3.0,0.5,0.0,0.0,0.3,7.8,2.5
8,2,2019-04-01 00:52:28,2019-04-01 01:11:24,1,4.15,1,N,148,37,2,16.5,0.5,0.5,0.0,0.0,0.3,20.299999,2.5
9,1,2019-04-01 00:02:19,2019-04-01 00:03:05,1,0.0,5,N,265,265,2,0.01,0.0,0.0,0.0,0.0,0.3,0.31,0.0


In [76]:
%%fsql duck
--df = LOAD "/content/alltaxi.parquet"
select
pickup_location_id, count(*) as n
from result
group by 1
print

Unnamed: 0,pickup_location_id:int,n:long
0,237,972022
1,236,872437
2,162,789572
3,239,570101
4,138,592378
5,161,895439
6,164,500130
7,140,383446
8,186,757319
9,246,376202


CPU times: user 476 ms, sys: 5.31 ms, total: 481 ms
Wall time: 413 ms


In [52]:
import pandas as pd

class AnalyticsDB:
  def __init__(self):
    self._events = []
  
  def log(self, event):
    self._events.append(event)
  
  def flush(self):
    try:
      df_events = pd.DataFrame(self._events)
      engine.execute("CREATE TABLE IF NOT EXISTS analytics_events AS SELECT * FROM df_events")
      #con.execute("INSERT INTO my_table SELECT * FROM my_df")
    except Exception as e:
      print(e)
    else:
      self._events = []
    
analytics = AnalyticsDB()  
import time 

e = {'created': time.time(), 'event': 'success'}
analytics.log(e)
analytics.flush()

In [53]:
analytics = AnalyticsDB()

In [54]:
#pd.Timestamp.now()
import time 

e = {'created': time.time(), 'event': 'success'}
analytics.log(e)

In [55]:
analytics.flush()


In [56]:
%%sql
SHOW TABLES;

   duckdb://
*  duckdb:///:memory:
Done.


name


In [58]:
#engine.execute("SELECT * FROM analytics_events").fetchall()
engine.execute("SELECT * FROM analytics_events").fetchall()

[(1676143470.7690535, 'success')]

In [29]:
print(con.fetchall())

[(1676142822.303907, 'success')]


In [59]:
%sql engine

In [61]:
%%sql
select * from analytics_events

   duckdb://
*  duckdb:///:memory:
Done.


created,event
1676143470.7690537,success


In [32]:
#pip install jupysql
con.execute("PRAGMA database_list;").fetchall()


[(4, 'memory', None)]

In [31]:
%load_ext sql


In [33]:
%%sql duckdb:///:memory:
select * from analytics_events

Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])
Can't load plugin: sqlalchemy.dialects:duckdb
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


In [35]:
%sql -l

{}