In [1]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array, Process, current_process
import time
import os
import numpy as np
import random

In [2]:


def read_4bytes(filename):
    """Read a binary file as a list of signed big-endian integers.

    The file is consumed in 4-byte chunks and every chunk is decoded with
    ``int.from_bytes(chunk, "big", signed=True)``.  A trailing chunk shorter
    than 4 bytes is decoded the same way from whatever bytes are present.

    Parameters
    ----------
    filename : path of the binary file to read.

    Returns
    -------
    list of int, in file order (empty list for an empty file).
    """
    with open(filename, 'rb') as stream:
        # iter(callable, sentinel) keeps calling read(4) until it yields b''.
        return [
            int.from_bytes(chunk, "big", signed=True)
            for chunk in iter(lambda: stream.read(4), b'')
        ]



In [3]:
def count_frequency(x):
    """Count how many times each item occurs in ``x``.

    Returns a plain dict mapping item -> number of occurrences; keys appear
    in order of first occurrence.
    """
    counts = {}
    for element in x:
        counts[element] = counts.get(element, 0) + 1
    return counts

In [4]:
def random_choices(x, tries):
    """Draw ``tries`` items from ``x`` uniformly at random, with replacement.

    Parameters
    ----------
    x : non-empty sequence to draw from.
    tries : int, number of draws; zero or a negative value gives an empty list.

    Returns
    -------
    list with one ``random.choice(x)`` result per draw, in draw order.
    """
    return [random.choice(x) for _ in range(tries)]

In [5]:
def make_subsample(num_items, x, number_of_tries=1000000):
    """Select up to ``num_items`` values from ``x`` by repeated random voting.

    For every value to select, ``number_of_tries`` uniform draws (with
    replacement) are taken from the remaining pool.  The value drawn most
    often wins (on a tie, the one that was drawn first), is appended to the
    result, and one occurrence of it is removed from the pool.

    Parameters
    ----------
    num_items : int
        How many values to select.  Zero or a negative number selects nothing.
    x : list
        Candidate values.  The list is not modified; a copy is used.
    number_of_tries : int, optional
        Draws per selection round.  Defaults to 1000000, the value that used
        to be hard-coded.  Must be at least 1.

    Returns
    -------
    list
        Selected values in selection order.  Shorter than ``num_items`` when
        the pool runs out of candidates first.

    Raises
    ------
    ValueError
        If ``number_of_tries`` is smaller than 1.
    """
    if number_of_tries < 1:
        raise ValueError("number_of_tries must be at least 1")

    # Work on a copy so the caller's list is neither shrunk nor shuffled.
    pool = list(x)
    resulting_sequence = []
    for _ in range(max(num_items, 0)):
        if not pool:
            # Nothing left to draw from; random.choice would raise IndexError.
            break
        sequence = random_choices(pool, number_of_tries)
        freq = count_frequency(sequence)
        # max() returns the first key with the highest count, which matches
        # scanning freq.items() for the first value equal to the maximum.
        winner = max(freq, key=freq.get)
        resulting_sequence.append(winner)
        pool.remove(winner)
        # Kept so that seeded runs consume the random stream as before.
        random.shuffle(pool)
    return resulting_sequence


In [6]:
def check_the_pid(parameters):
    """Sample one file in the current process and append a report to a log file.

    The report goes to ``<process name>-<pid>.txt`` (opened in append mode)
    and contains, in order: the process and parent process ids, every value
    of the subsample, the subsample values divisible by ``3 * pid``, and
    three timings in seconds (total, reading, sampling).

    Parameters
    ----------
    parameters : tuple
        ``(filename, N)``: the binary file to read with ``read_4bytes`` and
        the number of values to request from ``make_subsample``.

    Returns
    -------
    None
    """
    filename, N = parameters

    start_time = time.time()
    x = read_4bytes(filename)
    # Durations are "end - start"; the reversed form produced negative values.
    time_for_reading = time.time() - start_time

    sampling_started = time.time()
    subsample = make_subsample(N, x)
    time_for_sampling = time.time() - sampling_started

    pid = os.getpid()
    proc_file = str(current_process().name) + "-" + str(pid) + ".txt"
    divisor = 3 * pid
    res = [i for i in subsample if i % divisor == 0]

    # The context manager closes the file even if a write fails.
    with open(proc_file, "a+") as f:
        f.write("Process ID : %d\nParent ID : %d\n" % (pid, os.getppid()))
        for i in subsample:
            f.write("%d\n" % (i))
        for i in res:
            f.write("%d\n" % (i))
        # %s keeps the fractional part of the timings; %d truncated them.
        f.write("Execution time: %s\nTime spent for reading file: %s\nTime spent fot sampling: %s\n"
                % (time.time() - start_time, time_for_reading, time_for_sampling))


