<a href="https://colab.research.google.com/github/ottohruby/pzp-proj/blob/main/projekt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import time
from multiprocessing import Queue, Process, cpu_count
from collections import defaultdict
from math import inf

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install findspark
!pip install py4j
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"
import findspark
findspark.init()
print("Initialization Finished.")
# !cat /proc/meminfo
# !cat /proc/cpuinfo

--2022-11-30 17:12:52--  https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz.1’


2022-11-30 17:12:54 (102 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz.1’ saved [299350810/299350810]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Initialization Finished.


In [3]:
from google.colab import drive
drive.mount('/content/drive/')

FOLDER_PATH = os.path.join('/content/drive/My Drive/pzp-projekt')
DATA_FILE_PATH = os.path.join(FOLDER_PATH, 'data.txt')
STOP_WORDS_FILE_PATH = os.path.join(FOLDER_PATH, 'stop_words.txt')

MIN_LENGTH = 4
MAX_LENGTH = 8

Mounted at /content/drive/


# Úkoly:
● Načíst textový soubor data.txt,

● načíst textový soubor stop_words.txt,

● zpracovávat datový soubor data.txt po slovech,

● odfiltrovat slova s počtem znaků větším než 8 a menší než 4,

● odfiltrovat slova podle stop slov ze stop_words.txt,

● spočítat statistiky – nejfrekventovanější slovo a počet jeho výskytů, nejméně frekventované
slovo a počet jeho výskytů, celkový počet slov po filtracích,

● výsledný algoritmus by měl být co nejefektivnější,

● výsledky časů zpracování jednotlivými algoritmy (CPU, GPU, ....) vykreslete do koláčového
grafu (použijte knihovnu Matplotlib),

● všechny požadované výstupy vypište do konzole a výsledky srovnejte.

# 1. CPU – jedno vláknový algoritmus

In [9]:
def word_counts_from_file(file):
    words_dict = defaultdict(int)
    try:
        with(open(file, 'r', encoding='utf-8-sig')) as f: #, 
            for line in f:
                for word in line.lower().split():
                    words_dict[word] += 1

    except FileNotFoundError:
        print("File not found")
    except OSError:
        print("OS error occured")

    return words_dict

def filter_stop_words(input_dict, filter_dict):
    for key in filter_dict:
        input_dict.pop(key, None)
    return input_dict 

def filter_words_length(input_dict, min_length, max_length):
    input_dict = {key: input_dict[key] 
                  for key in input_dict if min_length <= len(key) <= max_length}
    return input_dict

def describe_dictionary(input_dict):
    max = []
    max_val = 0
    min = []
    min_val = inf
    count_unique = 0
    count_all = 0
    for key in input_dict:
        count_unique += 1
        count_all += input_dict[key]
        if input_dict[key] >= max_val:
            if input_dict[key] != max_val:
                max.clear()
            max.append(key)
            max_val = input_dict[key]

        if input_dict[key] <= min_val:
            if input_dict[key] != min_val:
                min.clear()
            min.append(key)
            min_val = input_dict[key]

    print(count_unique, count_all, max, max_val, min, min_val)
    
start = time.time()
stop_words = word_counts_from_file(STOP_WORDS_FILE_PATH)
words = word_counts_from_file(DATA_FILE_PATH)
words = filter_stop_words(words, stop_words)
words = filter_words_length(words, MIN_LENGTH, MAX_LENGTH)
describe_dictionary(words)
time_cpu_single = round(time.time()-start, 4)
print(f"CPU Processing time: {time_cpu_single} s")


CPU Processing time: 0.0739 s


# 2. CPU - více vláknový algoritmus
● Zpracovávejte paralelně datový soubor.

● Pro výpočet využijte všechna dostupná CPU jádra.

In [8]:
def read_file_job(queue):
    try:
        with(open(DATA_FILE_PATH, 'r', encoding='utf-8-sig')) as f:
            for line in f:
                queue.put(line)

    except FileNotFoundError:
        print("File not found")
    except OSError:
        print("OS error occured")
    finally:
        for _ in range(MAX_PROCESSES):
            queue.put(None)

def worker(queue, dictionary_queue, stop_words, min_length, max_length):
    while True :
        item = queue.get()
        if item == None:
            dictionary_queue.put(None)
            break

        for word in item.lower().split():
            if word not in stop_words:
              if min_length <= len(word) <= max_length:
                dictionary_queue.put(word)


def result_job(dictionary_queue):
    res = defaultdict(int)
    counter = 0
    while True :
        item = dictionary_queue.get()
        if item == None:
            counter += 1
            if counter == MAX_PROCESSES:
                break
        else:
            res[item] += 1
        

In [None]:
MAX_PROCESSES = cpu_count()
queue = Queue()
dictionary_queue = Queue()
    
start = time.time()   
file_process = Process(target=read_file_job, daemon=True, args=(queue, ))
file_process.start()
stop_words = word_counts_from_file(STOP_WORDS_FILE_PATH)

processes = []
for i in range(MAX_PROCESSES):
  p = Process(target=worker, daemon=True, args=(queue, dictionary_queue, stop_words, MIN_LENGTH, MAX_LENGTH ))
  p.start()
  processes.append(p)

result_process = Process(target=result_job, daemon=True, args=(dictionary_queue,))
result_process.start()

file_process.join()

for process in processes:
  process.join()

result_process.join()
time_cpu_mult = round(time.time()-start, 4)
print(f"CPU - mult Processing time: {time_cpu_mult} s")

CPU - mult Processing time: 1.835 s


# 3. GPU verze

● GPU neumí jednoduše pracovat se datovým typem string, proto zde bude odlišný
způsob výpočtu. Slova z datového souboru i ze stop slov musí přemapovat na číselné
hodnoty – vytvořte si pomocný slovník (pro data.txt), kde budete mít uloženo vždy
název slova a k němu zvolte jeho číslo id, aby jste dokázali s pomocí id (integer) zpětně
mapovat název slova. Dále takto přemapujte textové soubory na vstupní vektory typu
int, které již můžete zpracovat na GPU.

● Odfiltrování slov podle počtu znaků se nebude v provádět na GPU (moc složité)
, ale
tuto filtraci proveďte před nahráním dat na GPU.

● Odfiltrování podle stop slov bude provedeno na GPU.

In [None]:
!pip install pycuda
from __future__ import division
import numpy as np
from pycuda.compiler import SourceModule
import pycuda.autoinit
from pycuda import gpuarray
import pycuda.driver as cuda
import numpy

cuda.init()
dev = cuda.Device(0)
ctx = dev.make_context()

# input words, stop words, result
AtomicCode='''
__global__ void atomic_ker(const int *input, int *add_out, int *max_out, const int N)
{
    int idx = blockIdx.x * blockDim.x + threadIdx.x;

    if(idx < N){
        atomicAdd(add_out, input[idx]);
        atomicMax(&max_out[0], input[idx]);
    }
}
'''

numElements = int(1e4)
BLOCK_SIZE = 256
gridDim = int(numElements/BLOCK_SIZE + 1)

atomic_mod = SourceModule(AtomicCode)
atomic_ker = atomic_mod.get_function('atomic_ker')

input = numpy.random.randint(low = 0, high = 100, size = numElements).astype(numpy.int32)
add_out = numpy.zeros(1).astype(numpy.int32)
max_out = numpy.zeros(1).astype(numpy.int32)

input_gpu = cuda.mem_alloc(input.nbytes)
add_out_gpu = cuda.mem_alloc(add_out.nbytes)
max_out_gpu = cuda.mem_alloc(max_out.nbytes)

cuda.memcpy_htod(input_gpu, input)

atomic_ker(input_gpu, add_out_gpu, max_out_gpu, numpy.int32(numElements), block=(BLOCK_SIZE,1,1), grid=(gridDim,1,1))
ctx.synchronize()

cuda.memcpy_dtoh(add_out, add_out_gpu)
cuda.memcpy_dtoh(max_out, max_out_gpu)

input_gpu.free()
add_out_gpu.free()
max_out_gpu.free()

print("Atomic operations test:")
print("add_out: {}".format(add_out[0]))
print("max_out: {}".format(max_out[0]))

# 4. Apache spark verze
● Využijte všechna dostupná CPU jádra.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, desc, length, col, lower

spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "32g").config("spark.driver.memory", "32g").appName("Project").getOrCreate()
start = time.time()
# STOP WORDS
# Read file by lines
stopDF = spark.read.text(STOP_WORDS_FILE_PATH).select('value')
# Split words in line by ' '
stopDF = stopDF.select(split(lower(stopDF.value), ' ').alias('split'))
# Add own row to every word
stopWordsDF = stopDF.select(explode(stopDF.split).alias('words'))
#stopWordsDF.show()

