# 066 스레드를 이용하여 병렬로 처리하려면? ― threading

In [None]:
# 066 스레드를 이용하여 병렬로 처리하려면? ― threading
# threading은 스레드를 이용하여 한 프로세스에서 2가지 이상의 일을 동시에 실행할 수 있게 하는 모듈이다.

# 문제
# 다음은 페이지 번호를 입력받아 위키독스의 페이지 리소스를 wikidocs_페이지번호.html 파일로 저장하도록 만든 함수이다.

# SSL 관련 오류가 발생한다면 '030 웹 페이지를 임시로 저장하려면? - functools.lru_cache'을 참고하자.

# import urllib.request


# def get_wikidocs(page):
#     print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
#     resource = 'https://wikidocs.net/{}'.format(page)
#     try:
#         with urllib.request.urlopen(resource) as s:
#             with open('wikidocs_%s.html' % page, 'wb') as f:
#                 f.write(s.read())
#     except urllib.error.HTTPError:
#         return 'Not Found'
# A 씨는 참고 자료를 만드는 데 다음처럼 10개의 위키독스 페이지가 필요하다고 한다.

# 12, 13, 14, 15, 17, 18, 20, 21, 22, 24
# 앞의 함수를 사용하여 페이지 수만큼 실행하여 원하는 결과를 얻을 수 있다. 하지만, A 씨는 10개의 요청을 하나씩 보내기보다는 동시에 요청하여 더 빠른 속도로 리소스를 얻고 싶어 한다. 동시에 요청할 수 있도록 하려면 함수를 어떻게 수정해야 할까?

# 참고: 090 웹 페이지를 저장하려면? - urllib


In [1]:
import urllib.request


def get_wikidocs(page):
    print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
    resource = 'https://wikidocs.net/{}'.format(page)
    try:
        with urllib.request.urlopen(resource) as s:
            with open('wikidocs_%s.html' % page, 'wb') as f:
                f.write(s.read())
    except urllib.error.HTTPError:
        return 'Not Found'

get_wikidocs(3)    

wikidocs page:3


In [None]:
# 풀이
# 동시에 요청하려면 threading 모듈을 사용하면 된다. 먼저 이 모듈을 사용하지 않고 10개의 페이지를 요청할 때 시간이 얼마나 걸리는지 측정해 보자.

# [파일명: threading_no_sample.py]

# import urllib.request


# def get_wikidocs(page):
#     print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
#     resource = 'https://wikidocs.net/{}'.format(page)
#     try:
#         with urllib.request.urlopen(resource) as s:
#             with open('wikidocs_%s.html' % page, 'wb') as f:
#                 f.write(s.read())
#     except urllib.error.HTTPError:
#         return 'Not Found'


# import time

# start = time.time()

# pages = [12, 13, 14, 15, 17, 18, 20, 21, 22, 24]
# for page in pages:
#     get_wikidocs(page)

# end = time.time()

# print("수행시간: %f 초" % (end - start))
# 이 프로그램을 실행하면 다음과 같이 출력된다.

# wikidocs page:12
# wikidocs page:13
# wikidocs page:14
# wikidocs page:15
# wikidocs page:17
# wikidocs page:18
# wikidocs page:20
# wikidocs page:21
# wikidocs page:22
# wikidocs page:24
# 수행시간: 10.096202 초
# 필자의 PC에서는 페이지 10개의 리소스를 저장하는데 10초 정도의 시간이 걸렸다.

# 예제 실행 시간은 네트워크 환경 또는 서버의 트래픽 상태에 따라서 다를 수 있다.


In [2]:
import urllib.request


def get_wikidocs(page):
    print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
    resource = 'https://wikidocs.net/{}'.format(page)
    try:
        with urllib.request.urlopen(resource) as s:
            with open('wikidocs_%s.html' % page, 'wb') as f:
                f.write(s.read())
    except urllib.error.HTTPError:
        return 'Not Found'

import time

start = time.time()

pages = [12, 13, 14, 15, 17, 18, 20, 21, 22, 24]
for page in pages:
    get_wikidocs(page)

end = time.time()

print("수행시간: %f 초" % (end - start))

wikidocs page:12
wikidocs page:13
wikidocs page:14
wikidocs page:15
wikidocs page:17
wikidocs page:18
wikidocs page:20
wikidocs page:21
wikidocs page:22
wikidocs page:24
수행시간: 2.542960 초


In [None]:
# 이번에는 스레드로 병렬 처리하는 threading 모듈을 사용하여 페이지 리소스를 동시에 저장하도록 변경해 보자.

# [파일명: threading_yes_sample.py]

# import urllib.request


# def get_wikidocs(page):
#     print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
#     resource = 'https://wikidocs.net/{}'.format(page)
#     try:
#         with urllib.request.urlopen(resource) as s:
#             with open('wikidocs_%s.html' % page, 'wb') as f:
#                 f.write(s.read())
#     except urllib.error.HTTPError:
#         return 'Not Found'


# import time
# import threading

# start = time.time()

