## Importing Libraries

In [1]:
import pandas as pd
from sqlalchemy import create_engine,text
import psycopg2
import os
import io
import time 
import numpy as np
from psycopg2 import sql
import timeit
from jinja2 import Template

In [2]:
import sys 
sys.path.append(".")

In [3]:
import config 

In [4]:
def create_postgres_engine(user, password, host, port, db_name):
    """Create a SQLAlchemy engine for PostgreSQL using the psycopg2 driver.

    The user name and password are percent-encoded before being embedded in
    the connection URL, so credentials that contain URL-reserved characters
    (for example ``@``, ``:``, ``/`` or ``%``) do not corrupt the URL.

    Args:
        user: Database user name (raw, not URL-encoded).
        password: Database password (raw, not URL-encoded).
        host: Database host name or address.
        port: Database port.
        db_name: Name of the database to connect to.

    Returns:
        The engine returned by ``sqlalchemy.create_engine``.
    """
    from urllib.parse import quote

    # safe="" also encodes "/" so no reserved character survives in the
    # userinfo part; quote() (not quote_plus) keeps spaces as %20.
    safe_user = quote(str(user), safe="")
    safe_password = quote(str(password), safe="")
    connection_string = (
        f"postgresql+psycopg2://{safe_user}:{safe_password}@{host}:{port}/{db_name}"
    )
    return create_engine(connection_string)

In [5]:
# Module-level engine built from the connection settings in config.
engine = create_postgres_engine(
    user=config.DB_USER,
    password=config.DB_PASSWORD,
    host=config.DB_HOST,
    port=config.DB_PORT,
    db_name=config.DB_NAME,
)

In [6]:
def run_sql_file(filename):
    """Execute every statement of a SQL script inside one transaction.

    The script text is split on ``;`` and each non-empty piece is executed
    through the module-level ``engine``. If a statement fails, the statement
    and the error are printed and the exception is re-raised.

    Args:
        filename: Path of the SQL script to run.
    """
    with open(filename, 'r') as script_file:
        script_text = script_file.read()

    # Plain split on ';' -- pieces are stripped here, blanks skipped below.
    candidate_statements = [piece.strip() for piece in script_text.split(';')]

    with engine.connect() as connection:
        with connection.begin():
            for stmt in candidate_statements:
                if not stmt:
                    continue
                try:
                    connection.execute(text(stmt))
                except Exception as exc:
                    print(f"Error executing statement: {stmt}")
                    print(f"Error details: {exc}")
                    raise

        print(f"SQL file {filename} executed successfully!")


## Load Data Stats

In [7]:
def load_table_into_db(file_path, table_name, conn_params):
    """Bulk-load a CSV file into a table with ``COPY ... FROM STDIN`` and time it.

    The load is best-effort: connection errors and import errors are printed
    and the metrics collected so far are returned (timings that were never
    reached stay 0). Exceptions that are not ``Exception`` subclasses, such as
    ``KeyboardInterrupt``, propagate to the caller.

    Args:
        file_path: Path of the CSV file (its first line is treated as a header).
        table_name: Name of the destination table.
        conn_params: Keyword arguments passed to ``psycopg2.connect``.

    Returns:
        dict with keys ``file_name`` (base name of ``file_path``),
        ``insertion_time_ms`` (duration of the COPY call) and
        ``wall_time_ms`` (from before the cursor is opened until after commit).
    """
    metrics = {
        'file_name': os.path.basename(file_path),
        'insertion_time_ms': 0,
        'wall_time_ms': 0
    }

    try:
        conn = psycopg2.connect(**conn_params)
    except psycopg2.Error as conn_e:
        print(f"Database connection error: {conn_e}")
        return metrics

    try:
        conn.set_session(autocommit=False)
        wall_start_time = time.time()

        with conn.cursor() as cur:
            with open(file_path, 'r') as f:
                # Time only the COPY itself, not opening the file or the commit.
                insertion_start = timeit.default_timer()
                cur.copy_expert(
                    sql.SQL('COPY {} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)').format(
                        sql.Identifier(table_name)
                    ),
                    f
                )
                insertion_end = timeit.default_timer()
                metrics['insertion_time_ms'] = (insertion_end - insertion_start) * 1000

            conn.commit()

            wall_end_time = time.time()
            metrics['wall_time_ms'] = (wall_end_time - wall_start_time) * 1000

            print(f"Import Metrics for {file_path}:")
            print(f"Insertion Time: {metrics['insertion_time_ms']:.2f} ms")
            print(f"Wall Time: {metrics['wall_time_ms']:.2f} ms")

    except Exception as inner_e:
        print(f"Error importing {file_path}: {inner_e}")
        try:
            conn.rollback()
        except psycopg2.Error as rollback_e:
            # The connection itself may be broken; keep the load best-effort.
            print(f"Rollback failed for {file_path}: {rollback_e}")

    finally:
        # Only clean up here: a `return` inside `finally` would swallow
        # in-flight exceptions such as KeyboardInterrupt.
        conn.close()

    return metrics


