## Uppgift
Skriv ett parallelliserat program som går igenom ett antal listor med strängar (varje lista finns i en separat indatafil). Programmet ska producera en ny lista innehållande alla strängar ur indatalistorna som börjar med bokstaven 'r'. Den producerade listan ska vara fri från dubbletter. Utgå från att listorna skulle kunna vara väldigt långa (fastän din faktiska indata är liten). Du kan anta att strängarna i listorna är korta och antalet listor är litet samt att den slutgiltiga listan blir kort.
 

## Setup

In [1]:
import os
import pandas as pd
import multiprocessing as mp
import time
import subprocess

In [2]:
# Directory holding the input lists. The trailing slash matters: the functions
# in this notebook build paths by plain string concatenation (assetPath + name).
assetPath = "../assets/KBlistExtractorAssets/"
# Every directory entry is treated as an input file, in whatever order
# os.listdir returns them.
filenames = os.listdir(assetPath)
filenames

['utf8list3.txt',
 'utf8list2.txt',
 'utf8list0.txt',
 'utf8list6.txt',
 'MOCKutf8list12.txt',
 'utf8list8.txt',
 'MOCKutf8list10.txt',
 'utf8list4.txt',
 'utf8list1.txt',
 'utf8list7.txt',
 'MOCKutf8list14.txt',
 'MOCKutf8list13.txt',
 'MOCKutf8list11.txt',
 'utf8list9.txt',
 'utf8list5.txt']

## Functions

In [3]:
def preprocesser():
    """Concatenate every file in ``filenames`` into ``merged.txt`` in ``assetPath``.

    The files are copied as raw bytes, line by line, in the order given by the
    module-level ``filenames`` list. Two safeguards are applied:

    * ``merged.txt`` itself is skipped if it appears in ``filenames`` (which
      happens when the directory is listed again after a previous run);
      otherwise the function would read from the file it is writing to.
    * A newline is appended after any input whose last line is unterminated,
      so that line cannot fuse with the first line of the next file.
    """
    merged_name = 'merged.txt'
    with open(assetPath + merged_name, 'wb') as outfile:
        for fname in filenames:
            if fname == merged_name:
                continue
            with open(assetPath + fname, 'rb') as infile:
                # Start from a newline so an empty input adds nothing.
                last_line = b'\n'
                for line in infile:
                    outfile.write(line)
                    last_line = line
                if not last_line.endswith(b'\n'):
                    outfile.write(b'\n')
#preprocesser()

In [4]:
def extract_data_pandas(file, character_to_filter_with = 'r',):
    """Return the entries of ``file`` whose first character matches the filter.

    The file (looked up under ``assetPath``) is loaded with ``pd.read_csv``
    into a single column named ``strings``; rows whose first character equals
    ``character_to_filter_with`` are returned as a plain list. Duplicates are
    kept.

    NOTE(review): the input is parsed as CSV, so lines containing commas,
    quotes or NA-like tokens may not round-trip as plain strings -- confirm
    the data never contains them.
    """
    frame = pd.read_csv(assetPath + file, names = ['strings'])
    first_characters = frame['strings'].str[0]
    matching = frame.loc[first_characters == character_to_filter_with, 'strings']
    return list(matching.values)


In [5]:
def extract_data(file, character_to_filter_with = 'r',):
    """Return the lines of ``file`` that start with ``character_to_filter_with``.

    The file is looked up under ``assetPath`` and read as UTF-8 text, one
    line at a time, so memory use is bounded by the number of matches rather
    than by the file size.

    Parameters:
        file: file name relative to ``assetPath``.
        character_to_filter_with: prefix a line must start with (default 'r').

    Returns:
        list[str]: matching lines without their trailing newline, in file
        order, duplicates included.
    """
    filepath = assetPath + file

    strings = []

    with open(filepath, 'r', encoding ='utf-8') as f:
        for line in f:
            # Honour the parameter instead of a hard-coded 'r'.
            if line.startswith(character_to_filter_with):
                # Remove only the newline: slicing off the last character
                # would truncate a final line that has no terminator.
                strings.append(line.rstrip('\n'))

    return strings
extract_data(filenames[3])

