In [2]:
import psycopg2
import os
import sys
import math
from psycopg2 import sql

# Table-name prefixes for the horizontal fragments: fragment i is stored in the
# table <prefix><i>, e.g. range_ratings_part0 or round_robin_ratings_part0.
RANGE_TABLE_PREFIX = 'range_ratings_part'
RROBIN_TABLE_PREFIX = 'round_robin_ratings_part'

### loadRatings( ):- 
Takes as input the file-system path of the ratings file. loadRatings() then loads all ratings into a table (stored in PostgreSQL) named by its `ratingstablename` argument (`ratings` in the examples below), with the schema userid (int), movieid (int), rating (float).

In [3]:
def getOpenConnection(user='postgres', password='1234', dbname='postgres', host='localhost'):
    """Open and return a new psycopg2 connection to a PostgreSQL server.

    The connection parameters are passed to psycopg2 as keyword arguments, so
    psycopg2 quotes them itself. Building the DSN by string concatenation breaks
    as soon as a value contains a quote or a space.

    Args:
        user: PostgreSQL role name.
        password: password for ``user``. NOTE(review): the default is a
            hardcoded credential; prefer passing it explicitly.
        dbname: database to connect to.
        host: server host name; defaults to ``'localhost'``.

    Returns:
        An open psycopg2 connection. The caller is responsible for closing it.
    """
    return psycopg2.connect(dbname=dbname, user=user, host=host, password=password)

In [36]:
def loadRatings(ratingstablename, ratingsfilepath, openconnection):
    """(Re)create ``ratingstablename`` from a ratings file and reset ratings_metadata.

    The file is bulk-loaded with COPY using ``':'`` as the field separator into
    a staging layout that has a throw-away column after each real field.
    Presumably the input lines look like ``userid::movieid::rating::timestamp``
    (TODO confirm), so each ``'::'`` yields one empty field. The placeholder
    columns and the timestamp are then dropped, leaving (userid, movieid, rating).

    An empty ``ratings_metadata`` table (description, amt) is also recreated.
    All statements run in one transaction on ``openconnection``.
    """
    staging_columns = ('userid', 'empty1', 'movieid', 'empty2', 'rating', 'empty3', 'timestamp')
    table_id = sql.Identifier(ratingstablename)

    def run_on_table(cursor, template):
        # Substitute the (quoted) ratings table name into a statement template.
        cursor.execute(sql.SQL(template).format(table_id))

    with openconnection, openconnection.cursor() as cur:
        run_on_table(cur, "DROP TABLE IF EXISTS {}")
        run_on_table(cur, "CREATE TABLE {} (userid integer, empty1 varchar(10), movieid integer, empty2 varchar(10), rating float, empty3 varchar(10), timestamp integer)")

        with open(ratingsfilepath) as source:
            cur.copy_from(source, ratingstablename, sep=':', columns=staging_columns)

        # Keep only the userid, movieid and rating columns.
        run_on_table(cur, "ALTER TABLE {} DROP COLUMN empty1, DROP COLUMN empty2, DROP COLUMN empty3, DROP COLUMN timestamp")

        # Fresh, empty metadata table; the partition functions record their counts here.
        cur.execute("DROP TABLE IF EXISTS ratings_metadata")
        cur.execute("CREATE TABLE ratings_metadata (description varchar(100), amt integer)")

In [37]:
# Test1: Check if drop works
# Test2: Check if create table works
ratings_filename = 'test_data1.txt'

# IMP: use this in actual .py file
# ratings_filepath = os.path.join(os.path.dirname(__file__), '..', '..', ratings_filename)
# In the notebook, resolve the file against the working directory instead of a
# machine-specific absolute path. NOTE(review): assumes the notebook runs from
# the directory that contains test_data1.txt; adjust if it does not.
ratings_filepath = os.path.abspath(ratings_filename)
conn = getOpenConnection(dbname='cse512')
try:
    loadRatings('ratings', ratings_filepath, conn)
finally:
    # psycopg2's `with conn:` only ends the transaction; it does not close the connection.
    conn.close()

### rangePartition( ):- 
Takes as input: (1) the ratings table stored in PostgreSQL and (2) an integer N that represents the number of partitions. rangePartition() then generates N horizontal fragments of the ratings table and stores them in PostgreSQL. The table is partitioned on N uniform ranges of the rating attribute: with width $w = 5/N$, fragment $i$ (for $i = 0, \dots, N-1$) holds the ratings in $[i\,w,\ (i+1)\,w)$, and the last fragment also includes ratings equal to 5.

In [40]:
def rangePartition(ratingstablename, numberofpartitions, openconnection):
    """Split the ratings table into N range fragments on the ``rating`` column.

    With ``w = 5.0 / N``, fragment i (table ``RANGE_TABLE_PREFIX + str(i)``)
    receives the rows with ``i*w <= rating < (i+1)*w``. The last fragment has
    no upper bound, so ratings equal to 5 are kept. Each fragment table is
    dropped and recreated. The partition count is stored in
    ``ratings_metadata`` under ``'Range partitions'``, replacing any earlier
    entry so readers never pick up a stale count.

    Args:
        ratingstablename: source table with columns userid, movieid, rating.
        numberofpartitions: N, the (positive integer) number of fragments.
        openconnection: open psycopg2 connection. Everything is committed as
            one transaction, or rolled back on error.

    Raises:
        ValueError: if ``numberofpartitions`` is not positive. A non-positive N
            would otherwise divide by zero or never terminate.
    """
    if numberofpartitions <= 0:
        raise ValueError("numberofpartitions must be positive, got %r" % (numberofpartitions,))

    interval_size = 5.0 / numberofpartitions
    source = sql.Identifier(ratingstablename)

    with openconnection:
        with openconnection.cursor() as cur:
            for i in range(numberofpartitions):
                frag = sql.Identifier(RANGE_TABLE_PREFIX + str(i))
                cur.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(frag))
                cur.execute(
                    sql.SQL("CREATE TABLE {} (userid integer, movieid integer, rating float)")
                        .format(frag)
                )

                # Bounds are i * interval_size rather than a running sum: repeated
                # float addition can land just below 5.0, which used to create an
                # extra fragment and drop ratings of 5. Copying server-side avoids
                # pulling every row through Python.
                low = i * interval_size
                if i == numberofpartitions - 1:
                    cur.execute(
                        sql.SQL("INSERT INTO {} (userid, movieid, rating) "
                                "SELECT userid, movieid, rating FROM {} WHERE rating>=%s")
                            .format(frag, source),
                        (low,)
                    )
                else:
                    cur.execute(
                        sql.SQL("INSERT INTO {} (userid, movieid, rating) "
                                "SELECT userid, movieid, rating FROM {} WHERE rating>=%s AND rating<%s")
                            .format(frag, source),
                        (low, (i + 1) * interval_size)
                    )

            # Record the partition count, replacing any entry from an earlier run.
            cur.execute("CREATE TABLE IF NOT EXISTS ratings_metadata (description varchar(100), amt integer)")
            cur.execute("DELETE FROM ratings_metadata WHERE description = %s", ("Range partitions",))
            cur.execute("INSERT INTO ratings_metadata (description, amt) VALUES (%s, %s)",
                ("Range partitions", numberofpartitions)
            )

In [41]:
conn = getOpenConnection(dbname='cse512')
N = 5
try:
    rangePartition('ratings', N, conn)
finally:
    # `with conn:` inside rangePartition commits but does not close the connection.
    conn.close()

In [29]:
# Clear range partitions. The tables are dropped directly here because
# deleteTables is only defined in a later cell, so calling it from this cell
# fails on a fresh kernel (Restart & Run All).
frag_names = [RANGE_TABLE_PREFIX + str(x) for x in range(0, 5)]
openconnection = getOpenConnection(dbname='cse512')
try:
    with openconnection:
        with openconnection.cursor() as cur:
            for name in frag_names:
                # IF EXISTS keeps the cell re-runnable when a partition is already gone.
                cur.execute(
                    sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(sql.Identifier(name))
                )
finally:
    openconnection.close()

### roundRobinPartition( ):-
Takes as input: (1) the ratings table stored in PostgreSQL and (2) an integer N that represents the number of partitions. The function then generates N horizontal fragments of the ratings table and stores them in PostgreSQL. Tuples are dealt out in round-robin order: the k-th tuple read from the table (counting from 0) goes to fragment k mod N.

In [42]:
def roundRobinPartition(ratingstablename, numberofpartitions, openconnection):
    """Split the ratings table into N fragments by dealing rows out round-robin.

    Rows are numbered 0, 1, 2, ... and row k goes to fragment ``k % N`` (table
    ``RROBIN_TABLE_PREFIX + str(k % N)``). No ORDER BY is applied, so only the
    even spread is guaranteed, not which fragment a particular row lands in.
    Each fragment table is dropped and recreated. The partition count is stored
    in ``ratings_metadata`` under ``'Round robin partitions'``, replacing any
    earlier entry.

    Args:
        ratingstablename: source table with columns userid, movieid, rating.
        numberofpartitions: N, the (positive integer) number of fragments.
        openconnection: open psycopg2 connection. Everything runs in one
            transaction. NOTE(review): relies on that; in autocommit mode the
            ON COMMIT DROP temp table below would vanish immediately.

    Raises:
        ValueError: if ``numberofpartitions`` is not positive.
    """
    if numberofpartitions <= 0:
        raise ValueError("numberofpartitions must be positive, got %r" % (numberofpartitions,))

    with openconnection:
        with openconnection.cursor() as cur:
            # Number every row exactly once. All N inserts below read this one
            # numbering, so each row lands in exactly one fragment. Separate
            # ROW_NUMBER() scans are not guaranteed to number rows the same way.
            # The copy is dropped automatically at commit.
            cur.execute(
                sql.SQL("CREATE TEMP TABLE rr_numbered ON COMMIT DROP AS "
                        "SELECT userid, movieid, rating, ROW_NUMBER() OVER () - 1 AS rn FROM {}")
                    .format(sql.Identifier(ratingstablename))
            )

            for i in range(numberofpartitions):
                frag = sql.Identifier(RROBIN_TABLE_PREFIX + str(i))
                cur.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(frag))
                cur.execute(
                    sql.SQL("CREATE TABLE {} (userid integer, movieid integer, rating float)")
                        .format(frag)
                )
                # Server-side copy instead of fetching every row into Python.
                cur.execute(
                    sql.SQL("INSERT INTO {} (userid, movieid, rating) "
                            "SELECT userid, movieid, rating FROM rr_numbered WHERE mod(rn, %s) = %s")
                        .format(frag),
                    (numberofpartitions, i)
                )

            # Record the partition count, replacing any entry from an earlier run.
            cur.execute("CREATE TABLE IF NOT EXISTS ratings_metadata (description varchar(100), amt integer)")
            cur.execute("DELETE FROM ratings_metadata WHERE description = %s", ("Round robin partitions",))
            cur.execute("INSERT INTO ratings_metadata (description, amt) VALUES (%s, %s)",
                ("Round robin partitions", numberofpartitions)
            )

In [43]:
conn = getOpenConnection(dbname='cse512')
N = 5
try:
    roundRobinPartition('ratings', N, conn)
finally:
    # `with conn:` inside roundRobinPartition commits but does not close the connection.
    conn.close()

In [12]:
# Clear round-robin partitions. The tables are dropped directly here because
# deleteTables is only defined in a later cell, so calling it from this cell
# fails on a fresh kernel (Restart & Run All).
frag_names = [RROBIN_TABLE_PREFIX + str(x) for x in range(0, 5)]
openconnection = getOpenConnection(dbname='cse512')
try:
    with openconnection:
        with openconnection.cursor() as cur:
            for name in frag_names:
                # IF EXISTS keeps the cell re-runnable when a partition is already gone.
                cur.execute(
                    sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(sql.Identifier(name))
                )
finally:
    openconnection.close()

### roundRobinInsert( ):
Takes as input: (1) the ratings table stored in PostgreSQL, (2) userid, (3) itemid, (4) rating. roundRobinInsert() then inserts the new tuple into the ratings table and into the fragment that comes next in round-robin order.

In [48]:
def roundRobinInsert(ratingstablename, userid, itemid, rating, openconnection):
    """Insert a rating into the main table and into the next round-robin fragment.

    N is read from ``ratings_metadata`` (description ``'Round robin partitions'``).
    The target fragment is ``(rows currently in the round-robin fragments) % N``.
    The fragments are counted, not the main table, because rows also reach the
    main table by other paths. For example, rangeInsert inserts into it too,
    and counting the main table would then skew the rotation.

    Args:
        ratingstablename: main ratings table (columns userid, movieid, rating).
        userid, itemid, rating: values of the new tuple; ``itemid`` is stored
            in the ``movieid`` column.
        openconnection: open psycopg2 connection. Both inserts are committed
            together, or rolled back together on error.

    Raises:
        RuntimeError: if no round-robin partition count is recorded.
    """
    with openconnection:
        with openconnection.cursor() as cur:
            # First insert to the main table
            cur.execute(
                sql.SQL("INSERT INTO {} (userid, movieid, rating) VALUES (%s, %s, %s)")
                    .format(sql.Identifier(ratingstablename)),
                (userid, itemid, rating)
            )

            # Find number of fragments
            cur.execute(
                "SELECT amt FROM ratings_metadata WHERE description=%s",
                ('Round robin partitions',)
            )
            row = cur.fetchone()
            if row is None:
                raise RuntimeError(
                    "no 'Round robin partitions' entry in ratings_metadata; run roundRobinPartition first"
                )
            num_fragments = row[0]

            # Rows already dealt out (the new row is not in any fragment yet);
            # under round-robin the next row goes to fragment total % N.
            total = 0
            for i in range(num_fragments):
                cur.execute(
                    sql.SQL("SELECT COUNT(*) FROM {}")
                        .format(sql.Identifier(RROBIN_TABLE_PREFIX + str(i)))
                )
                total += cur.fetchone()[0]
            frag_name = RROBIN_TABLE_PREFIX + str(total % num_fragments)

            # Insert to fragment
            cur.execute(
                sql.SQL("INSERT INTO {} (userid, movieid, rating) VALUES (%s, %s, %s)")
                    .format(sql.Identifier(frag_name)),
                (userid, itemid, rating)
            )

In [49]:
tuples = [(1, 616, 5), (2, 110, 5), (2, 151, 3)]
conn = getOpenConnection(dbname='cse512')
try:
    for (u, m, r) in tuples:
        roundRobinInsert('ratings', u, m, r, conn)
finally:
    # `with conn:` inside roundRobinInsert commits but does not close the connection.
    conn.close()

In [19]:
# Metadata table setup: rebuild ratings_metadata by hand.
# NOTE(review): these counts must match the partition tables that actually
# exist; the insert/query functions read them to decide which tables to use.
NUM_RANGE_PARTITIONS = 5
NUM_RROBIN_PARTITIONS = 5
openconnection = getOpenConnection(dbname='cse512')
name = "ratings_metadata"
try:
    with openconnection:
        with openconnection.cursor() as cur:
            cur.execute(
                sql.SQL("DROP TABLE IF EXISTS {}")
                    .format(sql.Identifier(name))
            )
            cur.execute(
                sql.SQL("CREATE TABLE {} (description varchar(100), amt integer)")
                    .format(sql.Identifier(name))
            )
            insert_stmt = (
                sql.SQL("INSERT INTO {} (description, amt) VALUES (%s, %s)")
                    .format(sql.Identifier(name))
            )
            cur.execute(insert_stmt, ("Range partitions", NUM_RANGE_PARTITIONS))
            cur.execute(insert_stmt, ("Round robin partitions", NUM_RROBIN_PARTITIONS))
finally:
    openconnection.close()
            

### rangeInsert( ):-
Takes as input: (1) the ratings table stored in PostgreSQL, (2) userid, (3) itemid, (4) rating. rangeInsert() then inserts the new tuple into the ratings table and into the range fragment whose rating interval contains the given rating.

In [50]:
def rangeInsert(ratingstablename, userid, itemid, rating, openconnection):
    """Insert a rating into the main table and into its range fragment.

    N is read from ``ratings_metadata`` (description ``'Range partitions'``).
    With ``w = 5.0 / N``, the target is the highest fragment i whose lower bound
    ``i * w`` is <= ``rating``. Fragment i therefore covers ``[i*w, (i+1)*w)``,
    and ratings of 5 or more go to the last fragment.

    Args:
        ratingstablename: main ratings table (columns userid, movieid, rating).
        userid, itemid, rating: values of the new tuple; ``itemid`` is stored
            in the ``movieid`` column.
        openconnection: open psycopg2 connection. Both inserts are committed
            together, or rolled back together on error.

    Raises:
        ValueError: if ``rating`` is negative (no range fragment holds it).
        RuntimeError: if no range partition count is recorded.
    """
    if rating < 0:
        raise ValueError("rating must be non-negative, got %r" % (rating,))

    with openconnection:
        with openconnection.cursor() as cur:
            # First insert to the main table
            cur.execute(
                sql.SQL("INSERT INTO {} (userid, movieid, rating) VALUES (%s, %s, %s)")
                    .format(sql.Identifier(ratingstablename)),
                (userid, itemid, rating)
            )

            # Find number of fragments
            cur.execute(
                "SELECT amt FROM ratings_metadata WHERE description=%s",
                ('Range partitions',)
            )
            row = cur.fetchone()
            if row is None:
                raise RuntimeError(
                    "no 'Range partitions' entry in ratings_metadata; run rangePartition first"
                )
            num_fragments = row[0]

            # Compare against the lower bounds directly rather than using
            # floor(rating / interval_size). This clamps ratings >= 5 into the
            # last fragment (instead of naming a table that does not exist) and
            # avoids division rounding right at a range edge.
            interval_size = 5.0 / num_fragments
            frag_num = 0
            for i in range(1, num_fragments):
                if rating >= i * interval_size:
                    frag_num = i
            frag_name = RANGE_TABLE_PREFIX + str(frag_num)

            # Insert to fragment
            cur.execute(
                sql.SQL("INSERT INTO {} (userid, movieid, rating) VALUES (%s, %s, %s)")
                    .format(sql.Identifier(frag_name)),
                (userid, itemid, rating)
            )

In [51]:
tuples = [(1, 594, 5), (2, 110, 5), (2, 151, 3)]
conn = getOpenConnection(dbname='cse512')
try:
    for (u, m, r) in tuples:
        rangeInsert('ratings', u, m, r, conn)
finally:
    # `with conn:` inside rangeInsert commits but does not close the connection.
    conn.close()

### rangeQuery( ):-
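Returns all tuples for which the rating value is greater than or equal to RatingMinValue and less than or equal to RatingMaxValue. Both the range and the round-robin fragments are searched. The returned tuples are stored in outputPath, one per line, in the form `PartitionName,UserID,MovieID,Rating`.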

In [52]:
def rangeQuery(ratingMinValue, ratingMaxValue, openconnection, outputPath):
    """Write every partitioned tuple with ratingMinValue <= rating <= ratingMaxValue.

    The range fragments are searched first, then every round-robin fragment.
    A tuple stored in both a range and a round-robin fragment is written once
    for each. Each match is written to ``outputPath`` as
    ``<fragment>,<userid>,<movieid>,<rating>``; an existing file is overwritten.

    Range fragment i covers ``[i*w, (i+1)*w)`` with ``w = 5.0 / N``, except the
    last, which has no upper bound. Fragments that cannot overlap
    ``[ratingMinValue, ratingMaxValue]`` are skipped.

    Raises:
        RuntimeError: if a partition count is missing from ratings_metadata.
    """
    def partition_count(cur, description):
        # Read a partition count recorded by rangePartition / roundRobinPartition.
        cur.execute("SELECT amt FROM ratings_metadata WHERE description=%s", (description,))
        row = cur.fetchone()
        if row is None:
            raise RuntimeError("no %r entry in ratings_metadata" % (description,))
        return row[0]

    def write_matches(cur, out, frag_name):
        # Append the fragment's rows in [min, max] to the output file.
        cur.execute(
            sql.SQL("SELECT userid, movieid, rating FROM {} WHERE rating>=%s AND rating<=%s")
                .format(sql.Identifier(frag_name)),
            (ratingMinValue, ratingMaxValue)
        )
        for (u, m, r) in cur.fetchall():
            out.write("{0},{1},{2},{3}\n".format(frag_name, u, m, r))

    # "w" replaces any previous result file, and the file is opened only once.
    with open(outputPath, "w") as out:
        with openconnection:
            with openconnection.cursor() as cur:
                num_range = partition_count(cur, 'Range partitions')
                interval_size = 5.0 / num_range if num_range > 0 else 0.0
                for i in range(num_range):
                    low = i * interval_size
                    is_last = i == num_range - 1
                    # Skip fragments lying entirely above max or entirely below min.
                    # The last fragment is unbounded above, so it is never "below min";
                    # this keeps ratings of exactly 5 reachable when min is 5.
                    if low > ratingMaxValue:
                        continue
                    if not is_last and (i + 1) * interval_size <= ratingMinValue:
                        continue
                    write_matches(cur, out, RANGE_TABLE_PREFIX + str(i))

                for i in range(partition_count(cur, 'Round robin partitions')):
                    write_matches(cur, out, RROBIN_TABLE_PREFIX + str(i))

In [55]:
conn = getOpenConnection(dbname='cse512')
# Write the result into the working directory instead of a machine-specific absolute path.
output_path = os.path.abspath('query_result.txt')
try:
    rangeQuery(2.3, 4.2, conn, output_path)
finally:
    conn.close()

### pointQuery( ):-
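Returns all tuples for which the rating value is equal to RatingValue. Both the range and the round-robin fragments are searched. The returned tuples are stored in outputPath, one per line, in the form `PartitionName,UserID,MovieID,Rating`.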

In [62]:
def pointQuery(ratingValue, openconnection, outputPath):
    """Write every partitioned tuple whose rating equals ``ratingValue``.

    Only the range fragment whose interval contains ``ratingValue`` is searched
    (none when the value is negative), followed by every round-robin fragment.
    With ``w = 5.0 / N``, that is the highest fragment i with ``i * w <= ratingValue``,
    so values of 5 or more map to the last fragment. Each match is written to
    ``outputPath`` as ``<fragment>,<userid>,<movieid>,<rating>``; an existing
    file is overwritten.

    Raises:
        RuntimeError: if a partition count is missing from ratings_metadata.
    """
    def partition_count(cur, description):
        # Read a partition count recorded by rangePartition / roundRobinPartition.
        cur.execute("SELECT amt FROM ratings_metadata WHERE description=%s", (description,))
        row = cur.fetchone()
        if row is None:
            raise RuntimeError("no %r entry in ratings_metadata" % (description,))
        return row[0]

    def write_matches(cur, out, frag_name):
        # Append the fragment's rows with rating == ratingValue to the output file.
        cur.execute(
            sql.SQL("SELECT userid, movieid, rating FROM {} WHERE rating=%s")
                .format(sql.Identifier(frag_name)),
            (ratingValue,)
        )
        for (u, m, r) in cur.fetchall():
            out.write("{0},{1},{2},{3}\n".format(frag_name, u, m, r))

    # "w" replaces any previous result file, and the file is opened only once.
    with open(outputPath, "w") as out:
        with openconnection:
            with openconnection.cursor() as cur:
                num_range = partition_count(cur, 'Range partitions')
                # Negative ratings belong to no range fragment. Comparing against the
                # lower bounds (rather than floor division) never yields an index
                # outside 0..N-1, so no nonexistent table is queried.
                if ratingValue >= 0 and num_range > 0:
                    interval_size = 5.0 / num_range
                    frag_num = 0
                    for i in range(1, num_range):
                        if ratingValue >= i * interval_size:
                            frag_num = i
                    write_matches(cur, out, RANGE_TABLE_PREFIX + str(frag_num))

                for i in range(partition_count(cur, 'Round robin partitions')):
                    write_matches(cur, out, RROBIN_TABLE_PREFIX + str(i))

In [63]:
conn = getOpenConnection(dbname='cse512')
# Write the result into the working directory instead of a machine-specific absolute path.
output_path = os.path.abspath('query_result.txt')
try:
    pointQuery(5.0, conn, output_path)
finally:
    conn.close()

In [1]:
def deleteTables(ratingstablename, openconnection):
    """Drop one table, or every base table in the ``public`` schema, with CASCADE.

    Args:
        ratingstablename: table to drop, or ``'ALL'`` (case-insensitive) to drop
            every base table listed in information_schema.tables for schema
            ``'public'``.
        openconnection: open psycopg2 connection. It is committed on success
            and rolled back on a database/IO error.

    Database and IO errors are printed rather than raised, so a missing table
    is reported without stopping the caller.
    """
    # Bind before `try` so the `finally` clause is safe even if cursor() itself fails.
    cursor = None
    try:
        cursor = openconnection.cursor()
        if ratingstablename.upper() == 'ALL':
            # Views are excluded because DROP TABLE fails on them.
            cursor.execute("SELECT table_name FROM information_schema.tables "
                           "WHERE table_schema = 'public' AND table_type = 'BASE TABLE'")
            tables = cursor.fetchall()
            for table_name in tables:
                # Catalog names are exact, so quote them as identifiers. Unquoted,
                # mixed-case or unusual names would be folded or mis-parsed.
                cursor.execute(
                    sql.SQL('DROP TABLE {} CASCADE').format(sql.Identifier(table_name[0]))
                )
        else:
            # Left unquoted on purpose so PostgreSQL case-folds the caller's name as before.
            cursor.execute('DROP TABLE %s CASCADE' % (ratingstablename))
        openconnection.commit()
    except (psycopg2.DatabaseError, IOError) as e:
        if openconnection:
            openconnection.rollback()
        print('Error %s' % e)
    finally:
        if cursor is not None:
            cursor.close()

In [65]:
conn = getOpenConnection(dbname='cse512')
# Drop the main table, the metadata table and five partitions of each kind.
# deleteTables prints (rather than raises) an error for a table that does not exist.
tables_to_drop = (
    ['ratings', 'ratings_metadata']
    + [RANGE_TABLE_PREFIX + str(i) for i in range(5)]
    + [RROBIN_TABLE_PREFIX + str(i) for i in range(5)]
)
try:
    for table in tables_to_drop:
        deleteTables(table, conn)
finally:
    conn.close()

Error table "range_ratings_part3" does not exist

Error table "range_ratings_part4" does not exist

Error table "round_robin_ratings_part3" does not exist

Error table "round_robin_ratings_part4" does not exist



In [None]:
# End-to-end run of the whole interface. The two names below are not defined
# anywhere earlier in the notebook, so define them here.
RATINGS_TABLE_NAME = 'ratings'
# NOTE(review): assumes test_data1.txt sits in the notebook's working directory.
INPUT_FILE_PATH_NAME = os.path.abspath('test_data1.txt')

# NOTE(review): this connects to the default 'postgres' database, unlike the
# cells above, which use dbname='cse512'. Confirm which one is intended.
conn = getOpenConnection()
try:
    loadRatings(RATINGS_TABLE_NAME, INPUT_FILE_PATH_NAME, conn)
    rangePartition(RATINGS_TABLE_NAME, 3, conn)
    rangeInsert(RATINGS_TABLE_NAME, 1, 589, 4.2, conn)
    rangeInsert(RATINGS_TABLE_NAME, 1, 589, 1.2, conn)
    rangeInsert(RATINGS_TABLE_NAME, 1, 589, 3.2, conn)
    roundRobinPartition(RATINGS_TABLE_NAME, 3, conn)
    roundRobinInsert(RATINGS_TABLE_NAME, 1, 589, 4.2, conn)
    roundRobinInsert(RATINGS_TABLE_NAME, 1, 589, 1.2, conn)
    roundRobinInsert(RATINGS_TABLE_NAME, 1, 589, 3.2, conn)
    rangeQuery(1, 5, conn, "./rangeResult.txt")
    # Each pointQuery call replaces its output file, so only the last result
    # (rating 3.2) remains in pointResult.txt.
    pointQuery(5, conn, "./pointResult.txt")
    pointQuery(1.5, conn, "./pointResult.txt")
    pointQuery(3.2, conn, "./pointResult.txt")
finally:
    conn.close()