In [8]:
# Keyword arguments for psycopg2.connect, taken from the config module.
conn_params = dict(
    dbname=config.DB_NAME,
    user=config.DB_USER,
    password=config.DB_PASSWORD,
    host=config.DB_HOST,
    port=config.DB_PORT,
)


In [9]:
# Maps each CSV file prefix to the database table it is loaded into.
table_names = dict(
    ACC='accelerometer_data',
    BVP='blood_volume_pulse',
    Dexcom='interstitial_glucose',
    EDA='electrodermal_activity',
    HR='heart_rate_data',
    IBI='ibi_data',
    TEMP='temperature_data',
)

In [10]:
## Scale factor for this run, read from config.SCALE_FACTOR.
## It determines how many zero-padded data folders ("001", "002", ...) are loaded
## and how many participant ids are passed to the benchmark queries.
scale_factor = config.SCALE_FACTOR

In [11]:
def integer_to_places_string(number):
    """Return ``number`` as a three-character, zero-padded decimal string.

    Args:
        number: An ``int`` in the inclusive range 0..999.

    Returns:
        The hundreds, tens and ones digits concatenated, e.g. ``7 -> "007"``.

    Raises:
        ValueError: If ``number`` is not an ``int`` or lies outside 0..999.
    """
    is_valid = isinstance(number, int) and 0 <= number <= 999
    if not is_valid:
        raise ValueError("Input must be an integer between 0 and 999.")

    # Width 3 with zero fill yields exactly the hundreds/tens/ones digits.
    return f"{number:03d}"

In [12]:
# Zero-padded folder names "001" .. up to scale_factor, one per data folder.
folder_to_use = list(map(integer_to_places_string, range(1, scale_factor + 1)))

In [13]:
## File prefixes to load; to ignore a table, remove its prefix from this list.
accepted_files = [
    'ACC',
    'BVP',
    'Dexcom',
    'EDA',
    'HR',
    'IBI',
    'TEMP',
]

In [14]:
## Create the schema by running the create_schema.sql script
schema_script_path = os.path.join(config.SQL_SCRIPTS_PATH, 'create_schema.sql')
run_sql_file(schema_script_path)

## Demographics is a one-time load and is deliberately kept out of the
## per-file insertion timings collected later.
demographic_path = os.path.join(config.TRANSFORM_DATA_PATH, 'Demographics.csv')
load_table_into_db(demographic_path, 'demographics', conn_params)

SQL file sql_scripts/create_schema.sql executed successfully!
Import Metrics for ../new_data/Demographics.csv:
Insertion Time: 4.16 ms
Wall Time: 8.75 ms


{'file_name': 'Demographics.csv',
 'insertion_time_ms': 4.155767001066124,
 'wall_time_ms': 8.748769760131836}

In [15]:
# One metrics dict per loaded file, in load order (folder by folder).
list_of_metrics = []
for folder_index in range(scale_factor):
    folder_name = folder_to_use[folder_index]
    folder_path = os.path.join(config.TRANSFORM_DATA_PATH, folder_name)

    for file_prefix in accepted_files:
        csv_path = os.path.join(folder_path, f'{file_prefix}_{folder_name}.csv')
        list_of_metrics.append(
            load_table_into_db(csv_path, table_names[file_prefix], conn_params)
        )

