In [1]:
import os
import time
from multiprocessing import Pool
from collections import defaultdict

from map_function_grid import map_grid_fun
from map_function_angular import map_angular_fun
from red_function import red_fun
from red_function_filtered import red_fun_with_filtering
from multiple_red_fun_angular import parallel_reducers_angular
from first_red_fun_parallel_grid import first_red
from multiple_red_fun_grid import parallel_reducers_grid


BASE_DIR = os.path.dirname(os.path.abspath("__file__"))
input = open(BASE_DIR + '/dataset.txt', 'r')

input_points = []
for line in input:
    datalist = line.strip().split(" ")
    tmp = []
    for x in datalist:
        tmp.append(float(x))
    input_points.append(tuple(tmp))

dim = 4  #default value for dimensions
split = 5  #default num split per dimension

In [2]:
def compute_sky(points):
    print('computing skyline')

    sorted_points = sorted(points)
    window = []

    for point in sorted_points:

        to_window = True
        for x in window:
            if x == point:
                break
            dominated = 0
            for d in range(dim):
                if point[d] < x[d]:
                    break
                else:
                    dominated += 1
            if dominated == dim:
                to_window = False
                break
        if to_window: window.append(point)

    return window

In [3]:
def remove_items(test_list, item):
    c = 0
    for i in test_list:
        c += 1
        if (i == item):
            if test_list[c] == item:
                test_list.remove(i)
            test_list.remove(i)

    return test_list


def compute_sky_bnl(points):
    window = []
    to_remove = set()
    discard = 0
    res = []

    #for each point scan the entire window(no sorting): if pass all tests, then add to window. Finally check if you have
    #to remove the point after the whole scan
    for p in points:
        for elem in window:
            if elem == p: continue
            discard = 1
            if elem not in to_remove:
                remove = 1
                for i in range(dim):
                    if p[i] < elem[i]:
                        discard = 0
                    elif elem[i] < p[i]:
                        remove = 0
                if remove:
                    to_remove.add(elem)
                if discard:
                    break
            else:
                discard = 0
        if not discard:
            window.append(p)
    res = window
    for elem in to_remove:
        res = remove_items(window, elem)

    return res

In [4]:
def compute_sky_with_filter(points, filter):
    sorted_points = sorted(points)
    window = []
    for point in sorted_points:
        to_window = True
        for x in filter:
            if x == point:
                break
            dominated = 0
            for d in range(dim):
                if point[d] < x[d]:
                    break
                else:
                    dominated += 1
            if dominated == dim:
                to_window = False
                break
        if to_window == False: continue
        for x in window:
            if x == point:
                break
            dominated = 0
            for d in range(dim):
                if point[d] < x[d]:
                    break
                else:
                    dominated += 1
            if dominated == dim:
                to_window = False
                break
        if to_window: window.append(point)

    return window

In [5]:
#sequential SFS
start = time.time()

res = compute_sky(input_points)

print('total execution time= ' + str(time.time() - start))
print(len(res))  #number of skyline points

computing skyline
total execution time= 2.868252992630005
395


In [6]:
#sequential bnl
#sequential SFS
start = time.time()

res = compute_sky_bnl(input_points)

print('total execution time= ' + str(time.time() - start))
print(len(res))  #number of skyline points

total execution time= 13.419967889785767
395


In [7]:
#MapReduce Style with grid partitioning
pool = Pool()
start = time.time()

mapped = pool.map(map_grid_fun, input_points)
print('mapping time: ' + str(time.time() - start))

###shuffling phase
candidates = defaultdict(list)

for p in mapped:
    candidates[p[0]].append(tuple(p[1:]))
###

result = []
reduction = time.time()
result.append(pool.map(red_fun, candidates.values()))
print('reduction time: ' + str(time.time() - reduction))

count = 0
points_ = []
for x in result[0]:
    for y in x:
        points_.append(y)

