# Overview

Welcome to a demo of snapshot and real time replication to Databricks.

Use this notebook customized schema, data, workload, and **legacy** Arcion.

**NOTE**: **Databricks Personal Access Token** and **Arcion License** are required. 

- Initial Setup
  - Open `Table of Contents` (Outline)
  - Enter `Arcion License`
  - Enter `Databricks Personal Access Token`
  - Click `Run All`
  - Click `View` -> `Results Only`
  - Click `View` -> `Web Terminal`, 
    - enter `tmux attach`.  
      - If fails with `session not found`, then wait a bit retry.
    - In the `tmux`'s console window, `htop` will be displayed during the setup.
    - Once the setup is complete, Arcion snapshot summary will be displayed.
    - Wait for the setup to finish and the snapshot to complete. 
    - Takes about 5 minutes in for the setup to finish.
- Iterate with the following:
  - Configure Schema and Data
  - Configure Workload
  - Configure Arcion

## Where is Data in Databricks
  - Spark (Delta Lake) uses **Hive Meta Store** catalog: 
    - Open new tab Catalog -> hive_metastore -> <your username>
    - find ycsbdense and ycsbsparse tables 
  - Lakehouse uses **Unity Catalog** catalog: 
    - Open new tab Catalog -> <your username> 
    - find ycsbdense and ycsbsparse tables 

## Frequent Demo Configurations
- Step 1
  - Click Real-Time
  - Run just Arcion
  - Change YCSB Size
  - Watch real-time performance
- Step 2
  - Click Unity Catalog target
  - Select full replication mode
  - Run just Arcion

# Personal Compute Cluster

Choose at least 16GB of RAM for a demo.

Processes use RAM.  The following is the minimum RAM usage.  The server needs enough RAM to avoid swapping.
- Databricks: 5GB 
- SQL Server: 2GB
- Arcion: 10% of server RAM.

Note:
- `vmstat 5`.  any non zero metrics under the `si` and `so` columns (swap in and swap out) indicate RAM shortage. 
- DBR 13 does not print output of subprocess.run 

In [1]:
CONFIG_FILE="./env/perf1-dbo.sh"

In [2]:
%pip install file-read-backwards 
%pip install deepdiff
%pip install bpytop
%pip install mlflow

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [24]:
# prep python env
import subprocess
import math
import pandas as pd
import re
import ipywidgets as widgets
import os
import pathlib
import json
import requests
import deepdiff
import time
from pathlib import Path
from ipywidgets import HBox, VBox, Label
from file_read_backwards import FileReadBackwards
from pylib.mlflowutils import *

def nine_char_id():
    # %s   seconds since 1970-01-01 00:00:00 UTC
    # %N   nanoseconds (000000000..999999999)
    return(hex(int(time.time_ns() / 100000000 ))[2:])

# all exp parameters 
def exp_params():
    all_params={
    # run
    "arcion_run_id": arcion_run_id,
    # arcion
    "arcion_download_url": arcion_download_url.value,
    "srcdb_arc_user": src_username.value,
    "repl_type": repl_mode.value,
    "extraction_method": extraction_method.value,
    "replicant_memory_percentage": ram_percent.value,
    "srcdb_snapshot_threads": snapshot_threads.value,
    "srcdb_realtime_threads": realtime_threads.value, 
    "srcdb_delta": delta_threads.value,
    "dstdb_type": dbx_destinations.value,
    "dstdb_stage": dbx_staging.value,
    "dbx_spark_url": dbx_spark_url.value,
    "dbx_databricks_url": dbx_databricks_url.value,
    "dbx_hostname": dbx_hostname.value,
    "dbx_dbfs_root": dbx_username.value,
    "dbx_username": dbx_username.value,

    # schema and data
    "sparse_cntstart": sparse_cntstart.value,
    "sparse_cnt": sparse_cnt.value , 
    "sparse_fieldcount": sparse_fieldcount.value, 
    "sparse_fieldlength": sparse_fieldlength.value, 
    "sparse_recordcount": sparse_recordcount.value, 
    "sparse_fillpct_start": sparse_fillpct.value[0],
    "sparse_fillpct_end": sparse_fillpct.value[1],
    "dense_cntstart": dense_cntstart.value, 
    "dense_cnt": dense_cnt.value, 
    "dense_fieldcount": dense_fieldcount.value, 
    "dense_fieldlength": dense_fieldlength.value, 
    "dense_recordcount": dense_recordcount.value, 
    "dense_fillpct_start": dense_fillpct.value[0],
    "dense_fillpct_end": dense_fillpct.value[1],

    # workload
    "sparse_tps": sparse_tps.value,
    "dense_tps": dense_tps.value,
    "sparse_threads": sparse_threads.value,
    "dense_threads": dense_threads.value,
    "sparse_multiUpdateSize": sparse_multiupdatesize.value,
    "sparse_multiInsertSize": sparse_multiinsertsize.value,
    "sparse_multiDeleteSize": sparse_multideletesize.value,
    "dense_multiUpdateSize": dense_multiupdatesize.value,
    "dense_multiInsertSize": dense_multiinsertsize.value,
    "dense_multiDeleteSize": dense_multideletesize.value,
    "ram_percent_ycsb": ram_percent_ycsb.value,

    # database
    "ram_mb_sqlserver": ram_mb_sqlserver.value,

    }

    # cluster
    try:
        all_params["spark.databricks.clusterUsageTags.clusterNodeType"] = spark.conf.get("spark.databricks.clusterUsageTags.clusterNodeType")
        all_params["spark.databricks.clusterUsageTags.cloudProvider"]  =  spark.conf.get("spark.databricks.clusterUsageTags.cloudProvider")
    except:
        pass

    return(all_params)

arcion_run_id=None

# used to start new MLFlow when parameters changes 
try:
    mlflow_proc_state
except:
    mlflow_proc_state={}

try:
    previous_exp_params
except:
    previous_exp_params={}
try:
    current_exp_params
except:
    current_exp_params={}

try:
    ycsb_logfile_positions
except:
    ycsb_logfile_positions={}
try:
    ycsb_metrics
except:
    ycsb_metrics={}
try:
    previous_log_time
except:
    previous_log_time=None    

# arcion statistics CSV
arcion_stats_csv_header_lines="catalog_name,schema_name,table_name,snapshot_start_range,snapshot_end_range,start_time,end_time,insert_count,update_count,upsert_count,delete_count,elapsed_time_sec,replicant_lag,total_lag"
arcion_key_index={'insert_count':7,'update_count':8,'upsert_count':9,'delete_count':10,'elapsed_time_sec':11,'replicant_lag':12,'total_lag':13}
arc_stat_catalog_name_idx=0
arc_stat_schema_name_idx=1
arc_stat_table_name_idx=2
arc_stat_start_time_idx=5
arc_stat_end_time_idx=6
arc_stat_insert_count_idx=7
arc_stat_update_count_idx=8
arc_stat_upsert_count_idx=9
arc_stat_delete_count_idx=10
arc_stat_replicant_lag_idx=12
arc_stat_total_lag_idx=13
arc_default_lag=9223372036854775807

