In [21]:
import subprocess
from multiprocessing import Process,Semaphore,Lock,Queue,Pool
import asyncio
import time
import nest_asyncio
import aiohttp
from bs4 import BeautifulSoup
import logging
import random

nest_asyncio.apply()

# subprocess 生成多余进程
执行shell相关命令，替代os的部分功能，如os.system()

## 运行外部命令

In [None]:
completed = subprocess.run(['ls', '-1'])
print('returncode:', completed.returncode)

total 104
-rw-rw-r--  1 ubuntu ubuntu 33224 Mar 10 13:30 360linuxc(172.16.102.5_8080_8090).sh
drwxrwxr-x  2 ubuntu ubuntu  4096 Nov 18  2019 Anaconda
drwxrwxr-x 23 ubuntu ubuntu  4096 Nov 19  2019 anaconda3
drwxrwxr-x  3 ubuntu ubuntu  4096 Nov 29  2019 docker_test
drwxrwxr-x  2 ubuntu ubuntu  4096 Apr 13 10:55 K8S_test
returncode: 0

In [None]:
subprocess.run('echo $HOME', shell=True)

/home/ubuntu
CompletedProcess(args='echo $HOME', returncode=0)

### 捕获错误
subprocess.CalledProcessError

### 捕获输出

In [None]:
completed = subprocess.run(['ls', '-1'],stdout=subprocess.PIPE,)

print('returncode:', completed.returncode)
print('Have {} bytes in stdout:\n{}'.format(
    len(completed.stdout),
    completed.stdout.decode('utf-8'))
)

## 直接使用管道

call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)：执行命令，返回状态码

check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)：检查执行命令后的返回值，如果是0则返回，如果不是则抛出异常
 
check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)：检查执行命令后的返回值，如果是0则返回执行结果，如果不是则抛出异常

**使用方法**

用数组作为参数
> subprocess.call(['df', '-h'])

用字符串作为参数(shell=True)，尤其对必须在shell中解析的regex
> subprocess.call(['df -h',shell=True])

## 与进程通信

Popen(args,bufsize=-1,stdin=None, stdout=None, stderr=None,cwd=None)：用于执行复杂的系统命令

利用communicate()函数返回一个tuple(标准输出和错误)
> args：shell命令，可以是字符串或者序列类型（如：list，元组）

> bufsize：指定缓冲

> stdin, stdout, stderr：分别表示程序的标准输入、输出、错误句柄

> cwd：用于设置子进程的当前目录

对子进程的操作
> wait()：等待子进程运行结束后再执行父进程

> poll()：检查子进程的状态

> kill()：终止子进程

> sent_signal()：向子进程发送信号

> pid：查看子进程的pid

> args：查看shell命令

利用管道符号：subprocess.PIPE

In [None]:
# 解决regex和参数同时存在的情况
import glob
import subprocess

for item in glob.glob("*url"):
        tumor = item.replace(".url","")
        p1 = subprocess.Popen(['wget', '-i', item], stdout=subprocess.PIPE)
        p1.communicate()
        p2 = subprocess.Popen(['mkdir', "temp"], stdout=subprocess.PIPE)
        p2.communicate()
        #使用一个中间文件，先解决regex，再解决参数
        p3 = subprocess.Popen(["mv *gz temp"],shell=True,stdout=subprocess.PIPE)
        p3.communicate()
        p4 = subprocess.Popen(["mv","temp",tumor],stdout=subprocess.PIPE)
        p4.communicate()

# multiprocessing 多进程

## 基础操作

Process(target [, name [, args [, kwargs]]])：target为调用函数名

cpu.count()：获取当前机器的 CPU 核心数量

active_children()：获取目前所有的运行的进程

start()：启动多个进程

daemon：设置为True，则父进程结束后子进程自动终止

join()：所有子进程都执行完再结束

Lock()：同一时间只能一个进程输出，其他进程等待
> 获得锁 Lock(),acquire()

> 释放锁 Lock().release()

> Semaphore(信号量)：做到同步和互斥，及控制临界资源数量

## 进程间通信
Queue

In [8]:
import time
from random import random

buffer=Queue(10)
empty=Semaphore(2)   #缓冲区空余数
full=Semaphore(0)    #缓冲区占用数
lock=Lock()

class Consumer(Process):

    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            print("Consumer get",buffer.get())
            time.sleep(1)
            lock.release()
            empty.release()

class Producer(Process):

    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()         #占用一个缓冲区位置,缓冲区空余数-1
            lock.acquire()
            num=random()
            print("Producer put",num)
            buffer.put(num)         #对缓冲区进行操作
            time.sleep(1)
            lock.release()
            full.release()          #缓冲区占用数+1

if __name__ == "__main__":
    p=Producer()
    c=Consumer()
    p.daemon=c.daemon=True
    p.start()
    c.start()
    p.join()
    c.join()
    print("Ended!")