In [7]:
def check_the_pid_(args):
    """Adapter that forwards one task description to ``check_the_pid``.

    ``check_the_pid`` takes a single ``(filename, N)`` tuple, so a
    ``(filename, N)`` pair is passed through unchanged.  A one-element
    wrapper ``((filename, N),)`` is still unpacked, which is the only
    input shape the star-unpacking call could handle.

    Returns whatever ``check_the_pid`` returns.
    """
    if len(args) == 1:
        return check_the_pid(*args)
    return check_the_pid(args)

In [8]:
def several_files():
    """Ask the user interactively which file(s) to process.

    The user first chooses a variant: ``1`` for a single file, ``2`` for
    several files (the count is then asked for).  File names are read one
    per prompt.

    Returns
    -------
    list of str
        The entered file names, in entry order.

    Any other variant prints "Invalid input" and calls ``exit()``.
    Non-numeric answers to the numeric prompts raise ``ValueError`` from
    ``int()``.
    """
    files = []
    variant = int(input("Press 1 if you want to use 1 file, press 2 - if different"))
    if variant == 1:
        files.append(str(input("Enter file name : ")))
    elif variant == 2:
        # elif matters: with two independent ifs, variant 2 fell through to
        # the "Invalid input" branch after the names had been collected.
        num = int(input("How many files to use? "))
        for _ in range(num):
            files.append(str(input("Enter file name : ")))
    else:
        print("Invalid input")
        exit()
    return files

In [39]:
%%time
import multiprocessing
from multiprocessing import Pool
from functools import partial
# Multiprocessing run: every worker process handles one (filename, N) task
# through check_the_pid.
PROCESSES = 8
N = 100
files = ['example']  # alternatively: several_files()
parameters = [(i, N) for i in files]
if len(files) > 1:
    # Several files: one worker per file.
    PROCESSES = len(files)
elif len(files) == 1:
    # Single file: every worker processes the same file.
    parameters *= PROCESSES
print(parameters)
with multiprocessing.Pool(PROCESSES) as pool:
    pool.map(check_the_pid, parameters)

[('example', 100), ('example', 100), ('example', 100), ('example', 100), ('example', 100), ('example', 100), ('example', 100), ('example', 100)]
CPU times: user 198 ms, sys: 71.3 ms, total: 269 ms
Wall time: 4min 54s


In [None]:
%%time
# Sequential baseline: run the first PROCESSES tasks one after another in
# this process, for comparison with the pool-based run.
for task_index in range(PROCESSES):
    task = parameters[task_index]
    check_the_pid(task)

In [16]:
import _thread
import threading
from threading import current_thread, Lock

In [None]:


class myThread(threading.Thread):
    """Thread that runs ``_thread_task`` for one ``(filename, N)`` pair.

    Parameters
    ----------
    threadID : value used to build the thread name ``"Thread-<threadID>"``.
    parameters : ``(filename, number)`` pair stored on the instance.
    lock : lock object handed to ``_thread_task`` when the thread runs.
    """

    def __init__(self, threadID, parameters, lock):
        super().__init__()
        # The name is assigned after the base initialiser has run.
        self.name = f"Thread-{threadID}"
        self.filename, self.number = parameters
        self.lock = lock

    def run(self):
        """Print a start marker, execute the task, print an exit marker."""
        print(f"Starting {self.name}")
        _thread_task(self.filename, self.number, self.lock)
        print(f"Exiting {self.name}")