try:
    arcion_stats_csv_positions
except:
    arcion_stats_csv_positions={}

# setup GUI elements

repl_mode = widgets.Dropdown(options=['snapshot', 'real-time', 'full'],value='real-time',
    description='Replication:',
)
cdc_mode = widgets.Dropdown(options=['change', 'cdc'],value='change',
    description='CDC Method:',
)
ram_percent = widgets.BoundedIntText(value=10,min=10,max=80,
    description='RAM %:',
)

extraction_method = widgets.Dropdown(options=['BCP', 'QUERY'],value='QUERY',
    description='Extraction Method:',
)

ram_percent_ycsb = widgets.BoundedIntText(value=1,min=1,max=80,
    description='RAM %:',
)

ram_mb_sqlserver = widgets.BoundedIntText(value=1024,min=1,max=8096,
    description='RAM MB:',
)

snapshot_threads = widgets.BoundedIntText(value=1,min=1,max=8,
    description='Snapshot Threads:',
)

realtime_threads = widgets.BoundedIntText(value=1,min=1,max=8,
    description='Real Time Threads:',
)    

delta_threads = widgets.BoundedIntText(value=1,min=1,max=8,
    description='Delta Snapshot Threads:',
)    

dbx_destinations = widgets.Dropdown(options=['null', 'deltalake', 'unitycatalog'],value='null',
    description='Destinations:',
)
try:
    if spark.conf.get("spark.databricks.unityCatalog.enabled")=='false':
        dbx_destinations = widgets.Dropdown(options=['null', 'deltalake'],value='null', description='Destinations:',)
except:
    pass

dbx_staging = widgets.Dropdown(options=['dbfs'],value='dbfs',
    description='Staging:',
)

sparse_cnt = widgets.BoundedIntText(value=4,min=1,max=1000,
    description='Tbl End:',
)
sparse_cntstart = widgets.BoundedIntText(value=1,min=1,max=1000,
    description='Tbl Start:',
)

sparse_fieldcount = widgets.BoundedIntText(value=50,min=0,max=9000,
    description='# of Fields:',
)
sparse_fieldlength = widgets.BoundedIntText(value=10,min=1,max=1000,
    description='Field Len:',
)

sparse_tps = widgets.BoundedIntText(value=1,min=0,max=10000,
    description='TPS:',
)
sparse_threads = widgets.BoundedIntText(value=1,min=1,max=8,
    description='Threads:',
)
sparse_recordcount = widgets.Text(value="1M",
    description='Rec Cnt:',
)

sparse_fillpct = widgets.IntRangeSlider(value=[0,0],min=0,max=100,step=1,
    description='Fill Range:', orientation='horizontal', readout=False
)

dense_cnt = widgets.BoundedIntText(value=2,min=1,max=1000,
    description='Tbl End:',
)
dense_cntstart = widgets.BoundedIntText(value=1,min=1,max=1000,
    description='Tbl Start:',
)

dense_fieldcount = widgets.BoundedIntText(value=10,min=0,max=9000,
    description='# of Fields:',
)
dense_fieldlength = widgets.BoundedIntText(value=100,min=1,max=1000,
    description='Field Len:',
)
dense_recordcount = widgets.Text(value="100K",
    description='Rec Cnt:',
)

dense_tps = widgets.BoundedIntText(value=1,min=0,max=10000,
    description='TPS:',
)
dense_threads = widgets.BoundedIntText(value=1,min=1,max=8,
    description='Threads:',
)

delupdins_proportion = widgets.IntRangeSlider(value=[1,999],min=0,max=1000,step=1,
    description='Del Upd Ind:', orientation='horizontal', readout=True
)

# sqlserver max is 2100 total perpared parameters
dense_multiupdatesize = widgets.BoundedIntText(value=100,min=0,max=2000, description='Upd TPS:')
dense_multiinsertsize = widgets.BoundedIntText(value=1,min=0,max=2000, description='Ins TPS:')
dense_multideletesize = widgets.BoundedIntText(value=1,min=0,max=2000, description='Del TPS:')

sparse_multiupdatesize = widgets.BoundedIntText(value=100,min=0,max=2000, description='Upd TPS:')
sparse_multiinsertsize = widgets.BoundedIntText(value=1,min=0,max=2000, description='Ins TPS:')
sparse_multideletesize = widgets.BoundedIntText(value=1,min=0,max=2000, description='Del TPS:')

dense_fillpct = widgets.IntRangeSlider(value=[1,99],min=0,max=100,step=1,
    description='Fill Range:', orientation='horizontal', readout=False
)

ycsb_data_gen = widgets.Dropdown(options=['special char', 'char and num'],value='char and num', description='Data Type',)

dbx_spark_url = widgets.Textarea(value='',
    description='Spark URL:',
)

dbx_databricks_url = widgets.Textarea(value='',
    description='Databricks URL:',
)

dbx_hostname = widgets.Textarea(value='',
    description='Hostname:',
)

src_username = widgets.Textarea(value='',
    description='SRC User:',
)

dbx_username = widgets.Textarea(value='',
    description='DST User:',
)

arcion_license = widgets.Textarea(value='',
    description='Lic',
)

arcion_download_url = widgets.Textarea(value='https://arcion-releases.s3.us-west-1.amazonaws.com/general/replicant/replicant-cli-24.01.25.20.zip',
    description='Download URL',
)

dbx_access_token = widgets.Password(value='',
    description='Access Token',
)

dbx_default_catalog = widgets.Textarea(value='',
    description='HMS Catalog',
)


# cluster where the notebook is running to auto populate the destinations
spark_url=""
databricks_url=""
workspaceUrl=""
username=""
try:
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    workspace_id =spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")

    # clusterName = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")

    workspaceUrl = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())['tags']['browserHostName']

    # below does not work on GCP
    # sc.getConf().getAll() to see what is avail
    # workspaceUrl = spark.conf.get("spark.databricks.workspaceUrl") # host name

    http_path = f"sql/protocolv1/o/{workspace_id}/{cluster_id}"

    spark_url=f"jdbc:spark://{workspaceUrl}:443/default;transportMode=http;ssl=1;httpPath={http_path};AuthMech=3;UID=token;"
    databricks_url=f"jdbc:databricks://{workspaceUrl}:443/default;transportMode=http;ssl=1;httpPath={http_path};AuthMech=3;UID=token;"

except:
    pass
dbx_spark_url.value = spark_url
dbx_databricks_url.value = databricks_url
dbx_hostname.value = workspaceUrl

try:
    username = spark.sql("SELECT current_user()").collect()[0][0]
    dbx_username.value = re.sub('[.@]','_',username)
    src_username.value = re.sub('[.@]','_',username)