Ended!


## 管道
Pipe

In [9]:
class Consumer(Process):
    def __init__(self,pipe):
        Process.__init__(self)
        self.pipe=pipe

    def run(self):
        self.pipe.send("Consumer words")
        print("Consumer received: ",self.pipe.recv())

class Producer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe

    def run(self):
        self.pipe.send("Producer words")
        print("Producer received: ",self.pipe.recv())

if __name__ == "__main__":
    pipe=Pipe()
    p=Producer(pipe[0])       #将管道的两端分别传给两个进程
    c=Consumer(pipe[1])
    p.daemon=c.daemon=True
    p.start()
    c.start()
    p.join()
    c.join()
    print("Ended")

Ended


## 进程池 Pool
非阻塞：apply_async()，添加进程后，不一定非要等到改进程执行完就添加其他进程运行

阻塞：apply()则相反

In [17]:
### 阻塞
import math

pool=Pool(processes=5)
start=time.time()
for i in range(10):
    pool.apply(math.sqrt,(i,))
pool.close()
pool.join()

In [18]:
### 非阻塞

pool=Pool(processes=5)
start=time.time()
for i in range(10):
    pool.apply_async(math.sqrt,(i,))
pool.close()
pool.join()

### 多进程写入同一文件
使用multiprocessing库的回调函数（callback）

In [None]:
def mycallback(x):
        output.write(x+"\n")

if __name__=="__main__":
        pool=multiprocessing.Pool(processes=12)
        for item in strain_combination:
                pool.apply_async(compare2.calculate_distance,(item,),callback=mycallback)
        pool.close()
        pool.join()

**<font color="red">在多进程中使用装饰器时，由于装饰器修饰后的函数会丢失原有函数名，导致不可pickle，需要配合functools.wraps使用</font>**

## 实际案例

In [None]:
### methylation_binom_cal

from scipy.stats import binom
import argparse
import subprocess
import pandas as pd
import sys
sys.path.append("/home/zuotianyu/script/stat")
from model_stats import binom_test, generate_binom_table
import multiprocessing
import glob
import os

"""
拆成24个文件，30min以内处理完
新增加功能：根据输入的conversion_rate先制作table
"""

def parse_args():
    parser = argparse.ArgumentParser(description="检测甲基化位点")
    parser.add_argument("-f")
    parser.add_argument("-fdr",required=False,default=0.01)   #non-conversion + sequencing error
    parser.add_argument("-sd", required=False, default="/ehpcdata/zuotianyu/WGBS_mid_file/binom_test")
    parser.add_argument("-pfx")     #输出文件前缀
    args = parser.parse_args()
    return args.f,args.fdr,args.sd,args.pfx

def get_EX(n,thresholdDict):
    exp = thresholdDict.get(n)
    if exp:
        return exp
    else:
        return 1

def get_pValue(mNum,n,fdr,binomDict):
    pValue = binomDict.get("%s_%s" % (n, mNum))
    if pValue:
        return pValue
    else:
        return binom_test(mNum,n,fdr)

def process_file(_file,fdr,binomDict,thresholdDict):

    output = open("%s.txt" %(_file),"a")
    with open(_file) as f:
        for line in f:
            lineList = line.rstrip().split()
            mNum = int(lineList[3])
            umNum = int(lineList[4])
            n = mNum+umNum
            EX = get_EX(n,thresholdDict)                    #默认0.05
            pValue = get_pValue(mNum,n,fdr,binomDict)
            output.write("%s\t%s\t%s\n" %("\t".join(lineList[:7]),EX,pValue))
    f.close()
    output.close()
    os.remove(_file)

if __name__ == "__main__":
    input_file,fdr,sd,output_prefix = parse_args()
    fdr = round(float(fdr),3)
    if not os.path.exists(output_prefix):
        os.mkdir(output_prefix)
    os.chdir(output_prefix)

    generate_binom_table(fdr,sd)
    table = pd.read_table("%s/binom_table_%s" %(sd,fdr), header=None, names=["con", "p"], dtype={"con": str, "p": str})
    table2 = pd.read_table("%s/binom_threshold_%s" %(sd,fdr), header=None, names=["n", "m"])
    binomDict = dict(zip(table["con"], table["p"]))
    thresholdDict = dict(zip(table2["n"], table2["m"]))

    wd = os.getcwd()
    output = open("%s.CX_report.txt" %(output_prefix), "a")
    subprocess.check_call(["split","-l","50000000",input_file,"%s_piece" %(output_prefix)])
    record = []
    pool = multiprocessing.Pool(processes=24)
    for piece_file in glob.glob("*piece*"):
        pool.apply_async(process_file, args=(piece_file,fdr,binomDict,thresholdDict,))
        record.append(piece_file)
    pool.close()
    pool.join()

    for item in record:
        _file = "%s.txt" %(item)
        with open(_file) as f:
            for line in f:
                output.write(line)
        f.close()
        os.remove(_file)
    output.close()

