# 选题介绍 - 快速排序

## 小组成员

- 计1504 41416058 徐经纬
- 计1504 41524206 李凯
- 计1504 41524208 李鸿卓
- 计1504 41524201 朱柯佳
- 计1502 61562034 艾万


## 朴素快速排序算法


众所周知的基于分治思想的排序算法，朴素的实现上，最优时间复杂度为$O(nlogn)$，最坏时间复杂度可以被卡到$O(n^2)$

以下针对整形数组来介绍和探究快速排序算法的应用和扩展

主要算法步骤：

- 从数组中选择一个目标值aim，按照下面的方式划分数组：
    
    - 所有小于aim的元素都划分到aim的左侧
    
    - 所有不小于aim的元素都划分到aim的右侧

- 递归对左右两部分分别进行上一步操作，直至整个数组处理完成

### 一个简单的例子

    8 5 7 3 9    - 选择目标值为7

    7 5 8 3 9    - 将目标位置和当前最左边位置互换，令left=1(val=5), right=5(val=9)

    7 5 8 3 9    - 右移left，左移right至满足交换条件：left=3(val=8), right=4(val=3), 交换两个位置的值

    7 5 3 8 9    - left=4 > right=3, 结束交换，交换right位置和最左边位置

    3 5 7 8 9    - 对[3 5]和[8 9]两部分递归进行处理
    
    3 5 7 8 9    - 完成排序

### 优化

主要算法步骤：

- 从数组中选择一个目标值aim，按照下面的方式划分数组：
    
    - 所有小于aim的元素都划分到aim的左侧
    
    - 所有不小于aim的元素都划分到aim的右侧

- 递归对左右两部分分别进行上一步操作，直至整个数组处理完成

主要优化点：

- [x] 目标位置选取优化：对于固定选取最左边位置，完全逆序数据会被卡到最差复杂度 => 结论：固定目标位置的选取都是不优秀的
- [x] 划分方式优化：对于朴素二分方式，完全相同数据会被卡到最差复杂度 => 结论：对于存在大量相同数据的数据集，朴素二分方式是不优秀的


```cpp
using namespace std;
typedef pair<int, int> P;

P partition(int a[], int l, int r) {
	if (l >= r) return P(l, r);
	int cur = l;
	int left = l;
	int right = r;
	int p = rand() % (r - l + 1);
	int aim = a[l + p];
	swap(a[l + p], a[l]);
	while (cur <= right) {
		if (a[cur] == aim) cur++;
		else if (a[cur] < aim) swap(a[cur++], a[left++]);
		else swap(a[cur], a[right--]);
	}
	return P(left, right);
}

void qsort(int a[], int l, int r) {
	if (l >= r) return;
	P ret = partition(a, l, r);
    int left = ret.first, right = ret.second;
	qsort(a, l, left - 1);
	qsort(a, right + 1, r);
}
```

- $a[l...left - 1] < aim$
- $a[left...right] = aim$
- $a[right+ 1...r] > aim$

In [1]:
import subprocess
def run(method, workers, testSize, seed, compiled=True):
    if method not in ['naive', 'openmp', 'mpi']:
        raise ValueError(method)
    # print("run: method={}, workers={}, n={}, seed={}".format(method, workers, testSize, seed))
    script = './run-{}.sh'.format(method)
    cmd = [script, '-w {}'.format(workers), '-n {}'.format(testSize), '-s {}'.format(seed), ]
    if not compiled:
        cmd.append('-c')
    return float(subprocess.check_output(cmd))

In [2]:
run('naive', 2, 10000000, 4516)

5.188803

In [3]:
run('openmp', 2, 10000000, 4516)

3.395418

In [4]:
run("mpi", 2, 10000000, 4516)

3.239426

In [8]:
import math
import random
import plotly.plotly
import plotly.graph_objs as go
from plotly.offline import iplot, init_notebook_mode

init_notebook_mode(connected=True)

def trace1():
    testSize = int(5e6)
    seed = [i - 1 for i in range(20)]
    y = []
    for i in seed:
        timeInSec = run('naive', 1, testSize, i)
        y.append(timeInSec)

    return go.Scatter(x=seed, y=y, mode='lines+markers', name='fixed test size')

def trace2():
    seed = 123456
    y = []
    testSize = [1000, ]
    for i in range(16):
        testSize.append(testSize[-1] * 2)
    
    for i in testSize:
        timeInSec = run('naive', 1, i, seed)
        y.append(timeInSec)

    return go.Scatter(x=testSize, y=y, mode='lines+markers', name='fixed seed')

