# Psychoinformatics - Week 17 (Examples)
by Tsung-Ren (Tren) Huang (trhuang@g.ntu.edu.tw)

In [1]:
%config IPCompleter.greedy=True 
#%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipd

## 1 Performance Profiling (of CPU & RAM)
To profile your programs outside Jupyter Notebook, popular choices are <a href="https://docs.python.org/3.2/library/profile.html">cProfile</a> & <a href="https://pypi.org/project/perf/">perf</a>.

### 1.0 Testing Materials

In [2]:
import time
s=0 # global sum

def job(t):
    global s
    s+=1
    print('Job ',t,s)
    tmp=range(t)
    time.sleep(t)  # wait for "t" seconds
    return t

### 1.1 CPU profiling (time/timeit/prun/lprun)

In [3]:
%time job(1)

Job  1 1
CPU times: user 2.76 ms, sys: 2.17 ms, total: 4.92 ms
Wall time: 1.01 s


1

In [4]:
%%time 
print('Profiling the whole cell')
for i in range(5):
    job(i)

Profiling the whole cell
Job  0 2
Job  1 3
Job  2 4
Job  3 5
Job  4 6
CPU times: user 9.16 ms, sys: 3.64 ms, total: 12.8 ms
Wall time: 10 s


In [5]:
%timeit job(1)

Job  1 7
Job  1 8
Job  1 9
Job  1 10
Job  1 11
Job  1 12
Job  1 13
Job  1 14
1 s ± 1.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
%%prun 
for i in range(5):
    print(job(i)*2)

Job  0 7
0
Job  1 8
2
Job  2 9
4
Job  3 10
6
Job  4 11
8
 

In [7]:
#!conda install line_profiler
%load_ext line_profiler
%lprun -f job job(3)

Job  3 12


### 1.2 Memory Profiling (memit/mprun)

In [8]:
#!pip install memory_profiler
%load_ext memory_profiler

In [9]:
#del a
%memit a=list(range(100000))
del a
%memit a=range(100000)
del a 
%memit a=np.arange(100000)

peak memory: 81.59 MiB, increment: 7.94 MiB
peak memory: 78.33 MiB, increment: 0.00 MiB
peak memory: 78.34 MiB, increment: 0.01 MiB


In [10]:
%%file test.py
import time
def new_job(t):
    print('Job ',t)
    tmp=range(t)
    time.sleep(t)  # wait for "t" seconds
    return t

Overwriting test.py


In [11]:
from test import new_job
%mprun -f new_job new_job(3)

Job  3



## 2 Asynchronous Computing (require Python 3.7+)
Python is still working on asyncio. You should always check out the <a href="https://docs.python.org/3/library/asyncio.html">latest documents</a>.

### 2.0 asyncio

In [12]:
#!conda install -c anaconda python=3.7
# or reinstall Anaconda from https://www.anaconda.com/download/

In [13]:
%%file test.py
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main1(): # wait for every task to complete:
    t0=time.time()
    await asyncio.gather(say_after(1,'hello'),say_after(2,'world'))
    #await asyncio.gather(*[say_after(1,'hello'),say_after(2,'world')])
    print(time.time()-t0)
    
async def main2():
    t0=time.time() # wait for a particular task to complete:
    task1=asyncio.create_task(say_after(1,'hello'))
    task2=asyncio.create_task(say_after(2,'world'))
    #await task1
    print(time.time()-t0)
    
asyncio.run(main1()) # switch between main1() & main2()

Overwriting test.py


In [14]:
!python test.py

hello
world
2.00559401512146


### 2.1 Synchronous Crawlers

In [15]:
import urllib
def ptt(page):
    print(page,end=' ')
    u='http://www.ptt.cc/bbs/boy-girl/index'+str(page)+'.html' 
    r=urllib.request.Request(u,headers={'User-Agent':''})
    return urllib.request.urlopen(r).read().decode('utf-8')

In [16]:
%%time
for i in range(1,11): 
    text=ptt(i)
    #print(text) # it does get stuff!

1 2 3 4 5 6 7 8 9 10 CPU times: user 248 ms, sys: 31.8 ms, total: 280 ms
Wall time: 1.47 s


### 2.1 Asynchronous Crawlers

In [17]:
%%file test.py

import aiohttp,asyncio,time

async def ptt(session,page):
    print(page,end=' ')
    URI='http://www.ptt.cc/bbs/boy-girl/index'+str(page)+'.html' 
    response = await session.get(URI) # wait & switch
    return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        all_texts = await asyncio.gather(*[ptt(session,i) for i in range(1,11)])
        #print(all_texts) # it does get stuff!

