# Multi-Cores programming
# author :田奇 ID: SA16225271
# Asignment2

## 1. parallel prefix-sum
定义内部数据结构 TreeNode;
```java
public class TreeNode {
	int left, right;
	int sum;
	int fromleft;
	TreeNode leftChild, rightChild;
	
	TreeNode(int left, int right, int sum, int fromleft) {
		this.left = left;
		this.right = right;
		this.sum = sum;
		this.fromleft = fromleft;
		this.leftChild = null;
		this.rightChild = null;
	}
}

```
Bottom-Up 构建一颗Sum Tree;
```java
protected TreeNode compute() {
		if (high - low <= SEQUENTIAL_CUTOFF) {
			int sum = 0;
			for (int i = low; i < high; i++) {
				sum += input[i];
			}
			return new TreeNode(low, high, sum, 0);
		}
		else {
			int mid = (low+high)/2;
			TreeBuilder leftTask = new TreeBuilder(input, low, mid, SEQUENTIAL_CUTOFF);
			TreeBuilder rightTask = new TreeBuilder(input, mid, high, SEQUENTIAL_CUTOFF);
			leftTask.fork();
			TreeNode rightChild = rightTask.compute();
			TreeNode leftChild = (TreeNode) leftTask.join();
			TreeNode root = new TreeNode(low, high, leftChild.sum+rightChild.sum, 0);
			root.leftChild = leftChild;
			root.rightChild = rightChild;
			return root;
		}
```
Top-Down 更新 Sum Tree 的 fromLeft 数据域;
```java
@Override
	protected Object compute() {
		if (high - low <= SEQUENTIAL_CUTOFF) {
			output[low] = root.fromleft + input[low];
			for (int i = low+1; i < high; i++) {
				output[i] = output[i-1] + input[i];
			}
		}
		else {
			int mid = (low+high)/2;
			TreePrefix leftTask = new TreePrefix(root.leftChild, root.fromleft, input, output, low, mid, SEQUENTIAL_CUTOFF);
			TreePrefix rightTask = new TreePrefix(root.rightChild, root.fromleft+root.leftChild.sum, input, output, mid, high, SEQUENTIAL_CUTOFF);
			leftTask.fork();
			rightTask.compute();
			leftTask.join();
		}
		return null;
}
```
最终实现就是:
```java
public static int[] getPrefixSum(int[] input, int cutoff) {
		TreeNode root = TreeBuilder.sumTree(input, cutoff);
		return TreePrefix.prefixSum(root, input, cutoff);
	}
```

## 2. Parallel Monte PI
```java
public class MontePIParallel extends Thread{
	long left;
	long right;
	long sum = 0;
	MontePIParallel(long left, long right) {this.left = left; this.right = right;}
	@Override
	public void run() {
		double x, y;
		
		for (long i = left; i < right; i++) {
			x = Math.random();
			y = Math.random();
			if ((x * x + y * y) <= 1) {
				sum ++;
			}
		}
	}
	
	public static double getPI(long n, int threadSize) throws InterruptedException {
		MontePIParallel[] mpp = new MontePIParallel[threadSize];
		for (int i = 0; i < threadSize; i++) {
			mpp[i] = new MontePIParallel(n/threadSize*i, n/threadSize*(i+1));
			mpp[i].start();
		}
		
		int count = 0;
		for (int i = 0; i < threadSize; i++) {
			mpp[i].join();
			count += mpp[i].sum;
		}
		double PI = 4.0 * count / n;
		return PI;
	}
	
	public static double getPISerialize(long n) {
		double PI;
		double x,y;
		long count = 0;
		for (long i = 0; i < n; i++) {
			x = Math.random();
			y = Math.random();
			if ((x * x + y * y) <= 1) {
				count ++;
			}
		}
		PI = 4.0 * count / n;
		return PI;
	}
	
	public static void main(String[] args) throws InterruptedException {
		double PI;
		long n = 1L<<40;
		int threadSize = 1<<3;
		long start, end;
		start = System.currentTimeMillis();
		PI = MontePIParallel.getPISerialize(n);
		end = System.currentTimeMillis();
		System.out.println("Serialize PI: " + PI);
		System.out.println(end - start);
		
		start = System.currentTimeMillis();
		PI = MontePIParallel.getPI(n, threadSize);
		end = System.currentTimeMillis();
		System.out.println("Parallel PI: " + PI);
		System.out.println(end - start);
		
		
		
	}
}

```

