# TPC-DS: Preprocessing, DB Setup and Data Load Script

In [1]:
import sys, os, re
import psycopg2
import numpy as np
import pandas as pd
from psycopg2 import Error
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


In [2]:
# set up connection variables
# Host, port, user and password are read from the standard libpq environment
# variables when set, falling back to the previous local defaults, so real
# credentials do not have to be stored in the notebook.
db_host = os.environ.get("PGHOST", "localhost")
db_port = os.environ.get("PGPORT", "5432")
db_user = os.environ.get("PGUSER", "postgres")
db_pass = os.environ.get("PGPASSWORD", "password")
# Kept fixed on purpose: this maintenance database is where the TPC-DS
# database gets dropped/created, so it must not be the TPC-DS database itself.
db_name = "postgres"

# function to connect with postgres
def connect_postgres(db_host, db_port, db_user, db_pass, db_name):
    """Open an autocommit PostgreSQL connection and return a cursor on it.

    As a smoke test, prints the connection's DSN parameters and the result
    of ``SELECT version();``.

    Parameters
    ----------
    db_host, db_port, db_user, db_pass, db_name : str
        Connection settings passed to ``psycopg2.connect``.

    Returns
    -------
    psycopg2 cursor or None
        A cursor bound to the new connection, or ``None`` if connecting or
        the version query failed (the error is printed in that case).
    """
    connection = None
    try:
        # Connect to an existing database
        connection = psycopg2.connect(host=db_host,
                                      port=db_port,
                                      user=db_user,
                                      password=db_pass,
                                      database=db_name)
        # Autocommit: CREATE/DROP DATABASE (used in this notebook) cannot run
        # inside a transaction block.
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        # Create a cursor to perform database operations
        cur = connection.cursor()
        # Print PostgreSQL details
        print("PostgreSQL server information")
        print(connection.get_dsn_parameters(), "\n")
        # Executing a SQL query
        cur.execute("SELECT version();")
        # Fetch result
        record = cur.fetchone()
        print("You are connected to - ", record, "\n")
    except Exception as error:  # psycopg2.Error is a subclass of Exception
        print("Error while connecting to PostgreSQL", error)
        # The connection may have opened before a later step failed; close it
        # instead of leaving a half-initialised session behind.
        if connection is not None:
            connection.close()
        return None
    return cur

In [3]:
# connect to postgres
# Open the first session on the maintenance database; later cells use it to
# drop and recreate the TPC-DS database.
cur = connect_postgres(
    db_host=db_host,
    db_port=db_port,
    db_user=db_user,
    db_pass=db_pass,
    db_name=db_name,
)

PostgreSQL server information
{'user': 'postgres', 'dbname': 'postgres', 'host': 'localhost', 'port': '5432', 'tty': '', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'target_session_attrs': 'any'} 

You are connected to -  ('PostgreSQL 14.5, compiled by Visual C++ build 1914, 64-bit',) 



In [4]:
# drop tpcds db
# From here on `db_name` names the TPC-DS database (later cells create it and
# connect to it through this variable).
db_name = "tpcds"

# IF EXISTS makes this safe on a fresh server; WITH (FORCE) disconnects any
# sessions still attached to the database before dropping it.
cur.execute("DROP DATABASE IF EXISTS {} WITH (FORCE);".format(db_name))
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 DROP DATABASE


In [5]:
# change win1252 encoding temp db to normal before drop
# PostgreSQL refuses to drop a database still flagged as a template, so clear
# the flag first. On a fresh server win1252_temp does not exist yet; that
# database error is expected here and is only reported.
try:
    cur.execute(
        "ALTER DATABASE win1252_temp is_template false;"
    )
except Error as e:
    # Catch only psycopg2 errors: anything else (e.g. `cur` being None after a
    # failed connection) is a real problem and should stop the notebook.
    print(e)
else:
    print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 ALTER DATABASE


In [6]:
# drop win1252 encoding temp db (after set to normal db)
# IF EXISTS keeps a fresh server from erroring; WITH (FORCE) disconnects any
# remaining sessions before the drop.
cur.execute("DROP DATABASE IF EXISTS win1252_temp WITH (FORCE);")
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 DROP DATABASE


In [7]:
# create win1252 encoding temp db
# Build a WIN1252-encoded database from template0 and mark it as a template,
# so the TPC-DS database can be created from it with the same encoding.
cur.execute("""
    
    CREATE DATABASE win1252_temp
        WITH
        OWNER = postgres
        TEMPLATE = template0
        ENCODING = 'WIN1252'
        CONNECTION LIMIT = -1
        IS_TEMPLATE = True;

    """)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE DATABASE


In [8]:
# create tpcds db
# Clone the win1252_temp template so the TPC-DS database is WIN1252-encoded;
# the new database itself is an ordinary (non-template) one.
cur.execute(f"""

    CREATE DATABASE {db_name}
        WITH
        OWNER = postgres
        TEMPLATE = win1252_temp
        ENCODING = 'WIN1252'
        CONNECTION LIMIT = -1
        IS_TEMPLATE = False;
        
    """)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE DATABASE


In [9]:
# connect to tpcds db
# Explicitly close the session on the maintenance database before rebinding
# `cur`, rather than relying on garbage collection to release it.
if cur is not None:
    cur.connection.close()

cur = connect_postgres(db_host, db_port, db_user, db_pass, db_name)

PostgreSQL server information
{'user': 'postgres', 'dbname': 'tpcds', 'host': 'localhost', 'port': '5432', 'tty': '', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'target_session_attrs': 'any'} 