def _thread_task(filename, N, lock):
    """Run ``filtering`` for ``filename``/``N`` while holding ``lock``.

    ``filtering`` takes an id as its first argument; the ident of the
    calling thread is supplied for it.  The lock is released even when
    ``filtering`` raises.

    Parameters
    ----------
    filename : file passed on to ``filtering``.
    N : sample size passed on to ``filtering``.
    lock : context-manager lock (e.g. ``threading.Lock``) held for the call.
    """
    with lock:
        filtering(threading.current_thread().ident, filename, N)



In [17]:
def filtering(_id, filename, N):
    """Sample one file and append a report to ``Thread-<_id>.txt``.

    The report contains, in order: ``_id`` and the parent process id, every
    value of the subsample, the subsample values divisible by
    ``3 * int(_id)``, and three timings in seconds (total, reading,
    sampling).

    Parameters
    ----------
    _id : integer identifier; used in the file name, in the report header
        and as the base of the divisibility filter.
    filename : binary file to read with ``read_4bytes``.
    N : number of values to request from ``make_subsample``.

    Raises
    ------
    ZeroDivisionError
        If ``int(_id)`` is 0 and the subsample is not empty.
    """
    start_time = time.time()
    x = read_4bytes(filename)
    time_for_reading = time.time() - start_time

    # Sampling gets its own start mark; subtracting the reading duration from
    # the current timestamp gave an epoch-sized number, not a duration.
    sampling_started = time.time()
    subsample = make_subsample(N, x)
    time_for_sampling = time.time() - sampling_started

    thread_file = (str("Thread-") + str(_id) + ".txt")
    # The context manager closes the file even if a write or the filter fails.
    with open(thread_file, "a+") as f:
        f.write("Process ID : %d\nParent ID : %d\n" % (_id, os.getppid()))
        res = []
        for i in subsample:
            f.write("%d\n" % (i))
            if i % (3 * int(_id)) == 0:
                res.append(i)
        for i in res:
            f.write("%d\n" % (i))
        f.write("Execution time: %s\nTime spent for reading file: %s\nTime spent fot sampling: %s\n"
                % (str(time.time() - start_time), str(time_for_reading), str(time_for_sampling)))

In [34]:
def thread_task(filename, N):
    """Run ``filtering`` for ``filename``/``N`` under the current thread's ident.

    A "Started ..." line is printed before the call and an "Ended ..." line
    after it.  ``filtering`` names its output file after the ident, so tasks
    that report the same ident append to the same file.
    """
    ident = threading.current_thread().ident
    label = "Thread-" + str(ident)
    print("Started " + label + "...")
    filtering(ident, filename, N)
    print("Ended " + label + "...")
    

In [37]:
def create_thread_in_process(parameters):
    """Run ``thread_task`` in a new thread and wait for it to finish.

    Parameters
    ----------
    parameters : ``(filename, N)`` pair forwarded to ``thread_task``.
    """
    filename, N = parameters
    worker = threading.Thread(target=thread_task, args=(filename, N))
    worker.start()
    # Block until the worker thread is done before returning to the pool.
    worker.join()

In [38]:
%%time
#def creating_threads():
# Thread-in-process run: the pool starts THREADS worker processes and each
# task is executed in a dedicated thread via create_thread_in_process.
THREADS = 8
N = 100
files = ['example']  # alternatively: several_files()
parameters = [(name, N) for name in files]
if len(files) > 1:
    # Several files: one worker per file.
    THREADS = len(files)
elif len(files) == 1:
    # Single file: repeat the same task for every worker.
    parameters = parameters * THREADS

with multiprocessing.Pool(THREADS) as pool:
    pool.map(create_thread_in_process, parameters)

print("Exiting Main Thread")

Started Thread-140418830288640...Started Thread-140418830288640...Started Thread-140418830288640...Started Thread-140418830288640...Started Thread-140418830288640...Started Thread-140418830288640...Started Thread-140418830288640...



Started Thread-140418830288640...



Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Ended Thread-140418830288640...
Exiting Main Thread
CPU times: user 166 ms, sys: 116 ms, total: 282 ms
Wall time: 4min 41s
