In [9]:
import multiprocessing
import pandas as pd
import numpy as np
import subprocess
import argparse
import time
import json
import tqdm
import sys
import os

# Extract windows around each base and return a dataframe.
def windows(chunk, queue, counter):
    pid = str(os.getpid())
    filename = f'{pid}.txt'
    subprocess.run(['strace', '-p', pid, '-o', filename])
    
    data = chunk['data']
    window = chunk['window']
    position = chunk['position']
    
    print (f'Process {position} Working')

    # Radius is half the window size rounded down. So 51 is 25. 
    radius = int((window - 1)/2)
    # Skip the first radius rows, and the last radius rows since we can't get
    # full windows there anyways. 
    iterator = range(radius, len(data.index) - radius - 1)

    # Update progress bar after this many rows. 
    interval = 200
    vectors = [None] * len(data.index)

    last = time.time()
    for i in iterator:
        # Find the window. 
        start = i - radius
        end = i + radius + 1
        section = data.iloc[start:end]

        start_position = data.index[start][1]
        end_position = data.index[end][1]

        # Checks if rows are contiguous.
        if (abs(end_position - start_position) > window):
            continue

#         if time.time() - last > 1:
#             total = 0
#             for item in vectors:
#                 total += sys.getsizeof(item)
#             print (f'Process {position}: {total} bytes')
#             last = time.time()
        
        # # Flattens the section of the table using column first Fortran method.
        vector = section.to_numpy().flatten(order = 'F')
        vectors[i] = vector
    print (f'Process {position} Done')

    # Create the dataframe here so we can just concatenate later. 
    vectors = pd.DataFrame({'vectors': vectors}, index = data.index)
    queue.put((position, vectors))

# Divide the table into the number of cores available so we can use the 
# multiprocessing package. 
def chunking(data, window):
    radius = int((window - 1)/2)

    # The chunk size is the number of rows assigned to each core. We never want 
    # to have left over rows so we add one. 
    num_processes = multiprocessing.cpu_count()
    chunk_size = int(len(data)/num_processes) + 1

    chunks = []
    position = 0
    total = 0
    for i in range(0, len(data), chunk_size):
        start = max(i - radius, 0)
        end = i + chunk_size + radius + 1
        section = data.iloc[start:end]
        total  += len(section) - radius - 2

        chunks.append({
            'data': section,
            'window': window,
            'position': position
        })
        position += 1

    return chunks, total

In [3]:
data = pd.read_hdf('/active/myler_p/People/Sur/software/dna-modification-detection/data/interm/new_data.h5')
chunks, total = chunking(data[['top_A','top_T','top_C','top_G','top_ipd','bottom_ipd']], 51)

In [10]:
# We then send a set number of jobs equal to the number of cores. Each
# process loads its data into a queue and we dequeue as they come into it. 
print (f'Using {len(chunks)} Cores')
counter = multiprocessing.Value('i', 0)
queue = multiprocessing.Queue()

processes = []
for i in range(len(chunks)):
    process = multiprocessing.Process(target = windows, args = (chunks[i], queue, counter))
    processes.append(process)
    process.start()

i = 0
while queue.empty():
    time.sleep(1)
    dead = 0
    for process in processes:
        if not process.is_alive():
            dead += 1
    print (f'{dead} Dead after {i} Seconds')
    i += 1


Using 28 Cores
0 Dead after 0 Seconds
0 Dead after 1 Seconds
0 Dead after 2 Seconds
0 Dead after 3 Seconds
0 Dead after 4 Seconds
0 Dead after 5 Seconds
0 Dead after 6 Seconds
0 Dead after 7 Seconds
0 Dead after 8 Seconds
0 Dead after 9 Seconds
0 Dead after 10 Seconds
0 Dead after 11 Seconds
0 Dead after 12 Seconds
0 Dead after 13 Seconds
0 Dead after 14 Seconds
0 Dead after 15 Seconds
0 Dead after 16 Seconds
0 Dead after 17 Seconds
0 Dead after 18 Seconds
0 Dead after 19 Seconds
0 Dead after 20 Seconds
0 Dead after 21 Seconds
0 Dead after 22 Seconds
0 Dead after 23 Seconds
0 Dead after 24 Seconds
0 Dead after 25 Seconds
0 Dead after 26 Seconds
0 Dead after 27 Seconds
0 Dead after 28 Seconds
0 Dead after 29 Seconds
0 Dead after 30 Seconds
0 Dead after 31 Seconds
0 Dead after 32 Seconds
0 Dead after 33 Seconds
0 Dead after 34 Seconds
0 Dead after 35 Seconds
0 Dead after 36 Seconds
0 Dead after 37 Seconds
0 Dead after 38 Seconds
0 Dead after 39 Seconds
0 Dead after 40 Seconds
0 Dead afte

Process Process-60:
Traceback (most recent call last):


0 Dead after 102 Seconds


Process Process-81:


KeyboardInterrupt: 

Process Process-58:
Process Process-66:
Process Process-63:
Process Process-64:
Process Process-59:
Process Process-65:
Process Process-84:
Process Process-80:
Process Process-73:
Process Process-71:
Process Process-79:
Process Process-75:
Process Process-62:
Process Process-68:
Process Process-78:
Process Process-83:
Process Process-74:
Process Process-72:
Process Process-82:
Process Process-77:
Process Process-61:
Process Process-69:
Process Process-70:
Process Process-76:
Process Process-67:
Process Process-57:
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Tra

  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<ipython-input-9-65e2be448226>", line 16, in windows
    subprocess.run(['strace', '-p', pid, '-o', filename])
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-65e2be448226>", line 16, in windows
    subprocess.run(['strace', '-p', pid, '-o', filename])
  File "/gpfs/home/asur/.python/pyt

  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 956, in communicate
    self.wait()
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 956, in communicate
    self.wait()
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 490, in run
    stdout, stderr = process.communicate(input, timeout=timeout)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 490, in run
    stdout, stderr = process.communicate(input, timeout=timeout)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 956, in communicate
    self.wait()
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 956, in communicate
    self.wait()
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 956, in communicate
    self.wait()
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 490, in run
    stdout, st

  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1653, in _wait
    (pid, sts) = self._try_wait(0)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1653, in _wait
    (pid, sts) = self._try_wait(0)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1653, in _wait
    (pid, sts) = self._try_wait(0)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1611, in _try_wait
    (pid, sts) = os.waitpid(self.pid, wait_flags)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1653, in _wait
    (pid, sts) = self._try_wait(0)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1019, in wait
    return self._wait(timeout=timeout)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subprocess.py", line 1019, in wait
    return self._wait(timeout=timeout)
  File "/gpfs/home/asur/.python/python-3.7.5/lib/python3.7/subpro