You are connected to -  ('PostgreSQL 14.5, compiled by Visual C++ build 1914, 64-bit',) 



In [10]:
# create tables for db
# Run each DDL script in turn; `with` guarantees the script file is closed.
# statusmessage reports only the last statement executed from each script.
for ddl_script_path in ("tools/tpcds.sql", "tools/tpcds_source.sql"):
    with open(ddl_script_path, "r") as ddl_script:
        cur.execute(ddl_script.read())
    print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE TABLE
SQL Status Output:
 CREATE TABLE


In [11]:
# get dir path
# Directory holding the generated data files. os.path.join makes this work on
# any OS; the trailing '' keeps a trailing separator so `path + file_name`
# still forms a valid path.
path = os.path.join(os.getcwd(), 'tools', 'tmp', '')
# Sorted so the generated load script has a deterministic order.
files = sorted(os.listdir(path))
print(path)

C:\Users\ahmad\Desktop\tpc_ds\tools\tmp\


In [12]:
# function to get full absolute path of csv files containing data

def get_absolute_path(d):
    """Return the absolute paths of all entries in directory ``d``, sorted.

    Parameters
    ----------
    d : str
        Directory to list; may be relative (resolved against the current
        working directory).

    Returns
    -------
    list of str
        Absolute path of every entry in ``d``, in sorted order so results
        are reproducible across runs.
    """
    return [os.path.abspath(os.path.join(d, f)) for f in sorted(os.listdir(d))]

In [13]:
# get full absolute path of csv files containing data
# Every backslash in each path is converted to a forward slash.
files_abs_path = list(map(lambda full_path: full_path.replace('\\', '/'),
                          get_absolute_path(path)))
print("Total files:", len(files_abs_path))
print("First few files...")
files_abs_path[:5]

Total files: 33
First few files...


['C:/Users/ahmad/Desktop/tpc_ds/tools/tmp/call_center_1_5.csv',
 'C:/Users/ahmad/Desktop/tpc_ds/tools/tmp/catalog_page_1_5.csv',
 'C:/Users/ahmad/Desktop/tpc_ds/tools/tmp/catalog_returns_1_5.csv',
 'C:/Users/ahmad/Desktop/tpc_ds/tools/tmp/catalog_sales_1_5.csv',
 'C:/Users/ahmad/Desktop/tpc_ds/tools/tmp/customer_1_5.csv']

In [14]:
# exclude extra delimiter for dbgen_version file
# The dbgen_version file carries one '^' too many per line, which COPY would
# read as an extra column; the last '^' on each line is removed.
# NOTE(review): the file is only rewritten while its space-stripped text has a
# '^' 13 characters from the end; presumably that position no longer holds '^'
# after the fix, which makes re-running the cell safe. TODO confirm against
# the actual dbgen_version output.

file_count = 0

for file in files_abs_path:
    # Test the file name first so the large data files are never read into
    # memory. Match the base name only, so a directory whose name contains
    # 'dbgen_version' cannot make every file match.
    if 'dbgen_version' not in os.path.basename(file):
        continue

    with open(file, 'r', encoding='latin-1') as file_in:
        lines = file_in.readlines()

    all_text = ''.join(lines).replace(" ", "")
    # Guard the fixed-offset check so a short or empty file cannot raise IndexError.
    if len(all_text) < 13 or all_text[-13] != '^':
        continue

    for i, line in enumerate(lines):
        last_delimiter_index = line.rfind("^")
        # rfind returns -1 for a line without '^'; slicing with -1 would
        # duplicate the line's text, so leave such lines untouched.
        if last_delimiter_index != -1:
            lines[i] = line[:last_delimiter_index] + line[last_delimiter_index + 1:]

    with open(file, 'w', encoding='latin-1') as file_out:
        file_out.write(''.join(lines))

    file_count += 1

print(f'{file_count} file(s) updated for extra column exclusion.')


Iteration 1 done!
1 file(s) updated for extra column exclusion.


In [15]:
# generate sql commands for loading data from csv to postgres db
# considers that csv files were generated in parallel streams, i.e. they are
# named <table>_<child>_<parallel>.csv (e.g. call_center_1_5.csv)

with open('data_load_script.sql', 'w') as sql_commands_file:
    for file in sorted(files):
        # Drop the trailing "_<child>_<parallel>.csv" to get the table name.
        file_name = file.rsplit('_', 2)[0]
        # Forward slashes avoid backslash-escaping problems inside the SQL
        # string literal; doubling single quotes keeps paths that contain an
        # apostrophe valid SQL.
        file_path = (path + file).replace('\\', '/').replace("'", "''")
        sql_command = "COPY public." + file_name + " FROM '" + file_path + "' delimiter '^' CSV;\n"
        sql_commands_file.write(sql_command)

In [16]:
# load csv data into db
# Executes every COPY statement of the generated script in a single call;
# statusmessage reflects only the last statement executed.
with open("data_load_script.sql", "r") as load_script:
    cur.execute(load_script.read())
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 COPY 30


In [17]:
# add constraints to db
# Constraints are added after the bulk load; `with` guarantees the script
# file is closed.
with open("tools/tpcds_ri.sql", "r") as ri_script:
    cur.execute(ri_script.read())
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 ALTER TABLE


In [19]:
# close connection to db
# Closing only the cursor would leave the underlying connection open, so
# grab the connection first and close both.
tpcds_connection = cur.connection
cur.close()
tpcds_connection.close()

#### End of script.