## 3. Synchronized Block Vs java.util.concurrent.locks
同步块实现一个计数器;
```java
public class SYNCounter implements Counter{
	private long count = 0;
	SYNCounter(long count) {this.count = count;}
	public long getAndIncrement() {
		synchronized(this) {
			return this.count++;
		}
	}
}
```
采用Lock实现一个计数器;
```java
public class CASCounter implements Counter{
	private long count = 0;
	private Lock lock = new ReentrantLock();
	CASCounter(long count) {this.count = count;}
	public long getAndIncrement() {
		try {
			lock.lock();
			return count ++;
			
		} finally {
			lock.unlock();
		}
		
	}
}

```
并发版本的求素数.接口是 Counter, 可以传入不同的 Counter 实现来获得不同的性能.
```java
public void run() {
		long number;
		while ((number = counter.getAndIncrement()) <= SIZE) {
			if (isPrime(number)) {
				primes.add(number);
//				System.out.println(number);
			}
		}
	}
	
	public static boolean isPrime(long number) {
		if (number <= 1) return false;
		for (long i = 2; i <= (long)Math.sqrt(number); i++) {
			if (number % i == 0) return false;
		}
		return true;
	}
	
	public static List<Long> primeNumbers(Counter counter, int threads, long size) throws InterruptedException {
		List<Long> primes = new Vector<Long>();
		PrimeNumberConcurrency[] pns = new PrimeNumberConcurrency[threads];
		for (int i = 0; i < threads; i ++) {
			pns[i] = new PrimeNumberConcurrency(primes, counter, size);
			pns[i].start();
		}
		
		for (int i = 0; i < threads; i ++) {
			pns[i].join();
		}
		return primes;
	}
```


继续采用计算素数的并发版本, 但是使用不同的 Counter, 一个是JDK 并发包中使用 `ReentrantLock` 实现的 CASCounter, 另一个是 使用JVM原生关键字 `synchronized` 实现的 SYNCounter; 如果 size 比较小, 同步块更快, 但是当 size 很大的时候, 采用 ReentrantLock 的效果要比 synchronized block 要好. 但是由于 现代JVM 对 synchrinized 做了很多优化, 这种优势并不明显.

## 4. Parallel Merge Sort Time Complexity
### 相关概念
#### 并发执行环境的抽象DAG
我们可以把多线程计算抽象为一个DAG(Directed Acyclic Graph)模型. $G=(V,E)$. 如图是一个递归求解 Fib 程序的抽象 DAG.

顶点V中都是指令, 或者说非并行指令序列strands.

边E代表的是指令之间的依赖关系, $(u,v)\in E.$表示$u$ 必须要在 $v$ 之前执行.

 - Continuation Edges $(u, v)$ 是在顶点内部水平画出的,表示 $v$ 在 $u$ 后面串行执行.
 - Call Edges $(u, v)$ 指向下方,表示调用关系, $u$调用子过程$v$.
 - Spawn Edges $(u, v)$ 指向下方,表示的是 $u$ 生产出(Spawn)一个 $v$, 并且他和 $u$ 是并行执行的关系.
 - Return Edges 指向上, 表示接下来的strands执行需要首先从一个过程调用中返回,亦或是在并行结束后同步汇合.

#### Work
$T_1 = \text{算法在只有一个处理器时候的执行时间即完全串行时间}.$ 完全串行的执行时间就叫 work.

理想的P个处理器的并行计算机一次最多能够同时做 P 个单元的 work. 因此在 $T_P$ 时间内, 最多做的 work 量是 $PT_P$. 由于总的 work 是 $T_1$, 那么必须要有 $PT_P \geq T_1$才能够完成原来的串行任务. 那么我们得到一个原则定律 *work law*:
$$
T_P \geq \frac{T_1}{P}
$$