['rgyvruqtppxufio', 'rgyvruqtppxufio', 'rxojwrpqtninpmt']

In [6]:
def extract_data_byte(file, character_to_filter_with = 'r',):
    """Return the matching lines of ``file`` as raw ``bytes`` (no decoding).

    The file is looked up under ``assetPath`` and read in binary mode. A line
    matches when it starts with the UTF-8 encoding of
    ``character_to_filter_with``.

    Parameters:
        file: file name relative to ``assetPath``.
        character_to_filter_with: prefix a line must start with (default 'r').

    Returns:
        list[bytes]: matching lines with the line terminator removed, in file
        order, duplicates included. Removing the terminator keeps an entry on
        an unterminated last line equal to the same entry elsewhere, so later
        de-duplication treats them as one.
    """
    filepath = assetPath + file
    # Encode once; this also supports filter characters that are multi-byte in UTF-8.
    prefix = character_to_filter_with.encode('utf-8')

    with open(filepath, 'rb') as f:
        return [line.rstrip(b'\r\n') for line in f if line.startswith(prefix)]

In [7]:
def extract_data_byte_to_string(file, character_to_filter_with = 'r',):
    """Return matching lines of ``file``, filtering on bytes and decoding only hits.

    The file is looked up under ``assetPath`` and read in binary mode. Only
    lines that start with the UTF-8 encoding of ``character_to_filter_with``
    are decoded, so non-matching lines are never turned into ``str``.

    Parameters:
        file: file name relative to ``assetPath``.
        character_to_filter_with: prefix a line must start with (default 'r').

    Returns:
        list[str]: matching lines without their line terminator, in file
        order, duplicates included.
    """
    filepath = assetPath + file
    prefix = character_to_filter_with.encode('utf-8')

    strings = []

    with open(filepath, 'rb') as f:
        for line in f:
            if line.startswith(prefix):
                # Strip the terminator explicitly: dropping the last byte would
                # leave '\r' on CRLF files and truncate an unterminated last line.
                strings.append(line.rstrip(b'\r\n').decode("utf-8"))

    return strings


In [8]:
def extract_data_byte_to_string_by_set(file, character_to_filter_with = 'r',):
    """Return the *distinct* matching lines of ``file`` as a set of strings.

    Same byte-level filtering as the list-returning variants, but matches are
    de-duplicated while reading, so the result stays small even when the file
    repeats the same entries many times.

    Parameters:
        file: file name relative to ``assetPath``.
        character_to_filter_with: prefix a line must start with (default 'r').

    Returns:
        set[str]: unique matching lines without their line terminator.
    """
    filepath = assetPath + file
    prefix = character_to_filter_with.encode('utf-8')

    strings = set()

    with open(filepath, 'rb') as f:
        for line in f:
            if line.startswith(prefix):
                # add() inserts one element directly; no throwaway one-item list.
                strings.add(line.rstrip(b'\r\n').decode("utf-8"))

    return strings
#extract_data_byte_to_string_by_set(filenames[3])

In [9]:
def trivial(filenames, dataextraction = extract_data):
    """Sequential baseline: extract from each file in turn, then de-duplicate.

    ``dataextraction`` is called once per entry of ``filenames``. All returned
    items are gathered into one list, which is turned into a set at the very
    end to remove duplicates.

    Returns:
        list: the unique extracted items, in arbitrary (set) order.
    """
    collected = []
    for name in filenames:
        collected.extend(dataextraction(name))

    return list(set(collected))

In [10]:
def trivial_partial_set(filenames, dataextraction = extract_data_byte_to_string_by_set):
    """Sequential baseline that merges per-file results straight into a set.

    ``dataextraction`` is called once per entry of ``filenames``, in order,
    and each result is folded into a running set, so no combined list of all
    matches is ever built.

    Returns:
        list: the unique extracted items, in arbitrary (set) order.
    """
    unique_strings = set()
    for matches in map(dataextraction, filenames):
        unique_strings.update(matches)

    return list(unique_strings)
trivial_partial_set(filenames)