In [None]:
### WGBS_chiploc

import pandas as pd
import time
import glob
import multiprocessing
from functools import wraps

"""
处理数据背景：甲基化样本共200+，每个文件10亿+行，需要从中挑出在甲基化芯片中50万+条序列中的位点（每条序列50bp长）
速度优化：1）甲基化样本文件不是按照染色体顺序排列，但每条染色体内部的位置都是排过序的
          -->需要先对芯片位点排序，按照chr提取起点位置列表至字典中方便调用
          2）judge()函数中的搜索优化：终止+位点列表的删减（已检出位点之前所有位点的删除，避免无效搜索）
          3）输出每个样本每条染色体的运行时间和总运行时间
          4）多进程处理，加快速度（注意decorator的坑）

"""
def process_chip():
    chip = "/glusterfs/home/zuo_tianyu/python_script/methylation/human_methylation_450k_beadchip_array.txt"
    table = pd.read_table(chip,dtype={"CHR":str})
    select_columns = ["CHR","MAPINFO"]
    df = table[select_columns].dropna()
    df.rename(columns={"MAPINFO":"start","CHR":"chr"},inplace=True)    #芯片都是50bp
    df["chr"] = "chr"+df["chr"]
    df.sort_values(by=["chr","start"],inplace=True)
    df.set_index("chr",inplace=True)
    chrList = df.index.unique()
    chr_locDict = {}
    for chr_name in chrList:
        chr_locDict[chr_name] = df.ix[chr_name]["start"].tolist()
    print("芯片所用序列位点已获得")
    return chr_locDict

def timer(func):
    """
    multiprocessing不支持装饰器直接使用（因为装饰器修饰后的函数会丢失原有函数名，导致不可pickle）
    使用functools.wraps解决（注解底层包装函数）
    """
    @wraps(func)
    def wrapper(*args,**kwargs):
        start_time = time.time()
        func(*args,**kwargs)
        end_time = time.time()
        print("%s" % (end_time - start_time))
    return wrapper

@timer
def process_sample(CX_file,chr_locDict):
    copyDict = chr_locDict
    print("%s开始处理" % (CX_file))
    sample_name = CX_file.replace(".*", "")
    output = open(sample_name + ".CX_filtered.txt", "a")
    last_chr = ""
    chr_process_time = time.time()
    with open(CX_file) as f:
        for line in f:
            lineList = line.rstrip().split()
            _chr = lineList[0]
            loc = lineList[1]
            if "_" in _chr or _chr == "chrM":
                continue
            if monitor(last_chr,_chr):
                print("%s的%s处理共耗时%s" %(sample_name,last_chr,time.time()-chr_process_time))
                chr_process_time = time.time()
                last_chr = _chr
            else:
                pass
            this_judge = judge(_chr, loc,copyDict)
            if this_judge:
                output.write(line)
                copyDict = this_judge
            else:
                pass
    print("%s处理完成" % (CX_file))
    f.close()
    output.close()

#输出各chr的处理时间
def monitor(last_chr,now_chr):
    if last_chr != now_chr:
        return 1
    else:
        return

def judge(_chr,loc,chr_locDict):
    chip_loc = chr_locDict[_chr]
    loc = int(loc)
    for nu,loci in enumerate(chip_loc):
        loci = int(loci)
        if loc < loci:
            return
        elif loc >= loci and loc <= loci+49:
            del chip_loc[:nu]
            chr_locDict[_chr] = chip_loc
            return chr_locDict
        else:
            pass

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=24)
    chr_locDict = process_chip()
    for CX_file in glob.glob("*txt"):
        pool.apply_async(process_sample,args=(CX_file,chr_locDict,))
    pool.close()
    pool.join()

# asyncio 异步I/O

In [22]:
async def add(x=1,y=2):
    print("Add {} + {} ...".format(x,y))
    await asyncio.sleep(2)
    return x+y

In [23]:
### 执行多个task
s = time.time()
loop = asyncio.get_event_loop()
tasks = [add(x,y) for x,y in zip(range(1,10),range(11,20))]
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-s)

Add 6 + 16 ...
Add 1 + 11 ...
Add 7 + 17 ...
Add 2 + 12 ...
Add 8 + 18 ...
Add 3 + 13 ...
Add 9 + 19 ...
Add 4 + 14 ...
Add 5 + 15 ...
2.0032901763916016


In [24]:
# 获取多个并发的task的结果
s = time.time()
loop = asyncio.get_event_loop()
tasks = [loop.create_task(add(1,2)),loop.create_task(add(11,12)),loop.create_task(add(111,112))]
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print(task.result())
print(time.time()-s)

Add 1 + 2 ...
Add 11 + 12 ...
Add 111 + 112 ...
3
23
223
2.00235915184021