# pages = [12, 13, 14, 15, 17, 18, 20, 21, 22, 24]
# threads = []
# for page in pages:
#     t = threading.Thread(target=get_wikidocs, args=(page, ))
#     t.start()
#     threads.append(t)

# for t in threads:
#     t.join()  # 스레드가 종료될 때까지 대기

# end = time.time()

# print("수행시간: %f 초" % (end - start))

# threading.Thread로 get_wikidocs() 함수를 실행하는 스레드 t를 생성한다. target은 실행할 함수이고 args는 함수에 전달할 인수이다. 
# 이렇게 스레드를 생성한 후 t.start()를 실행하면 스레드가 독립적으로 실행된다.

# 각 스레드는 이전 스레드의 종료를 기다리지 않고 곧바로 실행된다.

# 그리고 만든 각 스레드 t는 threads라는 리스트 변수에 담았다. 이렇게 하는 이유는 생성된 스레드를 threads 변수에 담아 두어야 스레드가 종료될 때까지 
# 기다리는 join() 함수를 for문 밖에서 차례대로 호출할 수 있기 때문이다. 
# 만약 for문 안에서 스레드를 생성하여 실행하고 join() 함수도 곧바로 호출한다면 스레드의 이점이 사라지게 된다.

# 생성된 스레드를 모두 종료하려면 생성된 스레드마다 join() 함수를 호출해야만 한다. 
# 만약 join() 없이 이 프로그램을 실행하면 아직 스레드가 종료되지 않은 상태에서 수행 시간이 출력되므로 정확한 수행 시간을 출력할 수 없게 된다.

# threading 모듈을 사용하도록 수정한 프로그램의 실행 결과는 다음과 같다.

# wikidocs page:12
# wikidocs page:13
# wikidocs page:14
# wikidocs page:15
# wikidocs page:17
# wikidocs page:18
# wikidocs page:20
# wikidocs page:21
# wikidocs page:22
# wikidocs page:24
# 수행시간: 2.453772 초
# 10초 정도 걸리던 작업이 2.4초가량으로 상당히 단축된 것을 확인할 수 있다(실행 환경에 따라 실행 시간은 다름).

# 참고
# threading - 스레드 기반 병렬 처리: https://docs.python.org/ko/3/library/threading.html

In [3]:
import urllib.request


def get_wikidocs(page):
    print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
    resource = 'https://wikidocs.net/{}'.format(page)
    try:
        with urllib.request.urlopen(resource) as s:
            with open('wikidocs_%s.html' % page, 'wb') as f:
                f.write(s.read())
    except urllib.error.HTTPError:
        return 'Not Found'

import time
import threading

start = time.time()

pages = [12, 13, 14, 15, 17, 18, 20, 21, 22, 24]
threads = []
for page in pages:
    t = threading.Thread(target=get_wikidocs, args=(page, ))
    t.start()
    threads.append(t)

for t in threads:
    t.join()  # 스레드가 종료될 때까지 대기

end = time.time()

print("수행시간: %f 초" % (end - start))


wikidocs page:12
wikidocs page:13
wikidocs page:14
wikidocs page:15
wikidocs page:17
wikidocs page:18
wikidocs page:20
wikidocs page:21
wikidocs page:22
wikidocs page:24
수행시간: 0.835591 초


## 067 멀티 프로세스를 이용하여 병렬로 처리하려면? ― multiprocessing

In [None]:
# 067 멀티 프로세스를 이용하여 병렬로 처리하려면? ― multiprocessing

# multiprocessing은 멀티 프로세스를 활용하여 2가지 또는 그 이상의 일을 동시에 실행할 수 있게 하는 모듈이다.

# 문제
# 다음은 CPU 연산이 많은 heavy_work() 함수를 4번 실행하고 그 소요 시간을 측정하는 예제이다.

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i
#     print('%s done' % name)


# start = time.time()

# for i in range(4):
#     heavy_work(i)

# end = time.time()

# print("수행시간: %f 초" % (end - start))
# 프로그램의 실행 결과는 다음과 같다.

# 0 done
# 1 done
# 2 done
# 3 done
# 수행시간: 2.027041 초

# 이때 heavy_work() 함수를 동시에 실행하여 전체 수행 시간을 단축하려면 어떻게 프로그램을 수정해야 할까?


In [6]:
import time

def heavy_work(name):
    result = 0
    for i in range(4000000):
        result += i
    print(f'{name} done : result: {result:,}')
    
start = time.time()
for i in range(4):
    heavy_work(i)

end = time.time()

print("수행시간: %f 초" % (end - start))
    
        

0 done : result: 7,999,998,000,000
1 done : result: 7,999,998,000,000
2 done : result: 7,999,998,000,000
3 done : result: 7,999,998,000,000
수행시간: 0.691812 초


In [None]:
# 풀이
# 다음은 threading 모듈을 사용하여 문제를 풀고자 한 모습이다.

# 참고: 066 스레드를 이용하여 병렬로 처리하려면? - threading

# [파일명: multiprocessing_threading_sample.py]

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i
#     print('%s done' % name)