# Per-file timings plus a final "Total" row that sums the numeric columns.
report_df = pd.DataFrame(list_of_metrics)
numeric_totals = report_df.select_dtypes(include=['float', 'int']).sum()
total_df = numeric_totals.to_frame().T
total_df.insert(0, 'file_name', ['Total'])
report_df = pd.concat([report_df, total_df], axis=0, ignore_index=True)

insertion_stats_path = os.path.join(
    config.RESULTS_PATH, f"insertion_stats_scale_{scale_factor}.csv"
)
report_df.to_csv(insertion_stats_path, index=False)

Import Metrics for ../new_data/001/ACC_001.csv:
Insertion Time: 86601.98 ms
Wall Time: 86627.15 ms
Import Metrics for ../new_data/001/BVP_001.csv:
Insertion Time: 165619.92 ms
Wall Time: 165627.30 ms
Import Metrics for ../new_data/001/Dexcom_001.csv:
Insertion Time: 59.76 ms
Wall Time: 62.28 ms
Import Metrics for ../new_data/001/EDA_001.csv:
Insertion Time: 10118.97 ms
Wall Time: 10126.05 ms
Import Metrics for ../new_data/001/HR_001.csv:
Insertion Time: 2344.69 ms
Wall Time: 2355.97 ms
Import Metrics for ../new_data/001/IBI_001.csv:
Insertion Time: 1204.44 ms
Wall Time: 1209.76 ms
Import Metrics for ../new_data/001/TEMP_001.csv:
Insertion Time: 10859.72 ms
Wall Time: 10865.52 ms
Import Metrics for ../new_data/002/ACC_002.csv:
Insertion Time: 86441.91 ms
Wall Time: 86459.82 ms
Import Metrics for ../new_data/002/BVP_002.csv:
Insertion Time: 164390.67 ms
Wall Time: 164396.16 ms
Import Metrics for ../new_data/002/Dexcom_002.csv:
Insertion Time: 79.66 ms
Wall Time: 83.27 ms
Import Metrics f

In [17]:
## Display the per-file insertion timing report built above.
## (No compression happens in this cell; chunks are compressed by get_rows_inserted.)
report_df

Unnamed: 0,file_name,insertion_time_ms,wall_time_ms
0,ACC_001.csv,86601.98,86627.15
1,BVP_001.csv,165619.9,165627.3
2,Dexcom_001.csv,59.75688,62.28352
3,EDA_001.csv,10118.97,10126.05
4,HR_001.csv,2344.692,2355.971
5,IBI_001.csv,1204.438,1209.763
6,TEMP_001.csv,10859.72,10865.52
7,ACC_002.csv,86441.91,86459.82
8,BVP_002.csv,164390.7,164396.2
9,Dexcom_002.csv,79.65911,83.2715


In [18]:
def get_rows_inserted(conn_params, table_name):
    """Count the rows of ``table_name``, then compress each of its chunks.

    Despite its name, this function also calls ``compress_chunk`` on every
    chunk returned by ``show_chunks(table_name)``. The row count is taken
    before any chunk is compressed.

    The connection runs in autocommit mode so that each ``compress_chunk``
    call is its own transaction: a chunk that cannot be compressed is
    reported and skipped without aborting, or rolling back, the compression
    of the other chunks.

    Args:
        conn_params: Keyword arguments passed to ``psycopg2.connect``.
        table_name: Name of the table to count and compress.

    Returns:
        The ``COUNT(*)`` of ``table_name``.

    Raises:
        Exception: Any error from counting rows or listing chunks is printed
            and re-raised. Per-chunk compression errors are only printed.
    """
    conn = psycopg2.connect(**conn_params)
    try:
        # Without autocommit, one failed compress_chunk leaves the shared
        # transaction aborted: later statements fail and commit() rolls back.
        conn.autocommit = True

        with conn.cursor() as cur:
            # Get row count before compression
            cur.execute(sql.SQL('SELECT COUNT(*) FROM {}').format(sql.Identifier(table_name)))
            row_count = cur.fetchone()[0]

            # Fetch all chunks
            cur.execute("""
                SELECT chunk 
                FROM show_chunks(%s) AS chunk
            """, (table_name,))
            chunks = cur.fetchall()

            # Compress each chunk independently; failures are reported, not fatal
            for (chunk,) in chunks:
                try:
                    cur.execute("SELECT compress_chunk(%s)", (chunk,))
                except Exception as e:
                    print(f"Could not compress chunk {chunk}: {e}")

            return row_count
    except Exception as e:
        print(f"Error in compression process: {e}")
        raise
    finally:
        conn.close()