print('Local Skyline points: ' + str(count))
global_reduction = time.time()
res = compute_sky(points_)
print('global reduction time: ' + str(time.time() - global_reduction))
print(len(res))
print('total execution time= ' + str(time.time() - start))

pool.close()
pool.join()

mapping time: 0.44748973846435547
reduction time: 0.9194223880767822
Local Skyline points: 0
computing skyline
global reduction time: 0.4597160816192627
395
total execution time= 2.0130677223205566


In [8]:
#MapReduce Style with angular partitioning
pool = Pool()
start = time.time()

mapped = pool.map(map_angular_fun, input_points)
print('mapping time: ' + str(time.time() - start))

###shuffling phase
candidates = defaultdict(list)

for p in mapped:
    candidates[p[0]].append(tuple(p[1:]))
###

result = []
reduction = time.time()
result.append(pool.map(red_fun, candidates.values()))
print('reduction time: ' + str(time.time() - reduction))

count = 0
points = []
for x in result[0]:
    for y in x:
        points.append(y)

print('Local Skyline points: ' + str(count))
global_reduction = time.time()
res = compute_sky(points)
print('global reduction time: ' + str(time.time() - global_reduction))
print(len(res))
print('total execution time= ' + str(time.time() - start))

pool.close()
pool.join()

mapping time: 0.6787850856781006
reduction time: 0.2604379653930664
Local Skyline points: 0
computing skyline
global reduction time: 0.06574511528015137
395
total execution time= 1.4219980239868164


In [9]:
#MapReduce Style - grid partitioning with partitions dominance tests
pool = Pool()
mapping = time.time()
mapped_tmp = pool.map(map_grid_fun, input_points)

print("mapping time= " + str(time.time() - mapping))

mapped_set = set()
candidates = defaultdict(list)
for x in mapped_tmp:
    mapped_set.add(x[0])
    candidates[x[0]].append(tuple(x[1:]))

mapped_list = list(mapped_set)

mapped_list.sort()

new_list = []
i = 0

done = 0
new_list = []

for point in mapped_list:
    p1 = []
    for i in range(dim):
        p1.append((point[i]) / split)
    to_window = True
    for x in new_list:
        if x == point:
            break
        dominated = 0
        p2 = []
        for i in range(dim):
            p2.append((x[i] + 1) / split)
        for d in range(dim):
            if p1[d] < p2[d]:
                break
            else:
                dominated += 1
        if dominated == dim:
            to_window = False
            break
    if to_window:
        new_list.append(point)

print('partitions survived: ' + str(len(new_list)))

mapped = []
candidates2 = defaultdict(list)
punti_ok = 0
for k in candidates.keys():
    if k in new_list:
        candidates2[k] = candidates[k]
        punti_ok += len(candidates[k])
print(len(new_list))
total_points=len(input_points)
print('punti pruned: ' + str(total_points - punti_ok))

result = []

result.append(pool.map(red_fun, candidates2.values()))

count = 0
points_list = []
for x in result[0]:
    for y in x:
        count += 1
        points_list.append(y)

#print('Local Skyline points: '+str(count))
global_reduction = time.time()
res = compute_sky(points)
print('sequential time: ' + str(time.time() - global_reduction))

print(len(res))
print('total execution time= ' + str(time.time() - mapping))

pool.close()
pool.join()

mapping time= 0.5149581432342529
partitions survived: 369
369
punti pruned: 204339
computing skyline
sequential time: 0.06895899772644043
395
total execution time= 1.5926289558410645


In [10]:
#MapReduce Style - nearest to the origin filter

pool = Pool()
start = time.time()

mapped = pool.map(map_grid_fun, input_points)

candidates = defaultdict(list)

for p in mapped:
    candidates[p[0]].append(tuple(p[1:]))

result = []

result.append(pool.map(red_fun_with_filtering, candidates.values()))

