In [1]:
import hashlib
import time
import pandas as pd
import numpy as np
#pd.set_option('display.max_rows', None)
#pd.options.mode.chained_assignment = None  # default='warn'


In [2]:
#a)Pre-process the data
#Fetch subject, object and property and process the data
def preProcessData(dataF):
    dataF[0] = dataF[0].split(":")[1].strip()
    dataF[1] = dataF[1].split(":")[1].strip()
    dataF[2] = dataF[2].split(":")[1][:-1].strip() if len(dataF[2].split(":"))>1 else dataF[2][:-1].strip()
    return dataF

In [3]:
#Read '100k.txt' file into a pandas dataframe
raw100kDF = pd.read_csv("DataSet/100k.txt", header = None, 
                     sep="\t", 
                     quotechar='"', 
                     skipinitialspace=True, 
                     names=['subject','property','object'])

raw100kDF = raw100kDF.apply(preProcessData, axis=1)

#raw100kDF.info()

In [4]:
#b)Design and implement hash join and sort-merge join algorithms for the query evaluation
# dictionaryDF is a dictionary of dataframes with 
# key: property name, value: dataframe with subject, property and object for a given property
dictionaryDF = dict(tuple(raw100kDF.groupby('property', as_index=False)))

# Remove the "property" column from every dataframe is the dictionary dataframes
for key in dictionaryDF:
    dictionaryDF[key] = dictionaryDF[key].drop("property", axis=1)
    dictionaryDF[key].reset_index(drop = True, inplace = True)


In [5]:
#Function to do sort merge join based on keys
def sortMergeJoin(dataF1, key1, dataF2, key2):
    #Perform sorting based on the keys
    sortDataF1OnKey1 = dataF1.sort_values(by=key1, ascending=True)
    sortDataF2OnKey2 = dataF2.sort_values(by=key2, ascending=True)
    
    mergedDF = pd.merge(sortDataF1OnKey1, sortDataF2OnKey2, left_on=key1, right_on=key2, how='inner')
    return mergedDF    

In [6]:
#The sort-merge join merge results
startTime = time.time()
#sort merge join with two dataframew with different keys passed as params
followsObjectAndFriendOfSubject_SMJDF = sortMergeJoin(dictionaryDF["follows"],"object",dictionaryDF["friendOf"],"subject")

likesObjectAndHasReviewSubject_SMJDF = sortMergeJoin(dictionaryDF["likes"],"object",dictionaryDF["hasReview"],"subject")

friendsOfObjectAndLikesSubject_SMJDF = sortMergeJoin(followsObjectAndFriendOfSubject_SMJDF,"object_y",likesObjectAndHasReviewSubject_SMJDF,"subject_x")

sortMergeJoinQueryResultDF = friendsOfObjectAndLikesSubject_SMJDF.loc[:,("subject_x_x","object_x_x", "object_y_x","object_x_y","object_y_y")]

sortMergeJoinQueryResultDF.rename(columns = {'subject_x_x':'Follows Subject',
                                       'object_x_x':'Follows Object',
                                       'object_y_x':'FriendOf Object',
                                       'object_x_y':'Likes Object',
                                       'object_y_y':' HasReview Object'}, inplace = True)

endTime = time.time()

Sort-Merge Join Time cost

In [7]:
print("The execution time cost for the sort-merge join is %s seconds" %(endTime- startTime))


The execution time cost for the sort-merge join is 0.731783390045166 seconds


In [8]:
sortMergeJoinQueryResultDF.head()

Unnamed: 0,Follows Subject,Follows Object,FriendOf Object,Likes Object,HasReview Object
0,User141,User453,User104,Product0,Review606
1,User141,User453,User104,Product0,Review48
2,User141,User453,User104,Product0,Review118
3,User141,User453,User104,Product0,Review215
4,User141,User453,User104,Product0,Review247


In [9]:
#Hash-Join function based on keys
def hashJoin(dataF1, key1, dataF2, key2):
    dataF1["hashKEY"] = dataF1[key1].apply(hash)
    dataF2["hashKEY"] = dataF2[key2].apply(hash)
    #group by hashKEY. on data frame 1
    dataF1Partitions = dict(tuple(dataF1.groupby('hashKEY')))
    for key in dataF1Partitions:
        #reset index to perform correct merge
        dataF1Partitions[key].reset_index(drop = True, inplace = True)
    
    dictKeys = dataF1Partitions.keys()
    outputAsList = []
    
    for i in range(len(dataF2)):
        if (dataF2.iloc[i]['hashKEY']) in dictKeys:
            key = dataF2.iloc[i]['hashKEY']
            joinRow = dataF2.iloc[[i]]
            #perform left join on 'hash_key' with dataF1Partitions and joinRow
            mergedRow = pd.merge(dataF1Partitions[key],joinRow, on='hashKEY',  how='left')
            outputAsList.append(mergedRow.to_dict('list'))
    
    joinedPartitionList = []
    for dictionary in outputAsList:
        joinedPartitionList.append(pd.DataFrame(dictionary))
    
    resultDF = pd.concat(joinedPartitionList)
    resultDF = resultDF.drop('hashKEY',axis=1)
    
    return resultDF