# if __name__ == '__main__':
#     import threading

#     start = time.time()
#     threads = []
#     for i in range(4):
#         t = threading.Thread(target=heavy_work, args=(i, ))
#         t.start()
#         threads.append(t)

#     for t in threads:
#         t.join()  # 스레드가 종료될 때까지 대기

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))
# 실행 결과는 다음과 같다.

# c:\projects\pylib>python multiprocessing_threading_sample.py
# 2 done
# 0 done
# 1 done
# 3 done
# 수행시간: 2.005648 초
# 이처럼 스레드는 CPU 연산만 수행할 때는 수행 시간 단축에 그리 도움이 되지 않음을 알 수 있다.

# 파이썬 스레드는 메모리 관리를 위해 하나의 스레드만이 파이썬 객체에 접근할 수 있도록 제한하는데, 이것을 GIL(Global Interpreter Lock)이라 한다. 
# 이러한 이유로 스레드는 GIL에 영향을 받지 않는 I/O가 주로 발생하는 네트워크 통신 또는 파일 읽고 쓰기와 같은 작업에 유리하다.


In [None]:
!pip install py-cpuinfo

In [16]:
import cpuinfo


In [19]:
import os
print('OS CPU COUNT : ', os.cpu_count())

import multiprocessing
print('OS CPU COUNT : ', multiprocessing.cpu_count())

import platform
def printOsInfo():
    print('OS                   :\t', platform.system())
    print('OS Version           :\t', platform.version())
printOsInfo()
    
import platform ,psutil
def printSystemInfor():
    print('Process information  :\t', platform.processor())
    print('Process Architecture :\t', platform.machine())
    print('RAM Size             :\t',str(round(psutil.virtual_memory().total / (1024.0 **3)))+"(GB)")
printSystemInfor()

import cpuinfo
cpuinfo.get_cpu_info()


OS CPU COUNT :  12
OS CPU COUNT :  12
OS                   :	 Windows
OS Version           :	 10.0.22621
Process information  :	 Intel64 Family 6 Model 165 Stepping 3, GenuineIntel
Process Architecture :	 AMD64
RAM Size             :	 16(GB)


{'python_version': '3.9.7.final.0 (64 bit)',
 'cpuinfo_version': [9, 0, 0],
 'cpuinfo_version_string': '9.0.0',
 'arch': 'X86_64',
 'bits': 64,
 'count': 12,
 'arch_string_raw': 'AMD64',
 'vendor_id_raw': 'GenuineIntel',
 'brand_raw': 'Intel(R) Core(TM) i5-10400 CPU @ 2.90GHz',
 'hz_advertised_friendly': '2.9000 GHz',
 'hz_actual_friendly': '2.9040 GHz',
 'hz_advertised': [2900000000, 0],
 'hz_actual': [2904000000, 0],
 'l2_cache_size': 1572864,
 'stepping': 3,
 'model': 165,
 'family': 6,
 'l3_cache_size': 12582912,
 'flags': ['3dnow',
  '3dnowprefetch',
  'abm',
  'acpi',
  'adx',
  'aes',
  'apic',
  'avx',
  'avx2',
  'bmi1',
  'bmi2',
  'clflush',
  'clflushopt',
  'cmov',
  'cx16',
  'cx8',
  'de',
  'ds_cpl',
  'dtes64',
  'dts',
  'erms',
  'est',
  'f16c',
  'fma',
  'fpu',
  'fxsr',
  'ht',
  'ia64',
  'intel_pt',
  'invpcid',
  'lahf_lm',
  'mca',
  'mce',
  'mmx',
  'monitor',
  'movbe',
  'mpx',
  'msr',
  'mtrr',
  'osxsave',
  'pae',
  'pat',
  'pbe',
  'pcid',
  'pclmul

In [8]:
# [Python] GIL, Global interpreter Lock 은 무엇일까요.
# 참조 : https://ssungkang.tistory.com/entry/python-GIL-Global-interpreter-Lock%EC%9D%80-%EB%AC%B4%EC%97%87%EC%9D%BC%EA%B9%8C

import random
import threading
import time


def working():
    max([random.random() for i in range(5000000)])


# 1 Thread
# ------------------
s_time = time.time()
working()
working()
e_time = time.time()
print(f'{e_time - s_time:.5f}')


# 2 Threads
# ------------------
s_time = time.time()
threads = []
for i in range(2):
    threads.append(threading.Thread(target=working))
    threads[-1].start()

for t in threads:
    t.join()

e_time = time.time()
print(f'{e_time - s_time:.5f}')



# 3 Multiprocessing
# ------------------

import multiprocessing

start = time.time()
procs = []
for i in range(4):
    p = multiprocessing.Process(target=working)
    p.start()
    procs.append(p)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 대기

end = time.time()

print(f'{e_time - s_time:.5f}')

1.08530
1.13763
1.13763


In [6]:
import random
import threading
import time


def working():
    time.sleep(0.1)
    max([random.random() for i in range(100000)])
    time.sleep(0.1)
    max([random.random() for i in range(100000)])
    time.sleep(0.1)
    max([random.random() for i in range(100000)])
    time.sleep(0.1)
    max([random.random() for i in range(100000)])
    time.sleep(0.1)
    max([random.random() for i in range(100000)])
    time.sleep(0.1)


# 1 Thread
s_time = time.time()
working()
working()
e_time = time.time()
print(f'{e_time - s_time:.5f}')


# 2 Threads
s_time = time.time()
threads = []
for i in range(2):
    threads.append(threading.Thread(target=working))
    threads[-1].start()

for t in threads:
    t.join()

e_time = time.time()
print(f'{e_time - s_time:.5f}')


# 3 Multiprocessing
#-----------------
import multiprocessing

start = time.time()
procs = []
for i in range(4):
    p = multiprocessing.Process(target=working)
    p.start()
    procs.append(p)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 대기

end = time.time()

print(f'{e_time - s_time:.5f}')



1.29323
0.66623
0.66623


In [7]:
import time

def heavy_work(name):
    result = 0
    for i in range(4000000):
        result += i
    print(f'{name} done : result: {result:,}')


if __name__ == '__main__':
    import threading

    start = time.time()
    threads = []
    for i in range(4):
        t = threading.Thread(target=heavy_work, args=(i, ))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()  # 스레드가 종료될 때까지 대기

    end = time.time()

    print("수행시간: %f 초" % (end - start))


1 done : result: 7,999,998,000,000
0 done : result: 7,999,998,000,000
2 done : result: 7,999,998,000,000
3 done : result: 7,999,998,000,000
수행시간: 0.710613 초


In [None]:
# 이번에는 threading 모듈 대신 multiprocessing 모듈을 사용해 보자. 
# multiprocessing 모듈은 멀티 프로세서와 별개의 메모리를 사용하여 완전히 독립하여 병렬 프로그래밍할 수 있다. 
# 단, 여러 개의 CPU가 있는 멀티코어 환경에서만 가능하다.

# [파일명: multiprocessing_sample.py]

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i
#     print('%s done' % name)


# if __name__ == '__main__':
#     import multiprocessing

#     start = time.time()
#     procs = []
#     for i in range(4):
#         p = multiprocessing.Process(target=heavy_work, args=(i, ))
#         p.start()
#         procs.append(p)

#     for p in procs:
#         p.join()  # 프로세스가 모두 종료될 때까지 대기

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))