['rxojwrpqtninpmt',
 'rxdadoolpdqcoih',
 'rgyvruqtppxufio',
 'ryinnpkkppfixjy',
 'rqeiqokxvnqqugv',
 'rsghlhjwdukrtqy',
 'rakxschavfumwqi',
 'rdfbljbxnjvotty',
 'rblkxwxoumiparc',
 'rilolmptmcjhhxn',
 'rfyegiaceaquwib',
 'rppoabafuatnxly',
 'rnyffrvrsluvssg',
 'roudtyqpixcjnfu']

In [11]:
 def parallel(filenames, dataextraction = extract_data):
    """Extract from all files in a process pool, then de-duplicate in the parent.

    One task per file name is submitted to a ``multiprocessing.Pool`` with the
    default number of workers. ``dataextraction`` therefore has to be usable
    from worker processes (multiprocessing pickles the callable and its
    results).

    Parameters:
        filenames: iterable of file names, each passed to ``dataextraction``.
        dataextraction: callable taking one file name and returning an
            iterable of extracted items.

    Returns:
        list: the unique extracted items, in arbitrary (set) order.

    Raises:
        Whatever ``dataextraction`` raises in a worker; the pool is shut down
        before the exception propagates.
    """
    # The context manager tears the pool down even when a worker fails, so
    # worker processes are not left behind.
    with mp.Pool() as pool:
        partial_results = pool.map(dataextraction, filenames)

    results = []
    for partial_result in partial_results:
        results.extend(partial_result)

    return list(set(results))

#parallel(filenames)

In [12]:
 def parallel_partial_set(filenames, dataextraction = extract_data_byte_to_string_by_set):
    """Extract from all files in a process pool and merge the results as sets.

    One task per file name is submitted to a ``multiprocessing.Pool`` with the
    default number of workers. With the default extractor each worker already
    returns a de-duplicated set, so only distinct strings per file are sent
    back to the parent.

    Parameters:
        filenames: iterable of file names, each passed to ``dataextraction``.
        dataextraction: callable taking one file name and returning an
            iterable of extracted items.

    Returns:
        list: the unique extracted items, in arbitrary (set) order.

    Raises:
        Whatever ``dataextraction`` raises in a worker; the pool is shut down
        before the exception propagates.
    """
    # The context manager tears the pool down even when a worker fails, so
    # worker processes are not left behind.
    with mp.Pool() as pool:
        partial_results = pool.map(dataextraction, filenames)

    results = set()
    for partial_result in partial_results:
        # Merge a whole partial result at once instead of element by element.
        results.update(partial_result)

    return list(results)

#parallel_partial_set(filenames)

In [13]:
def get_function_execution_time(iterations, function, *args):
    """Return the average time, in seconds, of one ``function(*args)`` call.

    Parameters:
        iterations: number of times to call ``function``; must be at least 1.
        function: the callable to time.
        *args: positional arguments forwarded to ``function`` on every call.

    Returns:
        float: total elapsed time divided by ``iterations``.

    Raises:
        ValueError: if ``iterations`` is smaller than 1 (there is nothing to
            average over).
    """
    if iterations < 1:
        raise ValueError(f"iterations must be at least 1, got {iterations!r}")

    # perf_counter is the clock intended for measuring intervals: it is
    # monotonic and high resolution, unlike the adjustable wall clock.
    start_time = time.perf_counter()
    for _ in range(iterations):
        function(*args)
    return (time.perf_counter() - start_time) / iterations


# Smoke test of the timer: elapsed seconds for a single sequential `trivial` run over all input files.
get_function_execution_time(1, trivial,filenames)

2.474238872528076

## Test execution time

In [14]:
# Number of repetitions each benchmark below is averaged over.
iterations = 1


In [15]:
# Single-process baselines. These runs use the sequential drivers (`trivial`,
# `trivial_partial_set`), so they are labelled "Sequential" rather than
# "Concurrent" -- nothing runs concurrently in this cell.
sequential_benchmarks = [
    ("Sequential, pandas. ", trivial, extract_data_pandas),
    ("Sequential, normal. ", trivial, extract_data),
    ("Sequential, bytes. ", trivial, extract_data_byte),
    ("Sequential, bytes->string. ", trivial, extract_data_byte_to_string),
    ("Sequential, bytes->string. using sets instead of concatenating lists",
     trivial_partial_set, extract_data_byte_to_string_by_set),
]