#### Span
$T_\infty = \text{算法在有无限个处理器的时候的执行时间}$. 就是假设有无穷个处理器的时候,算法最坏执行时间叫 span. 

span 其实就是有向无环图(DAG)中从起点到终点的最长的路径. 也即是并行算法必须依赖这样一个最长的路径才能执行完成.

*span law* 就是实际生产环境只可能是有限个处理器P,他的执行时间肯定比无穷个更慢, 那么必定有
$$
T_P \geq T_\infty
$$

#### Speedup
加速比 $\frac{T_1}{T_P}$ 就定义的是实际使用 P 个处理器的时候, 和使用一个处理器相比时间少了多少.

通过上述的 *work law* 可以知道, $\frac{T_1}{T_P} \leq P$.

需要特别注意的是, 并行只是提供常数上的加速, 不会出现指数级别或者数量级的加速.

>parallelism provides only constant time improvements (the constant being the number of processors) to any algorithm! Parallelism cannot move an algorithm from a higher to lower complexity class (e.g., exponential to polynomial, or quadratic to linear). Parallelism is not a silver bullet: good algorithm design and analysis is still needed.

#### Parallelism
并行计算复杂度分析, 我们一般使用 $\frac{T_1}{T_\infty}$即 work 和 span 的比值来表示并行计算的复杂度.
该公式可以从三个方面来理解:

 - Ratio : The average amount of work that can be performed for each step of parallel execution time.
 - Upper Bound : the maximum possible speedup that can be achieved on any number of processors.
 - Limit: The limit on the possibility of attaining perfect linear speedup. Once the number of processors exceeds the parallelism, the computation cannot possibly achieve perfect linear speedup. The more processors we use beyond parallelism, the less perfect the speedup.

#### MergeSort 并行分析
串行归并排序即只有一个处理器的时候, 其递推公式如下, 时间复杂度是 $O(nlogn)$;
$$
T_1(n) = 2T_1(n/2) + n \space \text{(work)}\tag{1}
$$
以下是串行归并排序的伪代码.
```java
Merge-Sort(A, l, r)
    if (l < r) {
        int mid = (l+r)/2
        Merge-Sort(A, l, mid)
        Merge-Sort(A, mid+1, r)
        Merge(A, l, mid, r)
    }

Merge(A, l, mid, r)
    left = A[l...mid] + sentinel
    right = A[mid+1...r] + sentinel
    i = 0
    j = 0
    k = 0
    while (i < left.length || j < right.length) {
        if (left[i] < right[j]) A[k++] = left[i++];
        else A[k++] = right[j++];
    }
```
当采用 Divide-And-Conquer 对左右两部分的归并进行并行时(假设是无穷个处理器), 但是合并过程还是 $O(n)$; 最后的时间复杂度是$O(n)$
$$
T_\infty(n) = T_\infty(n/2) + n \space \text{(span)}\tag{2}
$$
```java
Merge-Sort(A, l, r)
    if (l < r) {
        int mid = (l+r)/2
        Spawn Merge-Sort(A, l, mid)
        Merge-Sort(A, mid+1, r)
        Sync
        Merge(A, l, mid, r)
    }
    
```
从(1) (2) 得到的并行度是 $O(\frac{nlogn}{n})=O(logn)$.

如果我们进一步对合并过程做并行操作, 采用左右子数组二分查找的思想,可以将合并过程变为 $O(logn)$. 方法就是对已经排好序的左右两部分子数组, 首先找到较长的左半部分子数组$L$的中间值x, O(1) 时间即可完成, 然后再看x 在右半部分子数组$R$该插入的位置p, 采用BinarySearch O(logn) 时间即可完成. 这样左半部分就分成了 $L_1$ 和 $L_2$ 两部分, 分别是比x小的和比x大的. 同样, 右半部分子数组分成 $R_1$ 和 $R_2$ 分别是比x小和比x大的.那么这就是类似于快速排序, 将 x 复制到原来数组中应该在的位置(L1+R1+x+L2+R2), 这里的排序都不是  in-place 的, 其中有一个复制数组的过程, 如果这个过程使用 for 循环, 那么时间复杂度可以认为是 O(n) 了, 因此子数组复制过程我们直接使用操作系统基于内存块的复制`System.arraycopy`(也可以使用 `Arrays.copyOf`, 他的内部实际调用的就是 `System.arraycopy`), 不使用 for 循环进行复制.