# 사용 방법은 threading 모듈일 때와 마찬가지이다.

# multiprocessing 모듈과 threading 모듈의 사용법이 같다는 것은 의도적으로 둘 간 변환이 쉽도록 설계했기 때문으로 추정된다.

# multiprocessing.Process 클래스로 프로세스를 생성한다. 프로세스 생성 시 target에는 실행할 함수명을 지정하고 args에는 그 함수에 전달할 인수를 설정한다. 
# 스레드와 마찬가지로 start() 함수로 프로세스를 실행하고 join() 함수로 프로세스가 종료되기를 기다린다.

# multiprocessing.Process를 사용한 결과는 다음과 같다.

# c:\projects\pylib>python multiprocessing_sample.py
# 0 done
# 3 done
# 1 done
# 2 done
# 수행시간: 0.663679 초
# 멀티코어 환경에서 무려 3배 이상 속도가 향상되었다는 것을 확인할 수 있다.

# 이 예제는 4 CPU 환경에서 실행했다.

# 알아두면 좋아요
# multiprocessing.Pool
# multiprocessing.Process와 비슷한 multiprocessing.Pool을 사용할 수도 있다. 사용 방법은 다음과 같다.

# [파일명: multiprocessing_pool_sample.py]

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i
#     print('%s done' % name)


# if __name__ == '__main__':
#     import multiprocessing

#     start = time.time()
#     pool = multiprocessing.Pool(processes=4)
#     pool.map(heavy_work, range(4))
#     pool.close()
#     pool.join()

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))
# multiprocessing.Pool(processes=4)로 pool 객체를 만들고 나서 map()을 호출하면 heavy_work() 함수를 동시에 실행할 수 있다. multiprocessing.Pool은 포크할 프로세스의 개수를 지정할 수 있다는 특징이 있다. 위 예에서는 4개의 프로세스를 실행하도록 설정하였다.

# 포크(fork)란 프로세스가 자기 자신을 복제하는 동작을 일컫는다.

# 참고
# multiprocessing - 프로세스 기반 병렬 처리: https://docs.python.org/ko/3/library/multiprocessing.html

In [4]:
import time


def heavy_work(name):
    result = 0
    for i in range(4000000):
        result += i
    print('%s done' % name)


if __name__ == '__main__':
    import multiprocessing

    start = time.time()
    procs = []
    for i in range(4):
        p = multiprocessing.Process(target=heavy_work, args=(i, ))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()  # 프로세스가 모두 종료될 때까지 대기

    end = time.time()

    print("수행시간: %f 초" % (end - start))


수행시간: 0.162991 초


## 068 병렬로 작업을 처리하려면? ― concurrent.futures