In [19]:
# Row count per table; get_rows_inserted also compresses each table's chunks.
row_info = {
    db_table: get_rows_inserted(conn_params, db_table)
    for db_table in table_names.values()
}
row_df = pd.DataFrame(
    list(row_info.items()),
    columns=['table_name', 'number_of_rows_inserted'],
)
row_counts_path = os.path.join(
    config.RESULTS_PATH, f"insertion_stats_num_rows_scale_{scale_factor}.csv"
)
row_df.to_csv(row_counts_path, index=False)

In [20]:
## Display the number of rows counted in each table.
row_df

Unnamed: 0,table_name,number_of_rows_inserted
0,accelerometer_data,91510446
1,blood_volume_pulse,183020849
2,interstitial_glucose,11702
3,electrodermal_activity,11438736
4,heart_rate_data,2859141
5,ibi_data,1382424
6,temperature_data,11438656


In [21]:
def get_hypertable_sizes(conn_params):
    """Return the total size of every hypertable as a DataFrame.

    Queries ``timescaledb_information.hypertables`` and, for each hypertable,
    reports ``hypertable_size(...)`` divided by ``1024*1024`` and rounded to
    4 decimals (presumably bytes converted to MB, per the column name).

    Args:
        conn_params: Keyword arguments passed to ``psycopg2.connect``.

    Returns:
        DataFrame with columns ``table_name`` and ``total_size_mb``, ordered
        by ``total_size_mb`` descending.
    """
    size_query = """
    SELECT
        hypertable_name AS table_name,
        ROUND(hypertable_size(hypertable_schema || '.' || hypertable_name)/(1024.0*1024), 4) AS total_size_mb
        FROM
        timescaledb_information.hypertables
        ORDER BY
        total_size_mb DESC;
    """

    conn = psycopg2.connect(**conn_params)
    try:
        return pd.read_sql_query(size_query, conn)
    finally:
        # Close the connection even when the query fails.
        conn.close()

In [22]:
# Per-hypertable sizes plus a final "Total" row summing the float columns.
ind_size_df = get_hypertable_sizes(conn_params)
total_df = ind_size_df.select_dtypes(include=['float']).sum().to_frame().T
total_df.insert(0, 'table_name', ['Total'])
ind_size_df = pd.concat([ind_size_df, total_df], axis=0)

size_stats_path = os.path.join(
    config.RESULTS_PATH, f"compression_stats_size_scale_{scale_factor}.csv"
)
ind_size_df.to_csv(size_stats_path, index=False)

  df_1 = pd.read_sql_query(query_1, conn)


In [23]:
def render_query(sql_file_path, params):
    """Render a Jinja2 SQL template file into a query string.

    Args:
        sql_file_path: Path of the SQL template file.
        params: Mapping of template variables, or ``None`` for a template
            that needs no variables.

    Returns:
        The rendered query text.
    """
    # Read the SQL template
    with open(sql_file_path, 'r') as file:
        template_content = file.read()

    # Template.render(None) raises TypeError, so treat None as "no variables".
    context = {} if params is None else params

    return Template(template_content).render(context)