except:
    src_username.value='arcsrc'
    dbx_username.value='arcdst'

try:
    dbx_default_catalog.value=spark.conf.get("spark.databricks.sql.initial.catalog.name")
except:
    pass

# check arcion license via os env
try:
    arclicenv=os.environ["ARCION_LICENSE"]
    if arclicenv != "": 
        arcion_license.value=arclicenv
except:
    pass

# check arcion license via dbx widget
try:
    arclicwidget=dbutils.widgets.get("Arcion License")
    if arclicwidget != "": 
        arcion_license.value=arclicwidget
        arcion_license.disabled = True
except:
    pass

# check access token via dbx widget
try:
    acctokwidget=dbutils.widgets.get("Access Token")
    if acctokwidget != "": 
        dbx_access_token.value=acctokwidget
        dbx_access_token.disabled = True
except:
    pass

try:
    CONFIG_FILE=dbutils.widgets.get("Config")
except:
    pass

# check dpkg dir via dbx widget
pkg_src_dir=widgets.Textarea(value='',
    description='Pkg Src Dir:',
)
try:
    pkgsrcdirwidget=dbutils.widgets.get("Package Source Dir")
    if pkgsrcdirwidget != "": 
        pkg_src_dir.value=pkgsrcdirwidget
        pkg_src_dir.disabled = True
except:
    pass

# check if os env has ARCION_LICENSE
try:
    arclicenv=os.getenv('ARCION_LICENSE')
    if arclicenv != "": 
        arcion_license.value=arclicenv
except:
    pass

# gcp does not change cwd to notebook path
pwd_result= subprocess.run(f"""pwd""",capture_output = True, text = True )
cwd=pwd_result.stdout.splitlines()[-1]
if (pwd_result.stdout == "/databricks/driver\n"):
    notebookpath="/Workspace" + str(pathlib.Path(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()).parent)
else:
    notebookpath = None

# optional MLflow
experiment_id=None
try:
    import mlflow
    experiment_id=dbutils.widgets.get("Experiment ID")
except:
    pass

# src_db
src_db_type = widgets.Dropdown(value='sqlserver', options=['mysql', 'postgresql', 'sqlserver'])
src_db_host = widgets.Text(value='localhost', placeholder='hostname or IP')
src_db_port = widgets.Text(value='', placeholder='port #')
src_db_user = widgets.Text(value='', placeholder='username')
src_db_pass = widgets.Text(value='', placeholder='user password')
src_db_root_user = widgets.Text(value='', placeholder='root username')
src_db_root_pass = widgets.Text(value='', placeholder='root password')

# dst_db

# change defaults based on the dropdown
data = pd.read_csv('resources/map.csv',dtype=str) 
def update_db_defaults(args=None):
    x=data.loc[(data['group']==src_db_type.value)]
    src_db_port.value = x['port'].values[0] #
    src_db_root_user.value = x['root_user'].values[0] #
    src_db_root_pass.value = x['root_pw'].values[0] #
src_db_type.observe(update_db_defaults, 'value')
update_db_defaults()

# Setup
  - Enter `Arcion License`
  - Enter `Personal Access Token` (generate **One Day** and delete afterwards)
  - Click **Menu Bar** ->  Run -> Run All Below 

## Configure

In [32]:
if (CONFIG_FILE is not None) and (CONFIG_FILE != ""):
     CONFIG_FILE=Path(CONFIG_FILE).resolve()

In [4]:
# enter license and DBX personal access token

VBox([HBox([Label('Arcion'), arcion_license, arcion_download_url,pkg_src_dir]),
      HBox([Label('DBX'), dbx_access_token, dbx_default_catalog]),
      HBox([Label('Username'), src_username, dbx_username]),
      HBox([Label('Workspace'), dbx_spark_url, dbx_databricks_url, dbx_hostname, ]),
      HBox([Label('DB RAM'), ram_mb_sqlserver, ]),
       ])