In [None]:
# 068 병렬로 작업을 처리하려면? ― concurrent.futures
# 스레드를 구현하려면 threading 모듈을 사용하고 멀티 프로세스 프로그램을 구현하려면 multiprocessing 모듈을 사용해야 한다. 
# 하지만, 일반적으로 concurrent.futures 모듈을 사용하면 같은 규칙으로 스레드와 멀티 프로세스 코드를 더 쉽게 작성할 수 있다.

# 문제
# 다음은 multiprocessing 절에서 다루었던 문제 풀이이다.

# 참고 : 067 프로세스를 이용하여 병렬로 처리하려면? - multiprocessing

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i  # result 를 계산만 하고 리턴하지 않았다.
#     print('%s done' % name)


# if __name__ == '__main__':
#     import multiprocessing

#     start = time.time()
#     procs = []
#     for i in range(4):
#         p = multiprocessing.Process(target=heavy_work, args=(i, ))
#         p.start()
#         procs.append(p)

#     for p in procs:
#         p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))

# 여러 프로세스가 동시에 heavy_work() 함수를 호출하는 코드이다. 
# 현재 heavy_work() 함수는 result를 계산만 하고 그 값을 반환하지는 않는다. 
# 계산한 result를 반환하도록 heavy_work() 함수를 수정하고 호출한 heavy_work() 함수의 반환값을 모두 더해 출력하도록 
# 메인 프로세스를 수정하려면 어떻게 해야 할까?


In [None]:
# 풀이
# multiprocess 모듈로 포크한 프로세스의 수행 결과를 메인 프로세스에서 취합하는 방법에는 여러 가지가 있겠지만 
# 가장 일반적으로 사용하는 concurrent.futures 모듈로 이 문제를 풀어보자.

# [파일명: concurrent_futures_process_sample.py]

# import time


# def heavy_work(name):
#     result = 0
#     for i in range(4000000):
#         result += i
#     print('%s done' % name)
#     return result  # 결과를 반환하도록 변경


# if __name__ == '__main__':
#     import concurrent.futures

#     start = time.time()

#     total_result = 0
#     pool = concurrent.futures.ProcessPoolExecutor(max_workers=4)

#     procs = []
#     for i in range(4):
#         procs.append(pool.submit(heavy_work, i))

#     for p in concurrent.futures.as_completed(procs):
#         total_result += p.result()

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))
#     print("총결괏값: %s" % total_result)

# 우선 heavy_work() 함수가 계산한 result를 반환하도록 변경했다. 
# 그리고 multiprocessing.Process 대신 concurrent.futures.ProcessPoolExecutor를 사용하도록 변경했다. 
# 두 모듈 모두 역할은 같다. 
# 단, concurrent.futures.ProcessPoolExecutor는 max_workers를 지정하여 포크할 프로세스의 최대 개수를 지정할 수 있다는 점이 다르다.

# pool.submit(heavy_work, i)는 프로세스를 포크하여 heavy_work() 함수를 호출한다. i 는 heavy_work() 함수에 전달할 인수이다. 
# concurrent.futures.as_completed(procs)는 수행된 프로세스가 모두 종료될 때까지 대기하게 하고 종료된 순서대로 프로세스 p를 반환한다. 
# 이때 각 프로세스의 수행 결괏값은 p.result()로 얻을 수 있다.

# 프로세스 결괏값이 필요 없다면 concurrent.futures.as_completed(procs) 대신 concurrent.futures.wait(procs)를 사용한다.

# 프로그램의 실행 결과는 다음과 같다.

#   c:\projects\pylib>python concurrent_futures_process_sample.py
# 2 done
# 0 done
# 1 done
# 3 done
# 수행시간: 0.926396 초
# 총결괏값: 31999992000000
#


In [None]:
# 알아두면 좋아요

# concurrent.futures.ThreadPoolExecutor
# 앞서 살펴보았던 위키독스의 여러 페이지 리소스를 스레드를 이용하여 가져오는 예제도 concurrent.futures.ThreadPoolExecutor를 사용하여 다음과 같이 변경할 수 있다.

# ※ 참고 : 066 스레드를 이용하여 병렬로 처리하려면? - threading

# [파일명: concurrent_futures_thread_sample.py]

# import urllib.request


# def get_wikidocs(page):
#     print("wikidocs page:{}".format(page))  # 페이지 호출시 출력
#     resource = 'https://wikidocs.net/{}'.format(page)
#     try:
#         with urllib.request.urlopen(resource) as s:
#             with open('wikidocs_%s.html' % page, 'wb') as f:
#                 f.write(s.read())
#     except urllib.error.HTTPError:
#         return 'Not Found'


# if __name__ == '__main__':
#     import time
#     import concurrent.futures

#     start = time.time()

#     pages = [12, 13, 14, 15, 17, 18, 20, 21, 22, 24]
#     pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

#     threads = []
#     for page in pages:
#         threads.append(pool.submit(get_wikidocs, page))

#     concurrent.futures.wait(threads)  # 스레드가 모두 종료될 때까지 대기

#     end = time.time()