In [10]:
#Hash Join Results
startTime = time.time()
#hash merge join between dataframes 
followsObjectAndFriendOfSubject_HJDF = hashJoin(dictionaryDF['follows'], 'object',dictionaryDF['friendOf'], 'subject')

likesObjectAndHasReviewSubject_HJDF = hashJoin(dictionaryDF['likes'], 'object',dictionaryDF['hasReview'], 'subject')

friendsOfObjectAndLikesSubject_HJDF = hashJoin(followsObjectAndFriendOfSubject_HJDF, 'object_y',likesObjectAndHasReviewSubject_HJDF, 'subject_x')

hashJoinAndQueryResultDF = friendsOfObjectAndLikesSubject_HJDF.loc[:,("subject_x_x","object_x_x","object_y_x","object_x_y","object_y_y")]
hashJoinAndQueryResultDF.rename(columns = {'subject_x_x':'Follows Subject',
                                       'object_x_x':'Follows Object',
                                       'object_y_x':'FriendOf Object',
                                       'object_x_y':'Likes Object',
                                       'object_y_y':' HasReview Object'}, inplace = True)
                               
endTime = time.time()

In [11]:
print("The execution time cost for hash join: %s in seconds" %(endTime-startTime))

The execution time cost for hash join: 35.49708700180054 in seconds


In [12]:
hashJoinAndQueryResultDF.head()

Unnamed: 0,Follows Subject,Follows Object,FriendOf Object,Likes Object,HasReview Object
0,User630,User9,User20,Product0,Review24
1,User26,User57,User20,Product0,Review24
2,User197,User57,User20,Product0,Review24
3,User351,User57,User20,Product0,Review24
4,User396,User57,User20,Product0,Review24


In [13]:
#c Improve the hashing function
#Hash-Join function based on keys
def improvedHashJoins(dataF1, key1, dataF2, key2):
    dataF1["hashKEY"] = dataF1[key1].apply(hash)
    dataF2["hashKEY"] = dataF2[key2].apply(hash)
    #group by 'hashKEY' on data frame 1
    dataF1Partitions = dict(tuple(dataF1.groupby('hashKEY')))
    for key in dataF1Partitions:
        dataF1Partitions[key].reset_index(drop = True, inplace = True)
    #group by 'hashKEY' on data frame 2
    dataF2Partitions = dict(tuple(dataF2.groupby('hashKEY')))
    for key in dataF2Partitions:
        #reset index to perform correct merge
        dataF2Partitions[key].reset_index(drop = True, inplace = True)
  
    outputAsList = []
    
    for key2 in dataF2Partitions:
        for key1 in dataF1Partitions:
            #if the key matches merge the two dataframes on left join with key 'hashKey'
             if key1==key2:
                    mergedPartitions=pd.merge(dataF1Partitions[key1], dataF2Partitions[key2], on='hashKEY', how='left')
                    outputAsList.append(mergedPartitions.to_dict('list'))
    
    joinedPartitionList = []
    for dictionary in outputAsList:
        joinedPartitionList.append(pd.DataFrame(dictionary))
    
    resultDF = pd.concat(joinedPartitionList)
    resultDF = resultDF.drop('hashKEY',axis=1)
    
    return resultDF

In [14]:
#Improved Hash Join Results
startTime = time.time()
#hash merge join between dataframes 
followsObjectAndFriendOfSubject_HJDF = improvedHashJoins(dictionaryDF['follows'], 'object',dictionaryDF['friendOf'], 'subject')

likesObjectAndHasReviewSubject_HJDF = improvedHashJoins(dictionaryDF['likes'], 'object',dictionaryDF['hasReview'], 'subject')

friendsOfObjectAndLikesSubject_HJDF = improvedHashJoins(followsObjectAndFriendOfSubject_HJDF, 'object_y',likesObjectAndHasReviewSubject_HJDF, 'subject_x')

hashJoinAndQueryResultDF = friendsOfObjectAndLikesSubject_HJDF.loc[:,("subject_x_x","object_x_x","object_y_x","object_x_y","object_y_y")]
hashJoinAndQueryResultDF.rename(columns = {'subject_x_x': 'follows.subject',
                                          'object_x_x': 'follows.object',
                                          'object_y_x': 'friendOf.object',
                                          'object_x_y': 'likes.object',
                                          'object_y_y': 'hasReview.object'}, inplace = True)
                               
endTime = time.time()

In [16]:
print("The execution time cost of improved hash merge join %s seconds"%(endTime-startTime))

The execution time cost of improved hash merge join 4.0191123485565186 seconds


In [17]:
hashJoinAndQueryResultDF.head()

Unnamed: 0,follows.subject,follows.object,friendOf.object,likes.object,hasReview.object
0,User25,User670,User296,Product0,Review24
1,User25,User670,User296,Product0,Review48
2,User25,User670,User296,Product0,Review118
3,User25,User670,User296,Product0,Review215
4,User25,User670,User296,Product0,Review247