t0=time.time()
asyncio.run(main())
print(time.time()-t0)

Overwriting test.py


In [18]:
!python test.py

1 2 3 4 5 6 7 8 9 10 0.17531275749206543


## 3 Parallel Computing (w/ multiple threads or processes)

In [2]:
# Number of threads NumPy can utilize
!pip3 install mkl
import mkl.service
mkl.service.get_max_threads()

Collecting mkl
[?25l  Downloading https://files.pythonhosted.org/packages/9b/98/c892b77b755cb0c53491eabc88c49451a92e36fa5c5baf578e77b91ee31d/mkl-2019.0-py2.py3-none-manylinux1_x86_64.whl (261.0MB)
[K    100% |████████████████████████████████| 261.0MB 171kB/s ta 0:00:01    38% |████████████▍                   | 101.3MB 132kB/s eta 0:20:08    53% |█████████████████▏              | 140.2MB 302kB/s eta 0:06:39    83% |██████████████████████████▊     | 218.2MB 403kB/s eta 0:01:47
[?25hCollecting intel-openmp (from mkl)
[?25l  Downloading https://files.pythonhosted.org/packages/ea/f0/b1bc380d90241ab37149ed526a2ae585f6780ba616bd670b57f0559be37b/intel_openmp-2019.0-py2.py3-none-manylinux1_x86_64.whl (729kB)
[K    100% |████████████████████████████████| 737kB 267kB/s ta 0:00:01
[?25hInstalling collected packages: intel-openmp, mkl
Successfully installed intel-openmp-2019.0 mkl-2019.0
[33mYou are using pip version 18.0, however version 18.1 is available.
You should consider upgrading via 

ModuleNotFoundError: No module named 'mkl'

### 3.0 Sequential Commands (filter/map/reduce)

In [20]:
from functools import reduce

def f1(x):
    return x if x > 5 else None

def f2(x,y):
    return x+y

a=list(range(1,11))
b=filter(f1,a)
c=map(f1,a)
d=reduce(f2,a)

print(list(b),list(c),d)

[6, 7, 8, 9, 10] [None, None, None, None, None, 6, 7, 8, 9, 10] 55


### 3.1 Multiple Threads (w/ shared memory)

In [21]:
import concurrent.futures as cf

s=0 # global sum

def job(t):
    global s
    s+=1
    print('Job ',t,s)
    tmp=range(t)
    time.sleep(t)  # wait for "t" seconds
    return t

In [22]:
%%time
with cf.ThreadPoolExecutor(max_workers=2) as pool:
    results=pool.map(job,range(1,5))
print(list(results))

Job  1 1
Job  2 2
Job  3 3
Job  4 4
[1, 2, 3, 4]
CPU times: user 9.46 ms, sys: 4.76 ms, total: 14.2 ms
Wall time: 6.01 s


### 3.2 Multiple Processes (w/ distributed memory)

In [23]:
%%time
# Different processes don't share memory
with cf.ProcessPoolExecutor(max_workers=4) as pool:
    results=pool.map(job,range(1,5))
print(list(results))

Job  1 5
Job  3 5
Job  2 5
Job  4 5
[1, 2, 3, 4]
CPU times: user 20.7 ms, sys: 28.9 ms, total: 49.5 ms
Wall time: 4.07 s


In [24]:
# Testing Queue for memory-sharing:
from multiprocessing import Pool, Queue
queue = Queue()
queue.put('abc')
msg=queue.get()    
#msg=queue.get()    
print(msg)

abc


In [25]:
# Revise the job function to see a shared queue:
def job2(tq): #ts=(t,q)
    s=tq[1].get()+1 # get & update the sum from the queue
    tq[1].put(s) # put the sum back to the queue
    print('Job ',tq[0],s)
    time.sleep(tq[0])  # wait for "t" seconds
    return tq[0]

In [26]:
# Testing a shared queue:
q=Queue()
q.put(0)
job2((1,q))
job2((2,q))
[(i,q) for i in range(1,5)]

Job  1 1
Job  2 2


[(1, <multiprocessing.queues.Queue at 0x10e8fe6d8>),
 (2, <multiprocessing.queues.Queue at 0x10e8fe6d8>),
 (3, <multiprocessing.queues.Queue at 0x10e8fe6d8>),
 (4, <multiprocessing.queues.Queue at 0x10e8fe6d8>)]

In [27]:
%%time
# Multiple processes w/ a shared queue:
import multiprocessing
m=multiprocessing.Manager()
q=m.Queue()
q.put(0)
pool=Pool(processes=4)  
results=pool.map(job2,[(i,q) for i in range(1,5)])
print(list(results))

Job  2 1
Job  3 2
Job  1 3
Job  4 4
[1, 2, 3, 4]
CPU times: user 30.5 ms, sys: 33.8 ms, total: 64.3 ms
Wall time: 4.12 s


### 3.2 GPU Computing
Because Tensorflow does not support Python 3.7, we used <a href="https://pytorch.org/">PyTorch</a> instead.
You can check <a href="https://github.com/wkentaro/pytorch-for-numpy-users">"PyTorch for NumPy users"</a> for a quick introduction.

In [28]:
%%time
m=np.random.rand(4096,2160) #4K video
for i in range(120): #120 frames
    m/=2

CPU times: user 3.12 s, sys: 55.9 ms, total: 3.17 s
Wall time: 3.24 s


In [29]:
import torch as t
t.backends.cudnn.benchmark=True

In [30]:
%%time
#m=t.rand(4096,2160).cuda() # if you have a NVIDIA gpu
m=t.rand(4096,2160)
for i in range(120): #120 frames
    m=t.div(m,2)

CPU times: user 1.02 s, sys: 388 ms, total: 1.4 s
Wall time: 1.41 s


## 4 Distributed Computing (w/ PySpark)
You can read <a href="https://spark.apache.org/docs/latest/quick-start.html">this</a> for a quick introduction to Spark.

### 4.0 Spark Basics

In [31]:
import pyspark
sc=pyspark.SparkContext()
sc.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.0.14'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1546449837183'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '49756'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [32]:
rdd=sc.parallelize(range(10))
rdd.filter(lambda x:x>5).collect()
print(rdd.sum(),rdd.stats())
rdd.reduce(lambda x,y:x+y)

import math
rdd2=rdd.map(lambda x:math.sqrt(x)*10)
rdd2.collect() #collected to local memory

45 (count: 10, mean: 4.5, stdev: 2.8722813232690143, max: 9.0, min: 0.0)


[0.0,
 10.0,
 14.142135623730951,
 17.32050807568877,
 20.0,
 22.360679774997898,
 24.49489742783178,
 26.457513110645905,
 28.284271247461902,
 30.0]

### 4.1 Estimation of Pi (map)

In [33]:
sc.stop()
sc=pyspark.SparkContext(master='local[2]') # try "local"
sc.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.0.14'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '49791'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1546449841250'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]'),
 ('spark.app.name', 'pyspark-shell')]

In [34]:
import time, numpy as np

NUM_SAMPLES=1000000

def inside1(p):
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0

def inside2(p):
    x, y = np.random.random(), np.random.random()
    return x*x + y*y < 1

t0=time.time()
count = sc.parallelize(range(0, NUM_SAMPLES)).map(inside1).sum()
print(time.time()-t0,"Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

t0=time.time()
count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside2).count()
print(time.time()-t0,"Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

1.4918570518493652 Pi is roughly 3.140352
0.9012300968170166 Pi is roughly 3.140352


### 4.2 Word Counts (map+reduce)

In [35]:
import urllib.request
urllib.request.urlretrieve("http://ceiba.ntu.edu.tw/course/4671ea/content/alice_in_wonderland.txt","alice.txt")
text_file = sc.textFile("alice.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
print(counts.collect())
#counts.saveAsTextFile("alice_results.txt")



### 4.3 Spark SQL & DataFrame
See more info <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame">here</a>.

In [36]:
import pandas
from sklearn import datasets
iris = datasets.load_iris()
features = pandas.DataFrame(iris.data, columns = iris.feature_names)
targets = pandas.DataFrame(iris.target, columns = ['Species'])
merged = pandas.concat([features, targets], axis = 1)
print(merged)

     sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  \
0                  5.1               3.5                1.4               0.2   
1                  4.9               3.0                1.4               0.2   
2                  4.7               3.2                1.3               0.2   
3                  4.6               3.1                1.5               0.2   
4                  5.0               3.6                1.4               0.2   
5                  5.4               3.9                1.7               0.4   
6                  4.6               3.4                1.4               0.3   
7                  5.0               3.4                1.5               0.2   
8                  4.4               2.9                1.4               0.2   
9                  4.9               3.1                1.5               0.1   
10                 5.4               3.7                1.5               0.2   
11                 4.8      

In [37]:
# Prepare Spark Dataframe:
from pyspark.sql import SQLContext
from pyspark.ml.feature import RFormula

sqlContext = SQLContext(sc)
raw_df = sqlContext.createDataFrame(merged) # Spark Dataframe
raw_df.show(5)
fomula = RFormula(formula = 'Species ~ .')
raw_df = fomula.fit(raw_df).transform(raw_df)
raw_df.show(5)
train_df, test_df = raw_df.randomSplit([0.8, 0.2])

+-----------------+----------------+-----------------+----------------+-------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|Species|
+-----------------+----------------+-----------------+----------------+-------+
|              5.1|             3.5|              1.4|             0.2|      0|
|              4.9|             3.0|              1.4|             0.2|      0|
|              4.7|             3.2|              1.3|             0.2|      0|
|              4.6|             3.1|              1.5|             0.2|      0|
|              5.0|             3.6|              1.4|             0.2|      0|
+-----------------+----------------+-----------------+----------------+-------+
only showing top 5 rows

+-----------------+----------------+-----------------+----------------+-------+-----------------+-----+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|Species|         features|label|
+-----------------+----------------+-----------

### 4.4 Spark MLlib

### 4.4.1 Unsupervised Learning

In [38]:
from pyspark.mllib.clustering import *
print(iris)
rdd=sc.parallelize(iris.data)
model=KMeans.train(rdd,3)
model.clusterCenters

{'data': array([[5.1, 3.5, 1.4, 0.2],
       [4.9, 3. , 1.4, 0.2],
       [4.7, 3.2, 1.3, 0.2],
       [4.6, 3.1, 1.5, 0.2],
       [5. , 3.6, 1.4, 0.2],
       [5.4, 3.9, 1.7, 0.4],
       [4.6, 3.4, 1.4, 0.3],
       [5. , 3.4, 1.5, 0.2],
       [4.4, 2.9, 1.4, 0.2],
       [4.9, 3.1, 1.5, 0.1],
       [5.4, 3.7, 1.5, 0.2],
       [4.8, 3.4, 1.6, 0.2],
       [4.8, 3. , 1.4, 0.1],
       [4.3, 3. , 1.1, 0.1],
       [5.8, 4. , 1.2, 0.2],
       [5.7, 4.4, 1.5, 0.4],
       [5.4, 3.9, 1.3, 0.4],
       [5.1, 3.5, 1.4, 0.3],
       [5.7, 3.8, 1.7, 0.3],
       [5.1, 3.8, 1.5, 0.3],
       [5.4, 3.4, 1.7, 0.2],
       [5.1, 3.7, 1.5, 0.4],
       [4.6, 3.6, 1. , 0.2],
       [5.1, 3.3, 1.7, 0.5],
       [4.8, 3.4, 1.9, 0.2],
       [5. , 3. , 1.6, 0.2],
       [5. , 3.4, 1.6, 0.4],
       [5.2, 3.5, 1.5, 0.2],
       [5.2, 3.4, 1.4, 0.2],
       [4.7, 3.2, 1.6, 0.2],
       [4.8, 3.1, 1.6, 0.2],
       [5.4, 3.4, 1.5, 0.4],
       [5.2, 4.1, 1.5, 0.1],
       [5.5, 4.2, 1.4, 0.2],
     

[array([5.9016129 , 2.7483871 , 4.39354839, 1.43387097]),
 array([5.006, 3.428, 1.462, 0.246]),
 array([6.85      , 3.07368421, 5.74210526, 2.07105263])]

### 4.4.2 Supervised Learning

In [39]:
# training:
from pyspark.ml.classification import *
clf=LogisticRegression()
model = clf.fit(train_df)

In [40]:
# testing:
prediction = model.transform(test_df)
prediction.show(5)

+-----------------+----------------+-----------------+----------------+-------+-----------------+-----+--------------------+-------------+----------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|Species|         features|label|       rawPrediction|  probability|prediction|
+-----------------+----------------+-----------------+----------------+-------+-----------------+-----+--------------------+-------------+----------+
|              4.4|             2.9|              1.4|             0.2|      0|[4.4,2.9,1.4,0.2]|  0.0|[3952.96056696346...|[1.0,0.0,0.0]|       0.0|
|              4.4|             3.2|              1.3|             0.2|      0|[4.4,3.2,1.3,0.2]|  0.0|[5256.47242683079...|[1.0,0.0,0.0]|       0.0|
|              4.8|             3.4|              1.6|             0.2|      0|[4.8,3.4,1.6,0.2]|  0.0|[5185.27359064195...|[1.0,0.0,0.0]|       0.0|
|              5.0|             3.0|              1.6|             0.2|      0|[5.0,3.0,1.6,0.2]|  0

In [41]:
prediction.select("label","prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  2.0|       2.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  2.0|       1.0|
+-----+----------+
only showing top 20 rows