#     print("수행시간: %f 초" % (end - start))

# 이 예제에서는 반환값이 필요 없으므로 concurrent.futures.as_completed 대신 concurrent.futures.wait를 사용했다. 
# concurrent.futures.wait는 스레드가 모두 종료될 때까지 기다리는 역할만 한다.

# concurrent.futures.ThreadPoolExecutor와 concurrent.futures.ProcessPoolExecutor의 사용법이 똑같다는 점에 주목하자.

# 참고
# concurrent.futures - 병렬 작업 실행하기: https://docs.python.org/ko/3/library/concurrent.futures.html#module-concurrent.futures

## 069 시스템 명령어를 실행하려면? ― subprocess


In [None]:
# 069 시스템 명령어를 실행하려면? ― subprocess
# subprocess는 다양한 방법으로 시스템 명령을 실행하는 모듈이다.

# 문제
# 다음은 리눅스나 유닉스 시스템에서 현재 디렉터리의 파일 목록을 자세하게 출력할 때 사용하는 명령어이다.

# ls -l
# 특정 디렉터리의 파일 목록을 작성하고자 파이썬으로 이 명령을 실행하고 그 결과를 out.txt 파일로 저장하는 프로그램을 만들려면 어떻게 해야 할까?

In [None]:
# 풀이
# 시스템 명령어를 실행하는 방법에는 os.system, os.spawn 등이 있지만, 사용 방법이 다양하고 유연한 subprocess를 사용하는 것이 가장 일반적인 방법이다. 
# 실제로 파이썬 문서에서도 os.system 대신 subprocess를 사용할 것을 추천한다.

# 다음은 subprocess를 사용한 문제 풀이이다.

# [파일명: subprocess_sample.py]

# import subprocess

# with open('out.txt', 'wb') as f:
#     out = subprocess.run(['ls', '-l'], capture_output=True)
#     f.write(out.stdout)

# 윈도우라면 ['ls', '-l'] 대신 ['cmd', '/c', 'dir']을 사용하면 된다.

# ls -l이라는 명령어는 ['ls', '-l']처럼 공백을 기준으로 나눈(split()) 문자열을 리스트로 전달해야 한다.


In [None]:
import subprocess

with open('out.txt', 'wb') as f:
    out = subprocess.run(['cmd', '/c', 'dir'], capture_output=True)
    f.write(out.stdout)
    

with open('out.txt', 'rb') as f:
    for line in f.readlines():
        print(line.decode('euc-kr'))
        

In [None]:
# 알아두면 좋아요
# --------------

# shlex로 명령어 리스트 생성하기
# 다음처럼 shlex를 사용하면 더 간단하게 명령어 리스트를 생성할 수 있다. 명령어 옵션이 많을 때는 shlex를 사용하면 편리하다.

# 참고: 105 문장을 분석하려면? - shlex

# import shlex

# command = shlex.split('ls -l')
# print(command)  # ['ls', '-l'] 출력

# capture_output=True는 명령을 실행한 결과를 저장한다는 의미이다. capture_output을 True로 설정하면 위와 같이 out 객체로 표준 출력에 
# 해당하는 out.stdout, 또는 표준 오류에 해당하는 out.stderr를 구할 수 있다.

# out.stdout은 바이트 문자열이므로 파일 작성 시 'wb' 모드로 지정해야 한다. 
# 출력 결과로 바이트 대신 일반 문자열을 얻고 싶다면 다음처럼 text=True 옵션을 추가하면 된다.

# out = subprocess.run(['ls', '-l'], capture_output=True, text=True)
# text=True 옵션을 추가했다면 출력 결과는 바이트가 아닌 일반 문자열이 되므로 파일 쓰기 모드도 'wb'가 아닌 'w'로 바꾸어야 한다.


In [None]:
# 알아두면 좋아요
# --------------
# 복잡한 명령어의 실행
# 다음처럼 복잡한 형태의 명령어를 실행할 때는 shell=True 옵션을 사용하는 것이 유리하다.

# import subprocess

# subprocess.run('find ./ -name "*.html"|xargs grep "python"', shell=True)
# shell=True 옵션을 지정하면 명령어를 리스트로 전달할 필요 없이 전체 명령어를 하나의 문자열로 전달하면 된다.

# 여기서 사용한 find는 현재 디렉터리의 하위 디렉터리를 포함한 모든 html 파일 중 "python"이라는 문자열이 포함된 부분을 모두 찾아서 출력하는 명령어이다.

In [42]:
import subprocess

with open('out.txt', 'wb') as f:
    #out = subprocess.run('cmd /c dir', shell=True, capture_output=True)
    out = subprocess.run('dir', shell=True, capture_output=True)
    #out = subprocess.run(['cmd', '/c', 'dir'], capture_output=True)
    f.write(out.stdout)
    f.write(out.stderr)
    

with open('out.txt', 'rb') as f:
    for line in f.readlines():
        print(line.decode('euc-kr'))
        

 C 드라이브의 볼륨에는 이름이 없습니다.

 볼륨 일련 번호: 8E8F-89C6



 c:\Users\USER\Quick_Ref\Programming\Python\Python 중급\(박응용) 점프투파이썬_라이브러리 예제 편 디렉터리