```java

// 每一层Merge内部都申请新空间讲A复制一份temp,然后排序过程中将temp合并写会A

Parallel-Merge-Sort(A, l, r)
    if (l < r) {
        int mid = (l+r)/2
        Spawn Parallel-Merge-Sort(A, l, mid)
        Parallel-Merge-Sort(A, mid+1, r)
        Sync
        T = Arrays.copyOf(A)
        Parallel-Merge(T, l, mid, mid+1, r, A, l)
    }
    
Parallel-Merge(T, l1, r1, l2, r2, A, pos)
    n1 = r1 - l1 + 1
    n2 = r2 - l2 + 1
    if n1 < n2
        swap(l1, l2)
        swap(r1, r2)
        swap(n1, n2)
    if n1 == 0
        return
    else 
        x = (l1+r1)/2
        y = BinarySearch(T, l2, r2, T[x])
        len = x - l1 + y - l2
        A[pos+len] = T[x]
        Spawn Parallel-Merge(T, l1, x-1, l2, y-1, A, pos)
        Parallel-Merge(T, x+1, r1, y, r2, A, pos+len+1)
        Sync  
```

以下是并行进行 merge() 操作的递推公式;
$$
T^{merge}_\infty(n) = T^{merge}_\infty(n/2) + logn \space \text{(span)}\tag{3}
$$
对于(3), 我们可以做如下推导:


\begin{equation} \nonumber
\begin{split}
T^{merge}_\infty(n) &= T^{merge}_\infty(\frac{n}{2^1}) + log(\frac{n}{2^0})\\
     &= T^{merge}_\infty(\frac{n}{2^2}) + log(\frac{n}{2^1}) + log(\frac{n}{2^0})\\
     &= T^{merge}_\infty(\frac{n}{2^3}) + log(\frac{n}{2^2}) + log(\frac{n}{2^1}) + log(\frac{n}{2^0})\\
     &= \cdots \\
     &= T^{merge}_\infty(\frac{n}{2^k}) + log(\frac{n}{2^{k-1}}) + \cdots + log(\frac{n}{2^1}) + log(\frac{n}{2^0})\\
     &= 1 + log(\frac{n}{2^{k-1}}) + \cdots + log(\frac{n}{2^1}) + log(\frac{n}{2^0})
\end{split}
\end{equation}

其中
$$
\frac{n}{2^k} = 1; k = logn;
$$

最后可知
\begin{equation}\nonumber
\begin{split}
T^{merge}_\infty(n) &= log(\frac{n}{2^{0}}\frac{n}{2^{1}}\frac{n}{2^{2}}\cdots\frac{n}{2^{k-1}})\\
     &= log(\frac{n^k}{2^{\frac{(k)(k-1)}{2}}})\\
     &= klogn + \frac{k(k-1)}{2}\\
     &= O(log^2n)
\end{split}
\end{equation}
这样一来, 新的并行归并排序的时间复杂度递推公式就是:
$$
T_\infty(n) = T_\infty(n/2) + log^2n \space \text{(span)}\tag{4}
$$
对于(4)式子, 我们可以使用递推出其时间复杂度是 O(log^3n).
最后得到并行计算复杂度是$O(\frac{nlogn}{log^3n})=O(\frac{n}{log^2n})$