# WORDS
# Read file by lines
linesDF = spark.read.text(DATA_FILE_PATH).select('value')
#linesDF.show(truncate=False)
# Split words in line by ' '
linesDF = linesDF.select(split(lower(linesDF.value), ' ').alias('split'))
#linesDF.show(truncate=False)
# Add own row to every word
wordsDF = linesDF.select(explode(linesDF.split).alias('words'))
#wordsDF.show()
# Filter empty lines
wordsDF = wordsDF.filter(MAX_LENGTH >= length(col('words')))
wordsDF = wordsDF.filter(MIN_LENGTH <= length(col('words')))
#wordsDF.show()
# Filter by stop words
wordsDF = wordsDF.join(stopWordsDF, ["words"], "leftanti")
# Group according to words and count same words
wordCountDF = wordsDF.groupBy("words").count()
#wordCountDF.show()
# Order words by occurencies in descending order
#wordCountDF = wordCountDF.sort(col("count").asc())
#wordCountDF.show()
#
wordsSumDF = wordCountDF.agg({'count': 'count'})
#wordsSumDF.show()
# Print the number of all the words occurences
wordsSumDF = wordCountDF.agg({'count': 'sum'})
#wordsSumDF.show()

time_spark = round(time.time()-start, 4)
print(f"Spark Processing time: {time_spark} s")
spark.stop()

Spark Processing time: 0.2095 s


In [None]:
# By Calling function
def func1(x):
    return (x.words)

rdd2=wordsDF.rdd.map(lambda x: func1(x))
dataCol1 = rdd2.collect()
a = 0
for row in dataCol1:
    if(words.get(row, 0) == 0):
      a = a + 1
      words.pop("row", None)
      print(repr(row))
print(a)
print(words)

0
