In [None]:
from csc4585 import *

In [None]:
%%java
import java.util.List;
import java.util.ArrayList;

/**
 * Runs one task body on a fixed number of threads and prints the elapsed
 * wall-clock time.
 *
 * <p>Subclasses implement {@link #run(int)}; {@link #run()} creates
 * {@code num_threads} threads, hands each its zero-based index, starts them
 * all, waits for every one to finish and then prints the total time.
 */
public abstract class ParallelRun {
    /** Number of worker threads created by each call to {@link #run()}. */
    public final int num_threads;
    
    /** Worker threads created by the most recent call to {@link #run()}. */
    List<Thread> threads = new ArrayList<>();

    /**
     * @param num_threads how many worker threads {@link #run()} should start
     */
    public ParallelRun(int num_threads) {
        this.num_threads = num_threads;
    }

    /**
     * Starts {@code num_threads} workers, each invoking {@link #run(int)} with
     * its own index in {@code [0, num_threads)}, waits for all of them and
     * prints the elapsed time in seconds.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * resumed so that the reported time still covers all workers; the
     * interrupt status is restored before this method returns.
     */
    public void run() {
        long t1 = System.currentTimeMillis();
        // A Thread can only be started once, so workers left over from a
        // previous call must be dropped before new ones are created.
        threads.clear();
        for (int t = 0; t < num_threads; t++) {
            final int thread_num = t;
            Thread thread = new Thread() {
                @Override
                public void run() {
                    // Qualified call: an unqualified run(...) here would be
                    // looked up on Thread, not on the enclosing ParallelRun.
                    ParallelRun.this.run(thread_num);
                }
            };
            threads.add(thread);
        }
        // Create every worker first, then start them together.
        for (Thread thread : threads) {
            thread.start();
        }
        boolean interrupted = false;
        for (Thread thread : threads) {
            // Keep joining until this worker has really finished.
            while (true) {
                try {
                    thread.join();
                    break;
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("Total time = %.2f seconds%n", 0.001*(t2-t1));
        if (interrupted) {
            // Hand the interrupt back to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * The work performed by one worker thread.
     *
     * @param thread zero-based index of the worker executing this call
     */
    public abstract void run(int thread);
}

In [None]:
%%java
/**
 * Prints fib(0) .. fib(nmax) using a static partition of the index range:
 * every worker gets one contiguous slice, and the last worker additionally
 * takes whatever is left over up to and including {@code nmax}.
 */
public class ParallelFibPrint extends ParallelRun {
    /** Largest Fibonacci index (inclusive) to print. */
    public final int nmax;

    /**
     * @param n largest Fibonacci index to print
     * @param p number of worker threads
     */
    public ParallelFibPrint(int n,int p) {
        super(p);
        nmax = n;
    }

    /**
     * Prints the Fibonacci numbers belonging to this worker's slice.
     *
     * @param thread zero-based worker index
     */
    public void run(int thread) {
        final int chunk = nmax / num_threads;
        final int first = thread * chunk;
        final int last;
        if (thread + 1 == num_threads) {
            // Final worker: run through nmax so no index is dropped.
            last = nmax;
        } else {
            last = (thread + 1) * chunk - 1;
        }
        int n = first;
        while (n <= last) {
            System.out.printf("%d: fib(%d)=%d%n", thread, n, fib(n));
            n++;
        }
    }

    /**
     * Naive doubly-recursive Fibonacci.
     *
     * @param n index; values below 2 are returned unchanged
     * @return the n-th Fibonacci number
     */
    public int fib(int n) {
        return n < 2 ? n : fib(n-1)+fib(n-2);
    }

    /** Prints fib(0)..fib(40) using 5 threads. */
    public static void main(String[] args) {
        new ParallelFibPrint(40,5).run();
    }
}

In [None]:
%%java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Prints fib(0) .. fib(nmax) with dynamic work distribution: workers
 * repeatedly claim the next unprocessed index from a shared atomic counter
 * until the counter passes {@code nmax}.
 */
public class ParallelFibPrint extends ParallelRun {
    /** Next Fibonacci index that has not yet been claimed by any worker. */
    AtomicInteger counter = new AtomicInteger();
    /** Largest Fibonacci index (inclusive) to print. */
    public final int nmax;

    /**
     * @param n largest Fibonacci index to print
     * @param p number of worker threads
     */
    public ParallelFibPrint(int n,int p) {
        super(p);
        nmax = n;
    }

    /**
     * Claims indices one at a time and prints their Fibonacci numbers until
     * a claimed index exceeds {@code nmax}.
     *
     * @param thread zero-based worker index
     */
    public void run(int thread) {
        for (int n = counter.getAndIncrement(); n <= nmax; n = counter.getAndIncrement()) {
            System.out.printf("%d: fib(%d)=%d%n", thread, n, fib(n));
        }
    }

    /**
     * Naive doubly-recursive Fibonacci.
     *
     * @param n index; values below 2 are returned unchanged
     * @return the n-th Fibonacci number
     */
    public int fib(int n) {
        return n < 2 ? n : fib(n-1)+fib(n-2);
    }

    /** Prints fib(0)..fib(40) using 5 threads. */
    public static void main(String[] args) {
        new ParallelFibPrint(40,5).run();
    }
}

In [None]:
# Amdahl's Law
import matplotlib.pyplot as plt
import numpy as np

# Processor counts: 100 evenly spaced samples from 1 through 100.
n = np.linspace(1, 100, num=100)

# Fraction of the program that can run in parallel.
p = .9

# Amdahl's Law: speedup on n processors is 1 / ((1 - p) + p / n).
speedup = 1/(1-p+p/n)
plt.plot(n, speedup)

In [None]:
%%java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.List;
import java.util.ArrayList;
import java.util.Random;

/**
 * Bounded blocking FIFO queue guarded by a single lock with two conditions.
 *
 * <p>{@link #enq(Object)} blocks while the queue is full and
 * {@link #deq()} blocks while it is empty. Elements are stored in a
 * circular buffer so that they leave in the order they were inserted.
 *
 * @param <T> element type
 */
public class LockedQueue<T> {
    /** Shared random source used by the demo in {@link #main(String[])}. */
    public static Random RAND = new Random();
    
    final Lock lock = new ReentrantLock();
    /** Signalled by {@link #deq()} after a slot has been freed. */
    final Condition notFull = lock.newCondition();
    /** Signalled by {@link #enq(Object)} after an element has been added. */
    final Condition notEmpty = lock.newCondition();
    /** Circular buffer holding the queued elements. */
    final T[] items;
    /** Index of the oldest element (next to be dequeued). */
    int head;
    /** Index of the next free slot (next to be written). */
    int tail;
    /** Number of elements currently stored. */
    int count;
    
    /**
     * @param capacity maximum number of elements the queue can hold
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    @SuppressWarnings("unchecked") // the array is only ever accessed as T through enq/deq
    public LockedQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        items = (T[])new Object[capacity];
    }
    
    /**
     * Appends {@code x} to the tail of the queue, waiting while the queue is
     * full. An interrupt received while waiting does not abort the wait; the
     * thread's interrupt status is preserved for the caller.
     *
     * @param x element to add
     */
    public void enq(T x) {
        lock.lock();
        try {
            // Loop: the condition must be re-checked after every wake-up.
            while(count == items.length) {
                System.out.println("notFull.await()");
                notFull.awaitUninterruptibly();
            }
            items[tail] = x;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Removes and returns the element at the head of the queue, waiting
     * while the queue is empty. An interrupt received while waiting does not
     * abort the wait; the thread's interrupt status is preserved.
     *
     * @return the oldest element in the queue
     */
    public T deq() {
        lock.lock();
        try {
            // Loop: the condition must be re-checked after every wake-up.
            while(count == 0) {
                System.out.println("notEmpty.await()");
                notEmpty.awaitUninterruptibly();
            }
            T result = items[head];
            items[head] = null; // do not keep a reference to a removed element
            head = (head + 1) % items.length;
            count--;
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Demo: fills a queue of capacity 3, then starts six threads. Threads
     * with an even index enqueue 20 random integers each; threads with an
     * odd index dequeue and print 20 values each.
     */
    public static void main(String[] args) {
        int capacity = 3;
        final LockedQueue<Integer> lq = new LockedQueue<>(capacity);
        for(int i=0;i<capacity;i++) {
            lq.enq(i);
        }
        for(int i=0;i<6;i++) {
            final int n = i;
            Thread t = new Thread() {
                @Override
                public void run() {
                    for(int k=0;k<20;k++) {
                        if(n % 2 == 0) {
                            lq.enq(RAND.nextInt(1000));
                        } else {
                            int r = lq.deq();
                            System.out.printf("%d%n", r);
                        }
                    }
                }
            };
            t.start();
        }
    }
}

In [None]:
%%java
/**
 * Minimal lock contract consisting of an acquire and a release operation.
 */
public interface BaseLock {
    /** Acquires this lock. */
    void lock();

    /** Releases this lock. */
    void unlock();
}

In [None]:
%%java
import java.util.concurrent.locks.Lock;

/**
 * A pair of associated locks: one handed out for read access and one for
 * write access.
 */
public interface ReadWriteLock {
    /** @return the lock to acquire for read access */
    BaseLock readLock();

    /** @return the lock to acquire for write access */
    BaseLock writeLock();
}

In [None]:
%%java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

/**
 * Simple read/write lock built on one {@link ReentrantLock} and one
 * {@link Condition}.
 *
 * <p>Any number of readers may hold the read lock at once as long as no
 * writer holds the write lock. A writer must wait until there are no readers
 * and no other writer. Readers are admitted whenever no writer is active,
 * so a steady stream of readers can keep a waiting writer out.
 *
 * <p>Waiting is not interruptible: an interrupt received while waiting does
 * not abort the acquisition, but the thread's interrupt status is preserved.
 */
public class SimpleReadWriteLock implements ReadWriteLock {
    /** Number of threads currently holding the read lock. */
    int readers;
    /** {@code true} while a thread holds the write lock. */
    boolean writer;
    /** Guards {@link #readers} and {@link #writer}. */
    Lock lock;
    BaseLock readLock, writeLock;
    /** Signalled whenever the last reader or the writer releases. */
    Condition condition;

    public SimpleReadWriteLock() {
        writer = false;
        readers = 0;
        lock = new ReentrantLock();
        readLock = new ReadLock();
        writeLock = new WriteLock();
        condition = lock.newCondition();
    }
    
    /** @return the lock to acquire for shared (read) access */
    public BaseLock readLock() {
        return readLock;
    }
    
    /** @return the lock to acquire for exclusive (write) access */
    public BaseLock writeLock() {
        return writeLock;
    }
    
    /** Shared lock: acquired once no writer is active. */
    class ReadLock implements BaseLock {
        /** Waits until no writer holds the lock, then registers a reader. */
        public void lock() {
            lock.lock();
            try {
              // Loop: the state must be re-checked after every wake-up.
              while (writer) {
                  condition.awaitUninterruptibly();
              }
              readers++;
            } finally {
              lock.unlock();
            }
        }
        /** Unregisters a reader and wakes waiters when it was the last one. */
        public void unlock() {
          lock.lock();
          try {
            readers--;
            if (readers == 0) {
              condition.signalAll();
            }
          } finally {
            lock.unlock();
          }
        }
    }
    /** Exclusive lock: acquired once there are no readers and no writer. */
    private class WriteLock implements BaseLock {
        /** Waits until there are no readers and no writer, then claims the writer flag. */
        public void lock() {
            lock.lock();
            try {
              // Loop: the state must be re-checked after every wake-up.
              while (readers > 0 || writer) {
                condition.awaitUninterruptibly();
              }
              writer = true;
            } finally {
              lock.unlock();
            }
        }
        /** Clears the writer flag and wakes all waiting readers and writers. */
        public void unlock() {
          lock.lock();
          try {
              writer = false;
              condition.signalAll();
          } finally {
              lock.unlock();
          }
        }
    }
}


In [None]:
%%java
/**
 * Small demo for {@code SimpleReadWriteLock}: one writer thread and ten
 * reader threads each acquire their lock, print a line, pause briefly and
 * release.
 */
public class TestReadWriteLock {
    /**
     * Pauses the current thread for 5 ms. If the thread is interrupted the
     * pause ends early and the interrupt status is restored.
     */
    public static void sleep() {
        try {
            Thread.sleep(5);
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /** Starts the writer thread followed by ten reader threads. */
    public static void main(String[] args) {
        final SimpleReadWriteLock rwlock = new SimpleReadWriteLock();
        Thread t = new Thread() {
            @Override
            public void run() {
                BaseLock writeLock = rwlock.writeLock();
                TestReadWriteLock.sleep();
                // Acquire before entering try so that finally only ever
                // releases a lock this thread actually holds.
                writeLock.lock();
                try {
                    System.out.println("Write Lock");
                    TestReadWriteLock.sleep();
                } finally {
                    writeLock.unlock();
                }
            }
        };
        t.start();
        for(int i=0;i<10;i++) {
            t = new Thread() {
                @Override
                public void run() {
                    BaseLock readLock = rwlock.readLock();
                    // Acquire before entering try (see writer thread).
                    readLock.lock();
                    try {
                        System.out.println("Read Lock");
                        TestReadWriteLock.sleep();
                    } finally {
                        readLock.unlock();
                    }
                }
            };
            t.start();
        }
    }
}