VBox(children=(HBox(children=(Label(value='Arcion'), Textarea(value='', description='Lic'), Textarea(value='ht…

## Start

In [5]:
# setup tmux, arcion, ycsb
subprocess.run(f""". ./bin/setup-tmux.sh; setup_tmux '{dbx_username.value}'""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""bin/download-jars.sh""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""ARCION_LICENSE='{arcion_license.value}' ARCION_DOWNLOAD_URL='{arcion_download_url.value}' bin/install-arcion.sh""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""bin/install-ycsb.sh""",shell=True,executable="bash",cwd=notebookpath)

# mysql

# pg


# sqlserver
subprocess.run(f"""export CONFIG_FILE="{CONFIG_FILE}"; SQL_SERVER_DPKG='{pkg_src_dir.value}'; bin/install-sqlserver.sh""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; export CONFIG_FILE="{CONFIG_FILE}"; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; ping_sql_cli;""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; export CONFIG_FILE="{CONFIG_FILE}"; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; create_user;""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; export CONFIG_FILE="{CONFIG_FILE}"; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; MAX_SQLSERVER_RAM={ram_mb_sqlserver.value} set_sqlserver_ram '{dbx_username.value}';""",shell=True,executable="bash",cwd=notebookpath)
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; export CONFIG_FILE="{CONFIG_FILE}"; bin/install-prometheus.sh""",shell=True,executable="bash",cwd=notebookpath)

tmux session ready. session arcdst already exists
deltalake /opt/stage/libs/SparkJDBC42.jar found
lakehouse  /opt/stage/libs/DatabricksJDBC42.jar found
postgres  /opt/stage/libs/postgresql-42.7.1.jar found
mariadb  /opt/stage/libs/mariadb-java-client-3.3.2.jar found
oracle /opt/stage/libs/ojdbc8.jar found
log4j /opt/stage/libs/log4j-1.2.17.jar found
sqlserver /opt/stage/libs/mssql-jdbc-12.6.1.jre8.jar found
arcion  /opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant found
checking jar(s) in /opt/stage/arcion/replicant-cli-24.01.25.7/replicant-cli/lib for updates
checking jar(s) in /opt/stage/arcion/replicant-cli-24.01.25.1/replicant-cli/lib for updates
checking jar(s) in /opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/lib for updates
Arcion license found
YCSB  /opt/stage/ycsb/ycsb-jdbc-binding-0.18.0-SNAPSHOT  found
numfmt found
bc found
checking jar(s) in /opt/stage/ycsb/ycsb-jdbc-binding-0.18.0-SNAPSHOT/lib for updates
apt-utils already installed
mssql-s

open terminal failed: not a terminal


/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant 24.01.25.20 24.01
PATH=/opt/stage/bin/jsqsh-dist-3.0-SNAPSHOT/bin added
source ./env/perf1-dbo.sh
/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant 24.01.25.20 24.01
PATH=/opt/stage/bin/jsqsh-dist-3.0-SNAPSHOT/bin added
creating user cdcadmin
Msg 15025, Level 16, State 1, Server arcion-perf-uswest2, Line 1
The server principal 'cdcadmin' already exists.
Msg 1801, Level 16, State 1, Server arcion-perf-uswest2, Line 1
Database 'perf1' already exists. Choose a different database name.
Msg 40508, Level 16, State 1, Server arcion-perf-uswest2, Line 1
USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
Msg 15023, Level 16, State 5, Server arcion-perf-uswest2, Line 1
User, group, or role 'cdcadmin' already exists in the current database.
Msg 15151, Level 16, State 1, Server arcion-perf-uswest2, Line 1
Cannot alter the role 'db_owner', b

CompletedProcess(args='export SRCDB_ARC_USER=arcsrc; export CONFIG_FILE="./env/perf1-dbo.sh"; bin/install-prometheus.sh', returncode=0)

In [15]:
# TODO: clean up the hack need for mlflow ycsb log parsing
x=subprocess.run(f"""export CONFIG_FILE="{CONFIG_FILE}"; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; echo $SRCDB_SCHEMA""",
                 capture_output=True, shell=True,executable="bash",cwd=notebookpath)
srcdb_schema=x.stdout.decode("utf-8").splitlines()[-1]
x=subprocess.run(f"""export CONFIG_FILE="{CONFIG_FILE}"; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; echo $SRCDB_ARC_USER""",
                 capture_output=True, shell=True,executable="bash",cwd=notebookpath)
src_username.value=x.stdout.decode("utf-8").splitlines()[-1]

# Schema and Data

Existing tables will be appended with additional rows if the `Fill Range` is the same.  
Increase the `Table Count` to create additional tables.  

The following options are available:
- Table count (Table Cnt): The number of tables to create.  
  - Table names are `ycsbdense`, `ycsbdense2`, `ycsbdense3`, ... and `ycssparse`, `ycsbdense2`, and `ycsbdense3` ...
- Number of Fields (# of Fields): The number of fields per table.  
  - The field names are `FIELD0`, `FIELD1`, `FIELD2`, ...
  - Note the use of `K`,`M`,`B` ... suffix at the end.
- Field Length (Field Len): The length of random character data populated per field.  
  - Note the use of `K`,`M`,`B` ... suffix at the end.
- Record Count (Rec Cnt): The number of records per table generated.
  - Note the use of `K`,`M`,`B` ... suffix at the end.
- Fill Range: The relative start and end range of fields that are populated with data.  Be default: 
    - sparse tables are all NULLs by having the fill range be 0% to 0% ranges
    - dense tables have all fields populated by having the fill range be 0% to 100% of ranges 

```sql
[localhost][arcsrc] 1> \describe ycsbsparse
+-------------+-------------+-----------+-------------+----------------+-------------+
| TABLE_SCHEM | COLUMN_NAME | TYPE_NAME | COLUMN_SIZE | DECIMAL_DIGITS | IS_NULLABLE |
+-------------+-------------+-----------+-------------+----------------+-------------+
| dbo         | YCSB_KEY    | int       |          10 |              0 | NO          |
| dbo         | FIELD0      | text      |  2147483647 |         [NULL] | YES         |
| dbo         | FIELD1      | text      |  2147483647 |         [NULL] | YES         |
```

## Configure
Make changes below and click `Run All Below`.  

In [7]:
# show YCSB Data Controls
VBox([HBox([Label('Sparse'), sparse_cntstart,sparse_cnt, sparse_fieldcount, sparse_fieldlength, sparse_recordcount, sparse_fillpct]),
    HBox([Label('Dense'),  dense_cntstart, dense_cnt, dense_fieldcount, dense_fieldlength, dense_recordcount, dense_fillpct])])

VBox(children=(HBox(children=(Label(value='Sparse'), BoundedIntText(value=1, description='Tbl Start:', max=100…

## Start

In [35]:
# run load_sparse_data_cnt and load_dense_data_cnt 
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; 
    export CONFIG_FILE="{CONFIG_FILE}";
    . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; 
    list_table_counts;
    y_fieldcount={sparse_fieldcount.value} 
    y_fieldlength={sparse_fieldlength.value}  
    y_recordcount={sparse_recordcount.value} 
    y_fillstart={math.ceil((sparse_fillpct.value[0] * sparse_fieldcount.value) / 100)}      
    y_fillend={int((sparse_fillpct.value[1] * sparse_fieldcount.value) / 100)}      
    load_sparse_data_cnt {sparse_cnt.value} {sparse_cntstart.value};
    y_fieldcount={dense_fieldcount.value} 
    y_fieldlength={dense_fieldlength.value} 
    y_recordcount={dense_recordcount.value} 
    y_fillstart={math.ceil((dense_fillpct.value[0] * dense_fieldcount.value) / 100)}      
    y_fillend={int((dense_fillpct.value[1] * dense_fieldcount.value) / 100)}      
    load_dense_data_cnt {dense_cnt.value} {dense_cntstart.value};
    dump_schema;
    list_table_counts""",
    shell=True,executable="bash",cwd=notebookpath) 
# show tables
pd.read_csv (f"/var/tmp/{src_username.value}/sqlserver/config/list_table_counts.csv",header=None, names= ['table name','min key','max key','field count'])

source /home/rslee/github/dbx/ingestion/env/perf1-dbo.sh
/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant 24.01.25.20 24.01
PATH=/opt/stage/bin/jsqsh-dist-3.0-SNAPSHOT/bin added


table count at /var/tmp/cdcadmin/sqlserver/config/list_table_counts.csv


starting sparse load. /var/tmp/cdcadmin/sqlserver/logs/ycsb/ycssparse.load.log 1 2 3 4
starting dense load. /var/tmp/cdcadmin/sqlserver/logs/ycsb/ycsbdense.load.log 1 2


schema dump at /var/tmp/cdcadmin/sqlserver/config/schema_dump.csv
table count at /var/tmp/cdcadmin/sqlserver/config/list_table_counts.csv


Unnamed: 0,table name,min key,max key,field count


# Workload

Choose the options in the UI and run the cell below it to start the workload (YCSB).  

YCSB update (workload A) controls for Dense and Sparse table groups separated. Each group has a separate control.  However, all of the tables in the group use the same controls.  
1. Each table's TPS (throughput per second)
   1. 0=fast as possible
   2. 1=1 TPS
   3. 10=10 TPS
2. Each table's threads (concurrency) used to achieve the desired TPS.

## Configure

In [9]:
# show YCSB run controls
VBox([
      HBox([Label('Sparse'), sparse_threads,  sparse_multideletesize, sparse_multiupdatesize, sparse_multiinsertsize, ]), 
      HBox([Label('Dense'),  dense_threads, dense_multideletesize, dense_multiupdatesize, dense_multiinsertsize, ]),
      HBox([Label('YCSB'), ram_percent_ycsb, ycsb_data_gen]),
      ])


VBox(children=(HBox(children=(Label(value='Sparse'), BoundedIntText(value=1, description='Threads:', max=8, mi…

## Start

In [10]:
# start/restart YCSB run
# start the actual run
subprocess.run(f"""export SRCDB_ARC_USER={src_username.value};
    export CONFIG_FILE="{CONFIG_FILE}";
    . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; 
    kill_ycsb;
    list_table_counts;       
    y_data_type="{ycsb_data_gen.value}"
    y_threads_dense={dense_threads.value} 
    y_threads_sparse={sparse_threads.value}               
    y_multiinsertsize_dense={dense_multiinsertsize.value} 
    y_multiupdatesize_dense={dense_multiupdatesize.value} 
    y_multideletesize_dense={dense_multideletesize.value} 
    y_multiinsertsize_sparse={sparse_multiinsertsize.value} 
    y_multiupdatesize_sparse={sparse_multiupdatesize.value} 
    y_multideletesize_sparse={sparse_multideletesize.value} 
    y_fieldlength_sparse={sparse_fieldlength.value} 
    y_fieldlength_dense={dense_fieldlength.value} 
    y_MinRAMPercentage={ram_percent_ycsb.value}.0
    y_MaxRAMPercentage={ram_percent_ycsb.value}.0
    start_ycsb;""",
    shell=True,executable="bash",cwd=notebookpath)

source ./env/perf1-dbo.sh
/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant 24.01.25.20 24.01
PATH=/opt/stage/bin/jsqsh-dist-3.0-SNAPSHOT/bin added
running ycsb on /var/tmp/cdcadmin/sqlserver/config/list_table_counts.csv
ycsb can be killed with . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; kill_ycsb)


table count at /var/tmp/cdcadmin/sqlserver/config/list_table_counts.csv


CompletedProcess(args='export SRCDB_ARC_USER=arcsrc;\n    export CONFIG_FILE="./env/perf1-dbo.sh";\n    . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; \n    kill_ycsb;\n    list_table_counts;       \n    y_data_type="char and num"\n    y_threads_dense=1 \n    y_threads_sparse=1               \n    y_multiinsertsize_dense=1 \n    y_multiupdatesize_dense=100 \n    y_multideletesize_dense=1 \n    y_multiinsertsize_sparse=1 \n    y_multiupdatesize_sparse=100 \n    y_multideletesize_sparse=1 \n    y_fieldlength_sparse=10 \n    y_fieldlength_dense=100 \n    y_MinRAMPercentage=1.0\n    y_MaxRAMPercentage=1.0\n    start_ycsb;', returncode=0)

# Arcion

Choose the options in the UI and run the cell below it to start the replication.  

The following control are avail in the demo.  
- Arcion - replication type and CDC methods  
- Threads - control the parallelism.
- Target - null, unity catalog or delta lake

NOTE: Full mode does not work at this time.

For SQL Server, change tracking, cdc are available for demo.  

Performance is mainly controlled by the thread count by the extract and apply process.
Additional controls are customizable via modifying the YAML files directly below.
- [CDC YAML files](./demo/sqlserver/yaml/cdc/)
- [Change Tracking YAML files](./demo/sqlserver/yaml/change/)

## Configure
Make changes below and click `Run All Below`.  

In [11]:
# show Arcion and DBX controls
VBox([
      HBox([Label('RAM'), ram_percent]),
      HBox([Label('Modes'), repl_mode, cdc_mode, extraction_method]),
      HBox([Label('Target'), dbx_destinations, dbx_staging ]),
      HBox([Label('Threads'), snapshot_threads, realtime_threads, delta_threads]),
      ])

VBox(children=(HBox(children=(Label(value='RAM'), BoundedIntText(value=10, description='RAM %:', max=80, min=1…

## Start

In [12]:
# start/restart Arcion

if ( f"{dbx_access_token.value}" == "" ) and ( f"{dbx_destinations.value}" != "null" ):
    print("personal access token not entered.")
else:
    arcion_run_id=nine_char_id()
    # start a new run
    print (f"""{cdc_mode.value} {repl_mode.value}""")
    arcion_run = subprocess.run(f"""export CONFIG_FILE="{CONFIG_FILE}"; 
    export ARCION_DOWNLOAD_URL='{arcion_download_url.value}';        
    export SRCDB_ARC_USER={src_username.value};
    . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; 
    kill_arcion;
    disable_cdc;
    disable_change_tracking;
    echo prog_dir=$PROG_DIR arcion_bin=$ARCION_BIN;
    cd $PROG_DIR;
    NINE_CHAR_ID='{arcion_run_id}'
    a_repltype='{repl_mode.value}'
    EXTRACTION_METHOD='{extraction_method.value}'
    REPLICANT_MEMORY_PERCENTAGE='{ram_percent.value}.0'
    SRCDB_SNAPSHOT_THREADS='{snapshot_threads.value}' 
    SRCDB_REALTIME_THREADS='{realtime_threads.value}' 
    SRCDB_DELTA='{delta_threads.value}'
    DSTDB_TYPE='{dbx_destinations.value}'
    DSTDB_STAGE='{dbx_staging.value}'
    DBX_SPARK_URL='{dbx_spark_url.value}'
    DBX_DATABRICKS_URL='{dbx_databricks_url.value}'
    DBX_ACCESS_TOKEN='{dbx_access_token.value}'
    DBX_HOSTNAME='{dbx_hostname.value}'
    DBX_DBFS_ROOT='/{dbx_username.value}'
    DBX_USERNAME='{dbx_username.value}'
    start_{cdc_mode.value}_arcion;""",
    shell=True,executable="bash",cwd=notebookpath)

change real-time
source ./env/perf1-dbo.sh
/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant 24.01.25.20 24.01
PATH=/opt/stage/bin/jsqsh-dist-3.0-SNAPSHOT/bin added
Msg 40508, Level 16, State 1, Server arcion-perf-uswest2, Line 4
USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
disable/drop trigger replicate_io_audit_ddl_trigger
DISABLE TRIGGER "replicate_io_audit_ddl_trigger" ON DATABASE
drop trigger "replicate_io_audit_ddl_trigger" on DATABASE
following tables still have change trakcing enabled and change tracking can't be disabled
ALTER TABLE cdc_benchmark1.perf_table_1 DISABLE CHANGE_TRACKING;
ALTER TABLE cdc_benchmark3.perf_table_1 DISABLE CHANGE_TRACKING;
change tracking on database already disabled
prog_dir=/home/rslee/github/dbx/ingestion/demo/sqlserver arcion_bin=/opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant
enable change tracking on database perf1
ALTER DATABASE

+ cd /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788
+ set +x
+ JAVA_HOME=
+ REPLICANT_MEMORY_PERCENTAGE=10.0
+ JAVA_OPTS='"-Djava.security.egd=file:/dev/urandom" "-Doracle.jdbc.javaNetNio=false" "-XX:-UseCompressedOops"'
+ /opt/stage/arcion/replicant-cli-24.01.25.20/replicant-cli/bin/replicant real-time /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/src.yaml /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/dst.yaml --applier /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/applier.yaml --general /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/general.yaml --extractor /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/extractor.yaml --filter /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/filter.yaml --statistics /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/statistics.yaml --metadata /var/tmp/cdcadmin/sqlserver/logs/3fdf8d788/metadata.yaml --overwrite --id 3fdf8d788 --append-existing


# MLFLow

Save the artifacts in MLFlow.

Artifacts are collected for 5 min (600 sec).

## Start

In [13]:
# use process to run MLflow without blocking the notebook.  thread does not work with mlflow

import mlflow
import time
import os
import numpy as np
from multiprocessing import Process

def log_artifacts():
    pass

from file_read_backwards import FileReadBackwards
import datetime

# convert ycsb log mlflow metric
# time                    elapsed  cumulative      time period                                   per operations metric
#                         sec      operations      ops/sec
# 2024-03-07 10:05:38:240 410 sec: 409 operations; 1 current ops/sec; est completion in 116 days [UPDATE: Count=10, Max=15383, Min=6792, Avg=9264.6, 90=15359, 99=15383, 99.9=15383, 99.99=15383]
# ycsb_tablename_[update|update-failed]_count=x
# ycsb_tablename_[update|update-failed]_avg_microsec=x 

ycsb_date_time_pattern = r"^(?P<dt>[0-9\-]+ [0-9\:]+)"  # at the beginning
ycsb_op_val_pattern = r'\[([^]]*)\]'                    # [Update: ] [Insert: ] ...

from file_read_backwards import FileReadBackwards
import glob

def tablulate_arc_stat_line(count, stat_type, cat_sch_tbl, arc_stats, replicant_lag, total_lag, replicant_lag_weights, total_lag_weights):
    if count > 0:
        #  per table DML stat
        # catalog_schema_tablename
        try:
            arc_stats[f"arcion/{stat_type}_{cat_sch_tbl}"] += count
        except:
            arc_stats[f"arcion/{stat_type}_{cat_sch_tbl}"] = count
        # overall DML 
        try:
            arc_stats[f"arcion/{stat_type}"] += count
        except:
            arc_stats[f"arcion/{stat_type}"] = count

        # skip defaut_lag value of 9223372036854775807   
        if replicant_lag != arc_default_lag:
            try:
                replicant_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] += count * replicant_lag
            except:
                replicant_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] = count * replicant_lag
            # overall DML 
            try:
                replicant_lag_weights[f"arcion/{stat_type}"] += count * replicant_lag
            except:
                replicant_lag_weights[f"arcion/{stat_type}"] = count * replicant_lag

        # skip defaut_lag value of 9223372036854775807   
        if total_lag != arc_default_lag:
            try:
                total_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] += count * total_lag
            except:
                total_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] = count * total_lag
            # overall DML 
            try:
                total_lag_weights[f"arcion/{stat_type}"] += count * total_lag
            except:
                total_lag_weights[f"arcion/{stat_type}"] = count * total_lag

        # per table weighted avg = weight / count
        # could be case where replicant_lag_weight is not known 9223372036854775807
        if f"arcion/{stat_type}_{cat_sch_tbl}" in replicant_lag_weights:
            arc_stats[f"arcion/{stat_type}_lag_replicant_{cat_sch_tbl}"] = \
                replicant_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] / arc_stats[f"arcion/{stat_type}_{cat_sch_tbl}"]
            arc_stats[f"arcion/{stat_type}_lag_total_{cat_sch_tbl}"] = \
                total_lag_weights[f"arcion/{stat_type}_{cat_sch_tbl}"] / arc_stats[f"arcion/{stat_type}_{cat_sch_tbl}"]

        # overall DML 
        # could be case where replicant_lag_weight is not known 9223372036854775807            
        if f"arcion/{stat_type}" in total_lag_weights:
            arc_stats[f"arcion/{stat_type}_lag_replicant"] = \
                replicant_lag_weights[f"arcion/{stat_type}"] / arc_stats[f"arcion/{stat_type}"]
            arc_stats[f"arcion/{stat_type}_lag_total"] = \
                total_lag_weights[f"arcion/{stat_type}"] / arc_stats[f"arcion/{stat_type}"]

def set_previous_log(log_stat:dict, table_name:str=""):
    marker_key=f"marker_{table_name}"
    first_key=f"first_{table_name}"
    try:
        log_stat[marker_key] = log_stat[first_key]
    except:
        # marker could not not defined if the file was empty
        log_stat[marker_key] = None
    # clear the first line read
    log_stat[first_key] = None
    
def reached_previous_log(log_stat:dict, line:str, table_name:str="", header_line=None):
    marker_key=f"marker_{table_name}"
    first_key=f"first_{table_name}"    

    # reached header
    if header_line is None:
        pass
    elif line==header_line:
        return(True)

    # will be the highwater maker for the next run
    try:
        if log_stat[first_key] is None:
            log_stat[first_key] = line
    except:
        log_stat[first_key] = line
        
    # done when reached previous processed line
    try:
        if line==log_stat[marker_key]:
            return(True)
    except:
        # not previous marker
        pass

    return(False)

def parse_arcion_stats(run_id, user_id, db_type,arcion_stats_csv_positions):

    file_list = glob.glob(f"/var/tmp/{user_id}/{db_type}/logs/{run_id}/stats/{run_id}/{run_id}/replication_statistics_history_*.CSV")
    
    # file is not ready yet
    if len(file_list) == 0:
        return({})
    
    firstlineread = None
    arc_stats = {}
    # temp dict used for weighted average
    replicant_lag_weights = {}
    total_lag_weights = {}
    start_time=None
    end_time=None

    with FileReadBackwards(file_list[0], encoding="utf-8") as BigFile:
        for line in BigFile:
            if reached_previous_log(log_stat=arcion_stats_csv_positions, line=line, header_line=arcion_stats_csv_header_lines):
                break

            csvline=line.split(",")
            if (len(csvline)) < 13:
                continue

            if end_time is None:
                end_time = csvline[arc_stat_end_time_idx]
            start_time = csvline[arc_stat_start_time_idx]

            # arcion_key_index={'insert_count':7,'update_count':8,'upsert_count':9,'delete_count':10,'elapsed_time_sec':11,'replicant_lag':12,'total_lag':13}

            cat_sch_tbl=f"{csvline[arc_stat_catalog_name_idx]}_{csvline[arc_stat_schema_name_idx]}_{csvline[arc_stat_table_name_idx]}"
            try:
                insert_count=int(csvline[arc_stat_insert_count_idx])
            except:
                # could be header or some unknown format
                continue

            update_count=int(csvline[arc_stat_update_count_idx])
            upsert_count=int(csvline[arc_stat_upsert_count_idx])
            delete_count=int(csvline[arc_stat_delete_count_idx])
            replicant_lag=int(csvline[arc_stat_replicant_lag_idx])
            total_lag=int(csvline[arc_stat_total_lag_idx])

            tablulate_arc_stat_line(count=insert_count, stat_type="insert", cat_sch_tbl=cat_sch_tbl, arc_stats=arc_stats, replicant_lag=replicant_lag, total_lag=total_lag, replicant_lag_weights=replicant_lag_weights, total_lag_weights=total_lag_weights)
            tablulate_arc_stat_line(count=update_count, stat_type="update", cat_sch_tbl=cat_sch_tbl, arc_stats=arc_stats, replicant_lag=replicant_lag, total_lag=total_lag, replicant_lag_weights=replicant_lag_weights, total_lag_weights=total_lag_weights)
            tablulate_arc_stat_line(count=upsert_count, stat_type="upsert", cat_sch_tbl=cat_sch_tbl, arc_stats=arc_stats, replicant_lag=replicant_lag, total_lag=total_lag, replicant_lag_weights=replicant_lag_weights, total_lag_weights=total_lag_weights)
            tablulate_arc_stat_line(count=delete_count, stat_type="delete", cat_sch_tbl=cat_sch_tbl, arc_stats=arc_stats, replicant_lag=replicant_lag, total_lag=total_lag, replicant_lag_weights=replicant_lag_weights, total_lag_weights=total_lag_weights)


    # set the end marker
    set_previous_log(log_stat=arcion_stats_csv_positions)

    # calculate count / s metric 
    try:
        time_diff = (datetime.datetime.strptime(end_time, '%Y-%m-%dT%H:%M:%S.%fZ') -
            datetime.datetime.strptime(start_time, '%Y-%m-%dT%H:%M:%S.%fZ')).total_seconds()
    except:
        time_diff = 0

    for key in ["insert","update","upsert","delete"]:
        try:
            if time_diff > 1:
                arc_stats[f"arcion/{key}_s"] = arc_stats[f"arcion/{key}"] / time_diff
            else:
                arc_stats[f"arcion/{key}_s"] = arc_stats[f"arcion/{key}"]
        except:
            pass
        
    # return the stat
    return(arc_stats)


def tablulate_ycsb_stat_line(line,metrics,table_name):
    # parse [update: ...]
    m = re.findall(ycsb_op_val_pattern, line.lower())
    if m is None:
        return
    
    # [UPDATE: Count=891, Max=63423, Min=4, Avg=194.94, 90=210, 99=350, 99.9=715, 99.99=63423]
    for ops in m:
        op_vals=ops.split(":")                  # update: ....
        if len(op_vals) != 2:
            break

        vals_array=op_vals[1].split(",")        # count=?, max=?, ...
        if len(vals_array) != 8:
            break
        
        # count
        try:    
            op_count=float(vals_array[0].split("=")[1])    # [0] count=?
        except:
            op_count=0.0
        # per operation
        try:
            metrics[f"ycsb/{op_vals[0]}_{table_name}"] += op_count
        except:
            metrics[f"ycsb/{op_vals[0]}_{table_name}"] = op_count
        # overall
        try:
            metrics[f"ycsb/{op_vals[0]}"] += op_count
        except:
            metrics[f"ycsb/{op_vals[0]}"] = op_count

        # max
        try:
            op_max=float(vals_array[1].split("=")[1])      # [1] max=? if count=0, then this will be not defined
        except:
            op_max=0.0
        #pertable
        try:
            if metrics[f"ycsb/{op_vals[0]}_max_microsec_{table_name}"] < op_max:
                metrics[f"ycsb/{op_vals[0]}_max_microsec_{table_name}"] = op_max
        except:
            metrics[f"ycsb/{op_vals[0]}_max_microsec_{table_name}"] = op_max
        #overall
        try:
            if metrics[f"ycsb/{op_vals[0]}_max_microsec"] < op_max:
                metrics[f"ycsb/{op_vals[0]}_max_microsec"] = op_max
        except:
            metrics[f"ycsb/{op_vals[0]}_max_microsec"] = op_max


def parse_ycsb_log_to_metric(ycsb_logfile_positions,
                    start_time,
                    end_time,         
                    table_name="ycsbsparse",
                    file="/var/tmp/arcsrc/sqlserver/logs/ycsb/ycsb.ycsbsparse.log",
                    metrics={},
                    ):
    
    with FileReadBackwards(file, encoding="utf-8") as ycsb_log_file:
        count=0
        for line in ycsb_log_file:      
            if reached_previous_log(log_stat=ycsb_logfile_positions, line=line, table_name=table_name):
                break
            # endtime time
            if end_time is None:
                try:
                    # 2024-03-27 14:18:57:038
                    end_time = datetime.datetime.strptime(line[0:23], '%Y-%m-%d %H:%M:%S:%f')
                    print(f"end time from {table_name}:{end_time}")
                except:
                    pass
            
            # start time
            try:
                start_time = datetime.datetime.strptime(line[0:23], '%Y-%m-%d %H:%M:%S:%f')
                print(f"start time from {table_name}:{start_time}")
            except:
                pass
            tablulate_ycsb_stat_line(line=line,metrics=metrics,table_name=table_name)

    # set the end marker
    set_previous_log(log_stat=ycsb_logfile_positions, table_name=table_name)  
    return(start_time, end_time, metrics)            

def calc_count_s_ycsb(metrics, start_time, end_time):
    # calculate count / s metric       
    try:
        time_diff = (end_time - start_time).total_seconds()
    except:
        time_diff = 0

    print(start_time)
    print(end_time)
    print(time_diff)
    print(metrics.keys())
    for key in ["insert","update","delete"]:
        try:
            if time_diff > 1:
                metrics[f"ycsb/{key}_s"] = metrics[f"ycsb/{key}"] / time_diff
            else:
                metrics[f"ycsb/{key}_s"] = metrics[f"ycsb/{key}"]
        except:
            pass

def get_arcion_metrics():
    arc_stats=parse_arcion_stats(
        run_id=arcion_run_id,
        user_id=src_username.value,
        db_type=src_db_type.value,
        arcion_stats_csv_positions=arcion_stats_csv_positions)
    return(arc_stats)

def get_ycsb_metrics(metrics={}):
    ycsb_current_metrics={}
    start_time=None
    end_time=None
    ycsb_tables = pd.read_csv (f"/var/tmp/{src_username.value}/sqlserver/config/list_table_counts.csv",header=None, names= ['table name','min key','max key','field count'])
    for table_name in ycsb_tables['table name']:
        table_name = table_name.lower()

        if (srcdb_schema is not None) and (srcdb_schema == ""):
            log_file_name=f"/var/tmp/{src_username.value}/sqlserver/logs/ycsb/ycsb.{table_name}.log"
        else:
            log_file_name=f"/var/tmp/{src_username.value}/sqlserver/logs/ycsb/ycsb.{srcdb_schema}.{table_name}.log"

        start_time, end_time, _ = parse_ycsb_log_to_metric(ycsb_logfile_positions,start_time, end_time,
            table_name=table_name, 
            file=log_file_name,
            metrics=ycsb_current_metrics,
            )
    calc_count_s_ycsb(ycsb_current_metrics, start_time, end_time)
    return(ycsb_current_metrics)

def get_prom_metrics(prom_metric_url=None,metric_prefix="",metric_step=None):
    # there is a limit on the number of metrics that you can log in a single log_batch call. This limit is typically 1000. 
    # timestamp=If unspecified, the number of milliseconds since the Unix epoch is used.
    # step=If unspecified, the default value of zero is used
    contents = requests.get(prom_metric_url)
    all_metrics = {}
    metrics_count = 0
    for line in contents.text.splitlines():
        if line.startswith("#"):
            continue
        key,val=line.rsplit(' ', 1)       # split from the end in case the key has spaces
        key=re.sub('[" {}=,]', "_", key)  # change space,{},=,and comma into _
        key=key.replace("_", "/", 1)      # change the first _ to / to group based on the name space
        all_metrics[key]=float(val)
        metrics_count += 1
    return(all_metrics)


def start_mlflow(max_intervals=0,experiment_id=None, log_interval_sec=10, all_params={}, step=0):
    # stop previous run
    # max_intervals=0 makes the mlflow run forever
    mlflow_run = mlflow.active_run()
    if not(mlflow_run is None):
        # upload final artifacts
        log_artifacts()
        print(f"""stopping previous MLflow {mlflow_run.info.run_id}""")
        mlflow.end_run()

    # start a new run
    if experiment_id == '':
        experiment_id=None
    mlflow.start_run(experiment_id=experiment_id, log_system_metrics=True)

    # params
    mlflow.log_params(params=all_params)

    # schema
    dataset_source=f"/var/tmp/{src_username.value}/sqlserver/config/list_table_counts.csv"
    mlflow.log_artifact(dataset_source)
    
    # data
    dataset_shape = pd.read_csv(dataset_source, header=None, names= ['table name','min key','max key','field count'])
    dataset = mlflow.data.from_pandas(dataset_shape, source=dataset_source)
    mlflow.log_input(dataset, context="training")    

    # wait to end
    # TODO: Make this smarter by checking whether the process is still running
    wait_count=0
    while (max_intervals == 0) or (wait_count < max_intervals):
        mlflow.log_metrics(metrics=get_prom_metrics(prom_metric_url="http://localhost:9399/metrics"),step=step)
        mlflow.log_metrics(metrics=get_prom_metrics(prom_metric_url="http://localhost:9100/metrics"),step=step)
        mlflow.log_metrics(metrics=get_ycsb_metrics(),step=step)
        mlflow.log_metrics(metrics=get_arcion_metrics(),step=step)
        time.sleep(log_interval_sec)
        wait_count += 1
        step += 1

    # upload the rest of the artifacts generated /var/tmp/{src_username.value}/sqlserver/logs
    log_artifacts()
    # experiment done
    mlflow.end_run()

def register_mlflow(exp_params):
    mlflow_proc = Process(target=start_mlflow, kwargs={"experiment_id":experiment_id, "all_params":current_exp_params})
    mlflow_proc.start()   
    try:
        mlflow_proc_state['proc'].terminate()
        print("previous MLFlow process terminated")
    except:
        pass
    mlflow_proc_state['proc']       = mlflow_proc
    mlflow_proc_state['exp_params'] = exp_params


current_exp_params=exp_params()
if not ('exp_params' in mlflow_proc_state):
    print("first run of mlflow")
    register_mlflow(current_exp_params)
elif current_exp_params != mlflow_proc_state['exp_params']:
    print("param changed. starting new mlflow")
    register_mlflow(current_exp_params)
elif not(mlflow_proc_state['proc'].is_alive()):
    print("mlflow stopped. starting new mlflow with new step")
    register_mlflow(current_exp_params)
else:
    print("no parameters changed. New MLFLow experiment not needed.")

first run of mlflow


end time from ycsbdense:2024-05-01 13:03:55.047000
start time from ycsbdense:2024-05-01 13:03:55.047000
start time from ycsbdense:2024-05-01 13:03:45.047000
start time from ycsbdense:2024-05-01 13:03:35.047000
start time from ycsbdense:2024-05-01 13:03:25.047000
start time from ycsbdense:2024-05-01 13:03:15.047000
start time from ycsbdense:2024-05-01 13:03:05.047000
start time from ycsbdense:2024-05-01 13:02:55.047000
start time from ycsbdense:2024-05-01 13:02:45.047000
start time from ycsbdense:2024-05-01 13:02:35.047000
start time from ycsbdense:2024-05-01 13:02:25.047000
start time from ycsbdense:2024-05-01 13:02:15.047000
start time from ycsbdense:2024-05-01 13:02:05.047000
start time from ycsbdense:2024-05-01 13:01:55.047000
start time from ycsbdense:2024-05-01 13:01:45.047000
start time from ycsbdense:2024-05-01 13:01:35.047000
start time from ycsbdense:2024-05-01 13:01:25.047000
start time from ycsbdense:2024-05-01 13:01:15.047000
start time from ycsbdense:2024-05-01 13:01:05.04

Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_94483/863413452.py", line 378, in start_mlflow
    mlflow.log_metrics(metrics=get_ycsb_metrics(),step=step)
  File "/tmp/ipykernel_94483/863413452.py", line 319, in get_ycsb_metrics
    start_time, end_time, _ = parse_ycsb_log_to_metric(ycsb_logfile_positions,start_time, end_time,
  File "/tmp/ipykernel_94483/863413452.py", line 252, in parse_ycsb_log_to_metric
    with FileReadBackwards(file, encoding="utf-8") as ycsb_log_file:
  File "/home/rslee/.local/lib/python3.10/site-packages/file_read_backwards/file_read_backwards.py", line 41, in __init__
    self.iterator = FileReadBackwardsIterator(io.open(self.path, mode="rb"), self.encoding, self.chunk_size)
FileNotFoundError: [Errno 2] No such file o

# Manually Kill Processes
Uncomment below to kill desired processes

In [None]:
# subprocess.run(f""". ./demo/sqlserver/run-ycsb-sqlserver-source.sh; kill_arcion;""",shell=True,executable="bash",cwd=notebookpath)
# subprocess.run(f""". ./demo/sqlserver/run-ycsb-sqlserver-source.sh; kill_ycsb;""",shell=True,executable="bash",cwd=notebookpath)
# subprocess.run(f"""export SRCDB_ARC_USER={src_username.value}; . ./demo/sqlserver/run-ycsb-sqlserver-source.sh; drop_all_ycsb_tables;""",shell=True,executable="bash",cwd=notebookpath)

  return _dataset_source_registry.resolve(
  string_columns = trimmed_df.columns[(df.applymap(type) == str).all(0)]