### Java 实现
```java
@Override
	public void run() {
		if (r - l < CUTOFF) {
			Arrays.sort(A, l, r+1);
			return ;
		}
		else {
			int mid = (l + r) / 2;
			ParallelMergeSort leftMergeSort = new ParallelMergeSort(A, l, mid, CUTOFF);
			ParallelMergeSort rightMergeSort = new ParallelMergeSort(A, mid+1, r, CUTOFF);
			leftMergeSort.start();
			rightMergeSort.start();
			try {
				leftMergeSort.join();
				rightMergeSort.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			int[] T = Arrays.copyOf(A, A.length);
			ParallelMerge pm = new ParallelMerge(T, l, mid, mid+1, r, A, l);
			pm.run();
		}
	}
	
	
class ParallelMerge extends Thread {
	int[] A;
	int[] T;
	int l1,r1,l2,r2,pos;
	int CUTOFF;
	
	public ParallelMerge(int[] T, int l1, int r1, int l2, int r2, int[] A, int pos, int cutoff) {
		this.A = A;
		this.T = T;
		this.l1 = l1;
		this.r1 = r1;
		this.l2 = l2;
		this.r2 = r2;
		this.pos = pos;
		this.CUTOFF = cutoff;
	}
	
	@Override
	public void run() {
		int n1 = r1 - l1 + 1;
		int n2 = r2 - l2 + 1;
		if (n1 < n2){
			n1 = n1 ^ n2; n2 = n1 ^ n2; n1 = n1 ^ n2;
			r1 = r1 ^ r2; r2 = r1 ^ r2; r1 = r1 ^ r2;
			l1 = l1 ^ l2; l2 = l1 ^ l2; l1 = l1 ^ l2;
		}
		
		if (n1 == 0) {
			return;
		}
		if (n1 + n2 < CUTOFF) {
			int k = 0;
			int i = l1, j = l2;
			while (i < r1 || j < r2) {
				if (i == r1) {
					while (j < r2) {
						A[pos+k++] = T[j++];
					}
					break;
				}
				if (j == r2) {
					while (i < r1) {
						A[pos+k++] = T[i++];
					}
					break;
				}
				if (T[i] < T[j]) {
					A[pos+k++] = T[i++];
				}
				else {
					A[pos+k++] = T[j++];
				}
			}
		}
		else {
			int x = (l1 + r1) / 2;
			int y = Arrays.binarySearch(T, l2, r2+1, T[x]);// return  (-(insertion point) - 1) if no key else >=0
			y = y >= 0 ? y : -(y+1); // 插入点
			int len = x - l1 + y - l2;
			A[pos+len] = T[x];
			ParallelMerge pm1 = new ParallelMerge(T, l1, x-1, l2, y-1, A, pos, CUTOFF);
			ParallelMerge pm2 = new ParallelMerge(T, x+1, r1, y, r2, A, pos+len+1, CUTOFF);
			pm1.start();
			pm2.start();
			try {
				pm1.join();
				pm2.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
	}
}
```

## 5. Mutual and DeadLock Ananysis
1. 协议满足互斥.
证明如下:
从代码中可以看出有如下正常语序:
$$
Write_A[turn=A] -> Read_A[busy=false] -> 
\\ Write_A[busy=true] -> Read_A[turn=A] -> CS_A
$$
$$
Write_B[turn=B] -> Read_B[busy=false] ->
\\ Write_B[busy=true] -> Read_B[turn=B] -> CS_B
$$
假设该协议不满足互斥条件, 也就是说存在 $CS_A$ 和 $CS_ B$ 都在执行临界区代码, 那么可以得出的是
$$
Read_A[turn=A] -> CS_A
\\ Write_B[turn=B] -> Read_B[busy=false] -> 
\\ Write_B[busy=true] -> Read_B[turn=B] -> CS_B
$$
但是可以看出, $CS_A$ 在执行的时候, busy 不可能是 false 的. 也就不可能发生同时进入临界区了. 因此该协议是满足互斥条件的.

2. 不满足死锁和饥饿.
因为可以存在这样的竞争序列, 导致双方死循环, 永远也无法进入临界区.
$$
Read_A[turn=A] -> Write_B[turn=B] -> Read_B[turn=B] -> Write_A[turn=A]
$$