2023-04-10  오후 01:24    <DIR>          .

2023-04-03  오후 02:42    <DIR>          ..

2023-04-03  오후 02:37                 0 00. 들어가기 전에.ipynb

2023-04-06  오전 08:19            20,151 01. 텍스트 다루기.ipynb

2023-04-04  오전 08:52             5,876 02. 바이너리 데이터 다루기.ipynb

2023-04-06  오전 08:19            85,213 03. 다양한 데이터 다루기.ipynb

2023-04-06  오전 08:19            22,928 04. 수학과 숫자 다루기.ipynb

2023-04-06  오후 03:29           122,893 05. 함수형 프로그래밍 다루기.ipynb

2023-04-07  오후 07:04            44,422 06 . 파일과 디렉터리 다루기.ipynb

2023-04-07  오후 07:04            80,182 07. 데이터 저장하고 관리하기.ipynb

2023-04-07  오후 07:04             7,707 08. 데이터 압축하고 보관하기.ipynb

2023-04-10  오전 10:33                 0 12. imsi.ipynb

2023-04-06  오후 05:57                 1 aest4.txt

2023-04-06  오후 05:59    <DIR>          archive

2023-04-07  오후 06:40            16,384 blog.db

2023-04-06  오후 02:57 

In [None]:
# 더 알아보기
# -----------
# subprocess.Popen

# subprocess.Popen을 사용하면 보다 세밀하게 시스템 명령을 사용할수 있다. 
# 기본적으로 subprocess.Popen을 사용하여 시스템 명령을 호출하면 백그라운드 프로세스로 실행된다. 
# 따라서 시스템 명령 호출시 hang이 걸리는 상황이 발생한다면 subprocess.Popen을 사용하여 세밀한 조작을 할 필요가 있다.

# subprocess.run은 백그라운드로 실행되지 않고 해당 명령이 종료될 때까지 대기한다.

# 예를 들어 어떤 명령을 수행했을때 10초간 기다려도 응답이 오지 않을 경우 해당 프로세스를 종료하고 싶다면 다음과 같이 코딩할 수 있다.

# proc = subprocess.Popen(...)
# try:
#     out, errs = proc.communicate(timeout=10)
# except subprocess.TimeoutExpired:
#     proc.kill()

# subprocess.Popen으로 생성된 객체에 communicate 함수를 사용하면 해당 명령이 종료될 때까지 대기한다. 
# 이 때 timout 속성에 시간(초)을 지정할수 있다.

# communicate 함수에 의해 반환된 out은 표준 출력객체로 out.stdout으로 그 값을 구할수 있다.

# 참고
# subprocess - 서브 프로세스 관리: https://docs.python.org/ko/3/library/subprocess.html
# 동영상 - https://youtube.com/shorts/ibcJTnPVPh8?feature=share

In [48]:
# def subprocess_open(command):
#     popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
#     (stdoutdata, stderrdata) = popen.communicate()
#     return stdoutdata, stderrdata

import subprocess