count = 0
points_list = []
filter = []
for x in result[0]:
    for y in x[0]:
        count += 1
        points_list.append(y)
    filter.append(x[1])

print('Local Skyline points: ' + str(count))
reduction = time.time()
res = compute_sky_with_filter(points_list, filter)
print('reduction time= ' + str(time.time() - reduction))
print(len(res))

print('total execution time= ' + str(time.time() - start))
pool.close()
pool.join()

Local Skyline points: 43571
reduction time= 1.9726598262786865
395
total execution time= 3.715308666229248


In [11]:
#MapReduce Style - multiple reducer with angular partitioning
pool = Pool()
start = time.time()


mapping = time.time()
mapped_tmp = pool.map(map_angular_fun, input_points)
print("mapping time= " + str(time.time() - mapping))

candidates = defaultdict(list)
for p in mapped_tmp:
    candidates[p[0]].append(tuple(p[1:]))

result = []
reduction = time.time()
result.append(pool.map(red_fun, candidates.values()))

loc=[]
for x in result[0]:
    loc+=x
print('reduction time= ' + str(time.time() - reduction))
res_list = [loc] * len(result[0])
global_skyline = []
gl = []
tmp = []
d=zip(result[0], res_list)

global_skyline.append(pool.map(parallel_reducers_angular, d))


count = 0
l=0
print('total execution time= ' + str(time.time() - start))
for x in result[0]:
    for y in x[0]:
        count += 1
for g in global_skyline[0]:
    for x in g:
        gl.append(tuple(x))
        l+=len(g)

print(len(gl))

pool.close()
pool.join()

mapping time= 1.0326259136199951
reduction time= 0.34678125381469727
total execution time= 1.785710096359253
395


In [13]:
#MapReduce Style - multiple reducer with grid partitioning
pool = Pool()
start = time.time()

mapping = time.time()
mapped_tmp = pool.map(map_grid_fun, input_points)

print("mapping time= " + str(time.time() - mapping))

mapped_set = set()
candidates = defaultdict(list)
for x in mapped_tmp:
    mapped_set.add(x[0])
    candidates[x[0]].append(tuple(x[1:]))
mapped_list = list(mapped_set)
print('num partitions: ' + str(len(mapped_list)))

mapped_list.sort()
new_list = []

for point in mapped_list:
    p1 = []
    for i in range(dim):
        p1.append((point[i]) / split)
    to_window = True
    for x in new_list:
        if x == point:
            break
        dominated = 0
        p2 = []
        for i in range(dim):
            p2.append((x[i] + 1) / split)
        for d in range(dim):
            if p1[d] < p2[d]:
                break
            else:
                dominated += 1
        if dominated == dim:
            to_window = False
            break
    if to_window:
        new_list.append(point)

print('partitions survived: ' + str(len(new_list)))
mapped = []
candidates2 = defaultdict(list)
for k in candidates.keys():
    if k in new_list:
        candidates2[k] = candidates[k]
        candidates2[k].append(k)

result = []
result.append(pool.map(first_red, candidates2.values()))#qui creare file con reduce

global_skyline_ = []
for x in result[0]:
    global_skyline_+=x
gl = []
g=[global_skyline_]* len(result[0])
d=zip(result[0], g)

global_skyline=[]
bottleneck=time.time()
global_skyline.append(pool.map(parallel_reducers_grid, d))
print('GLOBAL REDUCTION: '+str(time.time()-bottleneck))


count = 0
l=0
print('total execution time= ' + str(time.time() - start))

for g in global_skyline[0]:
    for x in g:
        gl.append(tuple(x))
        l+=len(g)
print(len(gl))

pool.close()
pool.join()

GLOBAL REDUCTION: 2.549891948699951
total execution time= 4.202321290969849
395
mapping time= 0.5053739547729492
num partitions: 626
partitions survived: 369
GLOBAL REDUCTION: 2.9927620887756348
total execution time= 4.409381151199341
395