for label, runner, extractor in sequential_benchmarks:
    ex_speed = get_function_execution_time(iterations, runner, filenames, extractor)
    print(f"{label}\nTime: {ex_speed}")

#ex_speed = get_function_execution_time(iterations, trivial, ['merged.txt'], extract_data_byte_to_string)
#print(f"Sequential, bytes->string with preprocessing. \nTime: {ex_speed}")

Concurrent, pandas. 
Time: 4.7372682094573975
Concurrent, normal. 
Time: 2.3750832080841064
Concurrent, bytes. 
Time: 0.9587936401367188
Concurrent, bytes->string. 
Time: 0.9404957294464111
Concurrent, bytes->string. using sets instead of concatenating lists
Time: 0.9525682926177979


In [16]:
# Process-pool runs of the same extractors, timed and reported one after another.
# Each entry is (label printed before the timing, pool driver, extractor).
parallel_benchmarks = [
    ("parallel, pandas. ", parallel, extract_data_pandas),
    ("parallel, normal. ", parallel, extract_data),
    ("parallel, bytes. ", parallel, extract_data_byte),
    ("parallel, bytes->string. ", parallel, extract_data_byte_to_string),
    ("Parallel, bytes->string. using sets instead of concatenating lists",
     parallel_partial_set, extract_data_byte_to_string_by_set),
]

for label, runner, extractor in parallel_benchmarks:
    ex_speed = get_function_execution_time(iterations, runner, filenames, extractor)
    print(f"{label}\nTime: {ex_speed}")

parallel, pandas. 
Time: 1.2301671504974365
parallel, normal. 
Time: 0.6332626342773438
parallel, bytes. 
Time: 0.33527398109436035
parallel, bytes->string. 
Time: 0.33889126777648926
Parallel, bytes->string. using sets instead of concatenating lists
Time: 0.33441853523254395


## Augment data


In [17]:
# Increase the amount of data in the files
def create_fake_data():
    """Write large mock input files by repeating the real input strings.

    Every line of every file in ``filenames`` (except the mock files
    themselves) is collected, the collection is repeated 2**15 times, and the
    result is written, one string per line, to each of the five
    ``MOCKutf8list1X.txt`` files in ``assetPath``.

    Differences from a naive copy that matter:

    * Each string is written with its own trailing newline. ``writelines``
      adds no separators, so without this a mock file would be one giant line.
    * Existing mock files are excluded from the source data, so re-running
      the function does not multiply the data again.
    * Files are read and written as UTF-8, matching the extractors.
    """
    mock_data = [
    "MOCKutf8list10.txt",
    "MOCKutf8list11.txt",
    "MOCKutf8list12.txt",
    "MOCKutf8list13.txt",
    "MOCKutf8list14.txt"
    ]

    possible_strings = []
    for file in filenames:
        if file in mock_data:
            continue
        filepath = assetPath + file
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                possible_strings.append(line.rstrip('\n'))

    # Same size as doubling the list 15 times.
    long_list = possible_strings * 2 ** 15
    print(f"Created list of length {len(long_list)}")

    for name in mock_data:
        with open(assetPath + name, 'w', encoding='utf-8') as f:
            f.writelines(string + '\n' for string in long_list)
        # Report only after the file has actually been written and closed.
        print(f"wrote {name}")
#create_fake_data()

## Check number of allocated file handles

In [18]:
# Print the contents of /proc/sys/fs/file-nr by shelling out to `cat`.
# NOTE(review): this path is Linux-only and, per the kernel sysctl docs, holds
# file-handle counters (allocated / unused / maximum), not process counts --
# confirm this is the metric intended here.
bashCommand = "cat /proc/sys/fs/file-nr"
process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
# Only stdout is piped, so `error` is always None.
output, error = process.communicate()
print(output.decode('utf-8'))

9465	0	1617430



## Notebook to python

In [19]:
#!jupyter nbconvert --to script KBlistExtractor.ipynb

[NbConvertApp] Converting notebook KBlistExtractor.ipynb to script
[NbConvertApp] Writing 7276 bytes to KBlistExtractor.py