In [6]:
%%time
trace_naive_testsize = trace1()
trace_naive_seed = trace2()

CPU times: user 49.7 ms, sys: 110 ms, total: 159 ms
Wall time: 2min 3s


In [7]:
bar_trance = go.Bar(
    x=trace_naive_testsize.x,
    y=trace_naive_testsize.y,
    name='bar trance',
    marker=dict(
        color='rgb(158,202,225)',
        line=dict(
            color='rgb(8,48,107)',
            width=1.5),
        ),
    opacity=0.6
)

iplot({
    "data": [trace_naive_testsize, bar_trance],
    "layout": go.Layout(title="plot 1 of quick sort<br>fixed test size: 5M elements", 
                        xaxis=dict(title='seed'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

def nlogn(val):
    return math.log(val) / math.log(2) * val

ratio_trace = go.Scatter(
    x=trace_naive_seed.x,
    y=[trace_naive_seed.y[i] / nlogn(trace_naive_seed.x[i]) for i in range(len(trace_naive_seed.x))],
    name='ratio',
    yaxis='y2'
)

iplot({
    "data": [trace_naive_seed, ratio_trace],
    "layout": go.Layout(title="plot 2 of quick sort<br>fixed seed: SEED = 123456",
                        xaxis=dict(title='test size'),
                        yaxis=dict(title='runtime(s)'),
                        yaxis2=dict(title='ratio', overlaying='y', side='right'),
                        showlegend=True,
                       )
})

# OpenMP

```cpp
void qsort(int a[], int l, int r) {
	if (l >= r) return;
	P ret = partition(a, l, r);
	qsort(a, l, ret.first - 1);
	qsort(a, ret.second + 1, r);
}
```

# OpenMP

```cpp
void qsort(int a[], int l, int r) {
	if (l >= r) return;
	P ret = partition(a, l, r);
	int left = ret.first, right = ret.second;
	{
		#pragma omp task firstprivate(a, l, left)
		{
			qsort(a, l, left - 1);
		}
		#pragma omp task firstprivate(a, right, r)
		{
			qsort(a, right + 1, r);
		}
	}
}

/////////////////

omp_set_num_threads(num_of_threads);

#pragma omp parallel shared(data, n)
{
    #pragma omp single nowait
    {
        qsort(data, 0, n - 1);
    }
}
```

### section | task

### firstprivate

### single nowait



In [8]:
def trace3():
    # x轴为线程数
    ret = []
    seed = 123456
    testSize = [int(1e4), ]
    for i in range(3):
        testSize.append(testSize[-1] * 2)
    testSize.append(int(1e7))
    for i in range(3):
        testSize.append(testSize[-1] * 2)
        
    threads = [i + 1 for i in range(20)]
    
    for n in testSize:
        y = []
        for thread in threads:
            timeInSec = run('openmp', thread, n, seed)
            y.append(timeInSec)
        ret.append(go.Scatter(x=threads, y=y, mode='lines+markers', name='{} items'.format(n)))
    return ret

def trace4():
    # 线程数量=4,8,16
    # seed=123456
    # 固定种子，增加测试集大小
    ret = []
    seed = 123456
    testSize = [1000, ]
    for i in range(16):
        testSize.append(testSize[-1] * 2)

    for thread in [1, 2, 3, 4, 5, 6]:
        y = []
        for n in testSize:
            timeInSec = run('openmp', thread, n, seed)
            y.append(timeInSec)
        ret.append(go.Scatter(x=testSize, y=y, mode='lines+markers', name='{} threads'.format(thread)))
    return ret

In [9]:
%%time
trace_openmp_testsize = trace3()
trace_openmp_seed = trace4()

CPU times: user 186 ms, sys: 754 ms, total: 940 ms
Wall time: 21min 1s


In [10]:
iplot({
    "data": trace_openmp_testsize[:4],
    "layout": go.Layout(title="plot 3.1 of quick sort<br>openMP<br>fixed small test size", 
                        xaxis=dict(title='number of threads'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

iplot({
    "data": trace_openmp_testsize[4:],
    "layout": go.Layout(title="plot 3.2 of quick sort<br>openMP<br>fixed big test size", 
                        xaxis=dict(title='number of threads'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

iplot({
    "data": trace_openmp_seed,
    "layout": go.Layout(title="plot 4 of quick sort<br>openMP<br>fixed seed: 123456", 
                        xaxis=dict(title='size of test'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

# MPI

### 任务划分

### 数据划分

In [11]:
def gen_shapes(stx, sty, width, height, n, gapx=1):
    ret = []
    point_x = []
    point_y = []
    edy = sty + height
    for i in range(n):
        x = stx + i * (width + gapx)
        edx = x + width
        ret.append({
            'type': 'rect',
            'x0': x,
            'y0': sty,
            'x1': edx,
            'y1': edy,
            'line': {
                'color': 'rgba(128, 0, 128, 1)',
                'width': 2,
            },
            'fillcolor': 'gray',
            'opacity': 0.3,
        })
        point_x.append(x / 2 + edx / 2)
        point_y.append(sty / 2 + edy / 2)
    return ret, point_x, point_y

def gen_all_shapes(stx, sty, base_width, base_height, n, gapx, gapy):
    ret = []
    point_x = []
    point_y = []
    for i in range(n):
        l = 2 ** (n - i - 1)
        cnt = 2 ** i
        curx = stx
        cury = sty + i * (base_height + gapy)
        width = cnt * (base_width + gapx) - gapx
        shapes, px, py = gen_shapes(curx, cury, width, base_height, l, gapx)
        ret.extend(shapes)
        point_x.extend(px)
        point_y.extend(py)
    return ret, point_x, point_y

shapes, x, y = gen_all_shapes(1, 1, 3, 1, 4, 1, 1)
text = []
for i in range(4):
    l = 2 ** (3 - i)
    for j in range(l, l + l):
        text.append(j)

trace0 = go.Scatter(
    x=x,
    y=y,
    text=text,
    mode='text',
)
data = [trace0]

layout = {
    'xaxis': {
        'range': [0, 35],
        'showgrid': False,
        'zeroline': False,
        'showline': False,
        'ticks': '',
        'showticklabels': False,
    },
    'yaxis': {
        'range': [0, 10],
        'showgrid': False,
        'zeroline': False,
        'showline': False,
        'ticks': '',
        'showticklabels': False,
    },
    'shapes': shapes,
}
fig = {
    'data': data,
    'layout': layout,
}
iplot(fig)

In [12]:
trace1 = go.Scatter(
    x=x,
    y=y,
    text=[1,5,3,6,2,7,4,8,1,3,2,4,1,2,1],
    mode='text',
)
iplot({
    'data': [trace1],
    'layout': layout,
})

In [13]:
trace2 = go.Scatter(
    x=x,
    y=y,
    text=[1,'5=1+2^2',3,'7=3+2^2',2,'6=2+2^2',4,'8=4+2^2',1,'3=1+2^1',2,'4=2+2^1',1,'2=1+2^0',1],
    mode='text',
)
iplot({
    'data': [trace2],
    'layout': layout,
})

$$ RANK\_LSON = RANK $$
$$ RANK\_RSON = RANK + 2^{depth} $$

# MPI

```cpp
/* Skipped: Base sort and help func */
void mpi_sort(int a[], int l, int r, int id, int max_id, int deep) {
	MPI_Status status;
	int rson_id = id + pow(2, deep);
	if (rson_id > max_id) {
		qsort(a, l, r);
		return;
	}
	rson_id--;
	
	if (l > r) {
		MPI_Send(a + l, 0, MPI::INT, rson_id, 0, MPI_COMM_WORLD);
		MPI_Recv(a + l, 0, MPI::INT, rson_id, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		return;
	}
	
	P ret = partition(a, l, r);
	int lsize = ret.first - l;
	int rsize = r - ret.second;
	if (lsize < rsize) {
		MPI_Send(a + l, lsize, MPI::INT, rson_id, l, MPI_COMM_WORLD);
		mpi_sort(a, ret.second + 1, r, id, max_id, deep + 1);
		MPI_Recv(a + l, lsize, MPI::INT, rson_id, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
	} else {
		MPI_Send(a + ret.second + 1, rsize, MPI::INT, rson_id, ret.second + 1, MPI_COMM_WORLD);
		mpi_sort(a, l, ret.first - 1, id, max_id, deep + 1);
		MPI_Recv(a + ret.second + 1, rsize, MPI::INT, rson_id, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
	}
}
```

# MPI

```cpp
int main(int argc, char *argv[]) {
    /* Skipped: input and initialisation*/
	myid++;
    // id start from 1
	int retcode = 0;
	if (myid == 1) {
		int n, seed;
		sscanf(argv[1], "%d", &n);
		sscanf(argv[2], "%d", &seed);
		int *data = (int*)malloc(sizeof(int)*n);
		srand(seed);
		for (int i = 0; i < n; i++)
			if (0 == seed) data[i] = 0;
			else if (-1 == seed) data[i] = n - i;
			else data[i] = rand() % n + 1;

		double tmp = MPI_Wtime();
		mpi_sort(data, 0, n - 1, myid, numprocs, 0);
		tmp = MPI_Wtime() - tmp;
		printf("%lf\n", tmp);
		retcode = validate(data, n);
		free(data);
	} else {
		int *subarray = NULL;
		MPI_Status status;
		int sub_size = 0;
		int index = 0;
		int parent_id = 0;
		while (pow(2, index) < myid) index++;
		MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		MPI_Get_count(&status, MPI::INT, &sub_size);
		parent_id = status.MPI_SOURCE;
		subarray = (int*)malloc(sub_size * sizeof(int));
		
		MPI_Recv(subarray, sub_size, MPI::INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		mpi_sort(subarray, 0, sub_size - 1, myid, numprocs, index);
		MPI_Send(subarray, sub_size, MPI::INT, parent_id, parent_id,MPI_COMM_WORLD);
		
		free(subarray);
	}
	MPI_Finalize();
	return retcode;
}
```

In [14]:
def trace5():
    # x轴为线程数
    ret = []
    seed = 123456
    testSize = [int(1e4), ]
    for i in range(3):
        testSize.append(testSize[-1] * 2)
    testSize.append(int(1e7))
    for i in range(3):
        testSize.append(testSize[-1] * 2)

    procs = [i + 1 for i in range(32)]
    
    for n in testSize:
        y = []
        for proc in procs:
            timeInSec = run('mpi', proc, n, seed)
            y.append(timeInSec)
        ret.append(go.Scatter(x=procs, y=y, mode='lines+markers', name='{} items'.format(n)))
    return ret

def trace6():
    # 线程数量=4,8,16
    # seed=123456
    # 固定种子，增加测试集大小
    ret = []
    seed = 123456
    testSize = [1000, ]
    for i in range(16):
        testSize.append(testSize[-1] * 2)

    for proc in [1, 2, 4, 8, 16]:
        y = []
        for n in testSize:
            timeInSec = run('mpi', proc, n, seed)
            y.append(timeInSec)
        ret.append(go.Scatter(x=testSize, y=y, mode='lines+markers', name='{} procs'.format(proc)))
    return ret

In [15]:
%%time
trace_mpi_testsize = trace5()

CPU times: user 135 ms, sys: 850 ms, total: 985 ms
Wall time: 40min 43s


In [16]:
%%time
trace_mpi_seed = trace6()

CPU times: user 70.2 ms, sys: 252 ms, total: 322 ms
Wall time: 4min 43s


In [18]:
iplot({
    "data": trace_mpi_testsize[:4],
    "layout": go.Layout(title="plot 5 of quick sort<br>mpi<br>fixed small test size", 
                        xaxis=dict(title='number of procs'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

iplot({
    "data": trace_mpi_testsize[4:],
    "layout": go.Layout(title="plot 5 of quick sort<br>mpi<br>fixed big test size", 
                        xaxis=dict(title='number of procs'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

iplot({
    "data": trace_mpi_seed,
    "layout": go.Layout(title="plot 6 of quick sort<br>mpi<br>fixed seed: 123456", 
                        xaxis=dict(title='size of test'),
                        yaxis=dict(title='runtime(s)'),
                        showlegend=True,
                       )
})

# 三种快排效率的对比

### 测试方案

左边纵轴为运行时间，右边纵轴为运行时间和串行快排运行时间之比

- 横轴为workers数量（对openmp来说是线程数，对mpi来说是进程数，取值1-20），seed固定为6666，测试集大小为1e4,2e4,1e7,2e7,

- 横轴为测试集大小（$1000 * 2^k, 0 <= k < 16$）,seed固定为6666，workers为1,2,4,8,16

一共9张图

In [10]:
def sol1(method, n):
    seed = 6666
    x = [i for i in range(1, 21)]
    y = []
    for worker in x:
        timeInSec = run(method, worker, n, seed)
        y.append(timeInSec)
    return go.Scatter(x=x, y=y, mode='lines+markers', name=method)

In [14]:
%%time
cnt = 0
for n in [1e4, 2e4, 1e7, 2e7]:
    n = int(n)
    cnt += 1
    naive_time_in_sec = run('naive', 1, n, 6666)
    trace_openmp = sol1('openmp', n)
    trace_mpi = sol1('mpi', n)
    trace_openmp_ratio = go.Scatter(
        x=trace_openmp.x,
        y=[trace_openmp.y[i] / naive_time_in_sec for i in range(len(trace_openmp.x))],
        name='openmp ratio',
        yaxis='y2'
    )
    trace_mpi_ratio = go.Scatter(
        x=trace_mpi.x,
        y=[trace_mpi.y[i] / naive_time_in_sec for i in range(len(trace_mpi.x))],
        name='mpi ratio',
        yaxis='y2'
    )
    iplot({
        "data": [trace_openmp, trace_mpi, trace_openmp_ratio, trace_mpi_ratio],
        "layout": go.Layout(title="plot 7-{} of quick sort<br>fixed seed: SEED = 6666<br>fixed size = {}".format(cnt, n),
                            xaxis=dict(title='workers'),
                            yaxis=dict(title='runtime(s)'),
                            yaxis2=dict(title='ratio', overlaying='y', side='right'),
                            showlegend=True,
                           )
    })

CPU times: user 431 ms, sys: 561 ms, total: 992 ms
Wall time: 7min 41s


In [15]:
def sol2(method, worker):
    seed = 6666
    x = [1000, ]
    for i in range(16):
        x.append(x[-1] * 2)
    y = []
    for n in x:
        timeInSec = run(method, worker, n, seed)
        y.append(timeInSec)
    return go.Scatter(x=x, y=y, mode='lines+markers', name=method)

In [17]:
%%time
naive_x = [1000, ]
for i in range(16):
    naive_x.append(naive_x[-1] * 2)
naive_y = []
for n in naive_x:
    timeInSec = run('naive', 1, n, 6666)
    naive_y.append(timeInSec)

CPU times: user 12.1 ms, sys: 52.8 ms, total: 64.9 ms
Wall time: 1min 14s


In [18]:
%%time
cnt = 0
for worker in [1, 2, 4, 8, 16]:
    cnt += 1
    trace_openmp = sol2('openmp', worker)
    trace_mpi = sol2('mpi', worker)
    trace_openmp_ratio = go.Scatter(
        x=trace_openmp.x,
        y=[trace_openmp.y[i] / naive_y[i] for i in range(len(trace_openmp.x))],
        name='openmp ratio',
        yaxis='y2'
    )
    trace_mpi_ratio = go.Scatter(
        x=trace_mpi.x,
        y=[trace_mpi.y[i] / naive_y[i] for i in range(len(trace_mpi.x))],
        name='mpi ratio',
        yaxis='y2'
    )
    iplot({
        "data": [trace_openmp, trace_mpi, trace_openmp_ratio, trace_mpi_ratio],
        "layout": go.Layout(title="plot 8-{} of quick sort<br>fixed seed: SEED = 6666<br>fixed worker = {}".format(cnt, worker),
                            xaxis=dict(title='size of test'),
                            yaxis=dict(title='runtime(s)'),
                            yaxis2=dict(title='ratio', overlaying='y', side='right'),
                            showlegend=True,
                           )
    })

CPU times: user 487 ms, sys: 588 ms, total: 1.07 s
Wall time: 9min 18s


# 总结

OpenMP的优点： 

- OpenMP相对于MPI而言更容易使用，对原始代码改动小
- 代码更容易理解和维护 

OpenMP的缺点：

- 使用多线程，所有线程共享内存空间，硬件制约较大
- 对递归的优化不明显

MPI的优点：

- 与OpenMP相比，在规模更大的情况下一般表现较好
- 编写代码时对细节控制力更强
- 使用多进程，能够充分利用单机的多核性能

MPI的缺点：

- 如果单机运行，多进程运行时进程开销和通信开销都比较大，在进程数量比较小或者数据集比较小的时候可能效率反而不如串行
- 多机运行时需要注意通信代价
- 对原始代码改动较大，相比openmp来说不太好维护，需要比较谨慎的考虑数据划分和任务划分