with open('out1.txt', 'wb') as f:
    popen = subprocess.Popen('dir', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        (stdoutdata, stderrdata) = popen.communicate(timeout=10)
        f.write(stdoutdata)
        f.write(stderrdata)
    except subprocess.TimeoutExpired:
        popen.kill()        
    

with open('out1.txt', 'rb') as f:
    for line in f.readlines():
        print(line.decode('euc-kr'))
        



# proc = subprocess.Popen(...)
# try:
#     out, errs = proc.communicate(timeout=10)
# except subprocess.TimeoutExpired:
#     proc.kill()


 C 드라이브의 볼륨에는 이름이 없습니다.

 볼륨 일련 번호: 8E8F-89C6



 c:\Users\USER\Quick_Ref\Programming\Python\Python 중급\(박응용) 점프투파이썬_라이브러리 예제 편 디렉터리



2023-04-10  오후 01:49    <DIR>          .

2023-04-03  오후 02:42    <DIR>          ..

2023-04-03  오후 02:37                 0 00. 들어가기 전에.ipynb

2023-04-06  오전 08:19            20,151 01. 텍스트 다루기.ipynb

2023-04-04  오전 08:52             5,876 02. 바이너리 데이터 다루기.ipynb

2023-04-06  오전 08:19            85,213 03. 다양한 데이터 다루기.ipynb

2023-04-06  오전 08:19            22,928 04. 수학과 숫자 다루기.ipynb

2023-04-06  오후 03:29           122,893 05. 함수형 프로그래밍 다루기.ipynb

2023-04-07  오후 07:04            44,422 06 . 파일과 디렉터리 다루기.ipynb

2023-04-07  오후 07:04            80,182 07. 데이터 저장하고 관리하기.ipynb

2023-04-07  오후 07:04             7,707 08. 데이터 압축하고 보관하기.ipynb

2023-04-10  오전 10:33                 0 12. imsi.ipynb

2023-04-06  오후 05:57                 1 aest4.txt

2023-04-06  오후 05:59    <DIR>          archive

2023-04-07  오후 06:40            16,384 blog.db

2023-04-06  오후 02:57 

## 070 원하는 작업을 원하는 시간에 실행하려면? ― sched


In [None]:
# 070 원하는 작업을 원하는 시간에 실행하려면? ― sched
# sched는 지정된 시간에 원하는 이벤트를 실행하게 하는 이벤트 스케줄러 모듈이다.

# 문제
# 먼저 다음 프로그램을 살펴보자.

# def print_a(a):
#     print(a)


# def print_b(b):
#     print(b)


# def print_c(c):
#     print(c)


# print_a("A")
# print_b("B")
# print_c("C")

# print_a, print_b, print_c 함수를 한 번씩 호출하는 간단한 프로그램이다. 이 프로그램을 다음과 같이 동작하도록 수정하려면 어떻게 해야 할까?

# 1. 프로그램 실행 후 5초 후에 print_a() 호출
# 2. 프로그램 실행 후 3초 후에 print_b() 호출
# 3. 프로그램 실행 후 7초 후에 print_c() 호출


In [54]:
import time

def print_a(a):
    print('Curr Time: ', time.strftime('%Y-%m-%d %H:%M:%S'))
    print(a)

def print_b(b):
    print('Curr Time: ', time.strftime('%Y-%m-%d %H:%M:%S'))
    print(b)
    
def print_c(c):
    print('Curr Time: ', time.strftime('%Y-%m-%d %H:%M:%S'))
    print(c) 
    
time.sleep(5)
print_a('waiting for 5sec and print A')           

time.sleep(3)
print_b('waiting for 5sec and print B')           

time.sleep(7)
print_c('waiting for 5sec and print C')           

Curr Time:  2023-04-10 14:11:55
waiting for 5sec and print A
Curr Time:  2023-04-10 14:11:58
waiting for 5sec and print B
Curr Time:  2023-04-10 14:12:06
waiting for 5sec and print C


In [None]:
# 풀이
# 스레드 실행을 잠시 중지하는 time.sleep()을 사용해서 이 문제를 풀 수도 있지만, 
# 이벤트 방식으로 스케줄을 만들어야 한다면 sched 모듈을 사용하는 것이 가장 편리하다.

# 다음은 sched 모듈을 사용한 문제 풀이이다.

# [파일명: sched_sample.py]

# import time
# import sched

# start = time.time()


# def print_a(a):
#     print(time.time() - start)
#     print(a)


# def print_b(b):
#     print(time.time() - start)
#     print(b)


# def print_c(c):
#     print(time.time() - start)
#     print(c)


# s = sched.scheduler()
# s.enter(5, 1, print_a, ('A',))  # 5초 후에 실행
# s.enter(3, 1, print_b, ('B',))  # 3초 후에 실행
# s.enter(7, 1, print_c, ('C',))  # 7초 후에 실행
# s.run()

# sched.scheduler()로 스케줄러 객체 s를 생성한 후 s.enter()로 실행할 이벤트를 등록하면 된다. 
# 이 함수에는 모두 4개의 매개변수를 지정했는데, enter() 함수의 첫 번째 매개변수는 delay 시간(초)을 의미한다. 
# 두 번째 매개변수는 우선순위(priority)로, 같은 delay 시간에 1개 이상의 이벤트를 등록할 때 우선순위가 높은 것(낮은 숫자가 우선순위가 높음)부터 시작한다. 
# 세 번째 매개변수는 실행할 함수이고 네 번째 매개변수는 실행할 함수에 전달할 인수이다.

# s.enter()로 이벤트를 등록한 후 s.run()으로 스케줄을 시작한다. 그리고 스케줄이 제대로 동작하는지 확인하고자 time 모듈을 사용하여 함수 호출까지 소요된 시간을 출력했다.

# 프로그램을 실행한 결과는 다음과 같다.

# c:\projects\pylib>python sched_sample.py
# 3.0093700885772705
# B
# 5.009588956832886
# A
# 7.0107176303863525
# C
# 프로그램을 실행하고 3초 후에 B가 실행되고 그리고 2초 후(총 5초 경과 후)에 A, 그리고 또 2초 후(총 7초 경과 후)에 C가 출력되는 것을 확인할 수 있다.

# 참고
# sched - 이벤트 스케줄러: https://docs.python.org/ko/3/library/sched.html

In [55]:
import time
import sched

start = time.time()


def print_a(a):
    print(time.time() - start)
    print(a)


def print_b(b):
    print(time.time() - start)
    print(b)


def print_c(c):
    print(time.time() - start)
    print(c)


s = sched.scheduler()
s.enter(5, 1, print_a, ('A',))  # 5초 후에 실행
s.enter(3, 1, print_b, ('B',))  # 3초 후에 실행
s.enter(7, 1, print_c, ('C',))  # 7초 후에 실행
s.run()


3.0007519721984863
B
5.001099109649658
A
7.001610994338989
C