def execute_sql_file(conn_params, sql_file_path, params=None):
    """Render a SQL template file, execute it once and time the execution.

    Only the ``cursor.execute`` call is timed; rendering the template and
    opening the connection are excluded. Nothing is committed and no rows
    are fetched.

    Args:
        conn_params: Keyword arguments passed to ``psycopg2.connect``.
        sql_file_path: Path of the SQL template file.
        params: Optional mapping of template variables; ``None`` renders the
            template without variables.

    Returns:
        Execution time in milliseconds, or ``None`` if rendering, connecting
        or executing failed (the error is printed).
    """
    # Stays None if we fail before connecting, so cleanup below is safe.
    conn = None
    try:
        query = render_query(sql_file_path, {} if params is None else params)
        # Establish database connection
        conn = psycopg2.connect(**conn_params)

        with conn.cursor() as cur:
            # Time only the statement execution
            execution_start = timeit.default_timer()
            cur.execute(sql.SQL(query))
            execution_end = timeit.default_timer()

        execution_time_taken = (execution_end - execution_start) * 1000

        print("Time of Execution:", execution_time_taken)

        return execution_time_taken

    except Exception as error:
        print(f"Error executing SQL file: {error}")
        return None

    finally:
        # Close exactly once, and only if a connection was opened.
        if conn is not None:
            conn.close()

In [24]:
# Number of benchmark query scripts; the run loop below executes
# query_0.sql through query_{number_of_queries - 1}.sql.
number_of_queries = 9 
# How many times the whole set of queries is repeated (from config).
number_of_times_to_run = config.NUMBER_TIMES_TO_RUN_QUERY

In [25]:
# Participant ids 1..scale_factor, passed to the query templates.
list_of_participants = list(range(1, scale_factor + 1))

In [26]:
# Maps each query index to its execution times in ms, one entry per repetition
# (an entry is None when execute_sql_file reported a failure).
execution_summary = {}
query_params = {'list_of_participants': tuple(list_of_participants)}

for _ in range(number_of_times_to_run):
    for query_idx in range(number_of_queries):
        script_path = os.path.join(config.SQL_SCRIPTS_PATH, f"query_{query_idx}.sql")
        elapsed_ms = execute_sql_file(conn_params, script_path, query_params)
        execution_summary.setdefault(query_idx, []).append(elapsed_ms)

query_df = pd.DataFrame({
    'query_number': execution_summary.keys(),
    'execution_times': execution_summary.values()
})

# Rows = queries, columns = repetitions; column sums give the total per repetition.
runs_df = pd.DataFrame(execution_summary).T
total_run_time = runs_df.sum(axis=0).tolist()
total_row = pd.DataFrame({'query_number': ['total'], 'execution_times': [total_run_time]})
query_df = pd.concat([query_df, total_row]).reset_index(drop=True)

# Summary statistics over each row's list of execution times.
summary_reducers = (
    ('min_time', min),
    ('median_time', np.median),
    ('mean_time', np.mean),
    ('std_dev', np.std),
    ('max_time', max),
)
for column_name, reducer in summary_reducers:
    query_df[column_name] = query_df['execution_times'].apply(reducer)

query_stats_path = os.path.join(
    config.RESULTS_PATH, f"stats_query_run_time_scale_{scale_factor}.csv"
)
query_df.to_csv(query_stats_path, index=False)

Time of Execution: 8446.659896999336
Time of Execution: 15.766841999720782
Time of Execution: 69844.81431200038
Time of Execution: 76109.06143600005
Time of Execution: 18.493386000045575
Time of Execution: 3663.4492429984675
Time of Execution: 11289.980662000744
Time of Execution: 462.6918400026625
Time of Execution: 3429.5022919977782
Time of Execution: 8639.549784002156
Time of Execution: 14.443983000091976
Time of Execution: 69534.11541499736
Time of Execution: 74968.17872999964
Time of Execution: 19.282425000710646
Time of Execution: 3672.237688999303
Time of Execution: 11312.797072001558
Time of Execution: 457.3032750013226
Time of Execution: 3365.700743997877
Time of Execution: 8425.37604899917
Time of Execution: 14.79256700258702
Time of Execution: 69538.11593799765
Time of Execution: 74150.35592999993
Time of Execution: 18.4987419997924
Time of Execution: 3657.7705099989544
Time of Execution: 11428.534763999778
Time of Execution: 447.80755400279304
Time of Execution: 3342.32869