<a href="https://colab.research.google.com/github/lukaszplust/Projects/blob/main/moje_sbd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [392]:
import os
import itertools
import math
from random import randint
import pdb

In [6]:
# BUFFER_SIZE - określa liczbę rekordów, które będą przechowywane w buforze w jednym momencie.
# Buforowanie poprawia wydajność operacji wejścia-wyjścia, minimalizując liczbę operacji odczytu i zapisu na dysk poprzez grupowanie ich w większe bloki.

# zakładam, ze blok to 512 bajtów, wiec wychodzi po 32 rekordy
BUFFER_SIZE = 32
#dlaczego rekord to 16 bitów?
# to chyba dlatego co jest nizej

# TO NADANE PRZEZ POLECENIE
# SET_BYTES_SIZE - określa rozmiar w bajtach pojedynczego rekordu bez dodatkowych znaków.
SET_BYTES_SIZE = 15

# Ta stała określa rozmiar w bajtach pojedynczego rekordu, włączając dodatkowy znak (nowa linia lub znak końca rekordu).
# Zazwyczaj jest to znak '\n' (nowa linia), który oddziela rekordy w pliku tekstowym
RECORD_BYTES_SIZE = SET_BYTES_SIZE + 1

# Ta stała określa całkowity rozmiar bufora w bajtach.
# Jest to iloczyn liczby rekordów w buforze (BUFFER_SIZE) i rozmiaru jednego rekordu w bajtach (RECORD_BYTES_SIZE).
# Bufor o takim rozmiarze będzie używany do operacji odczytu i zapisu blokowego, co zwiększa efektywność przez minimalizację liczby operacji I/O
BYTES_BUFFER_SIZE = BUFFER_SIZE * RECORD_BYTES_SIZE

In [194]:
class Record:

  def __init__(self, elements):
    self.elements = elements

  def __repr__(self):
    return f"{sorted(self.elements, reverse=True)}"

  @staticmethod
  def load_from_ints(ints):
    return Record(ints[1: ints[0] + 1])


  def save_to_ints(self):
    return [len(self.elements)] + self.elements + [0] * (RECORD_BYTES_SIZE - len(self.elements) - 1)

  def __lt__(self, other):
    if other is None:
        return True

    self_copy = set(self.elements)
    other_copy = set(other.elements)

    unique_self = self_copy - other_copy
    unique_other = other_copy - self_copy

    if not unique_other:
        return False
    if not unique_self:
        return True

    return max(unique_other) > max(unique_self)

In [601]:
class ReadBuffer:

  def __init__(self, path):

    self.path = path
    self.read_possition = 0
    self.file_possition = 0
    self.size = BUFFER_SIZE
    self.file_size = os.path.getsize(path)
    # liczba rekordów obecnie załadowanych do bufora
    self.loaded_size = 0
    self.buffer = []
    self.disk_reads_count = 0
    #ładowanie początkowego zestawu danych do bufora
    self.load_next()

  def check_more(self):
    return (self.file_possition < self.file_size or self.read_possition < self.loaded_size)


  # see_next uzywać w SeriesIterator, gdzie weryfikuje gdzie konczy sie seria
  # (porownuje kolejne liczba z ta z poprzednia i ustawiam flage end_of_series)
  # dzieki see_next() mogą podglądnąc kolejny record bez przesuwania wskaznika

  def see_next(self):
    return None if self.read_possition == self.loaded_size else self.buffer[self.read_possition]

  # ODCZYT REKORDÓW: Metoda read_next zwraca następny rekord
  def read_next(self):
    # sprawdzam, czy są jeszcze rekordy do odczytu
    if not self.check_more():
        return None

    # jeśli są jakieś rekordy do odczytu to zwracam następny rekord z bufora
    record = self.buffer[self.read_possition]
    self.read_possition += 1

    # jeśli pozycja odczytu osiągnie rozmiar bufora
    if self.read_possition == self.size:
        # ładuje kolejną porcję danych do bufora (self.load_next())
        self.load_next()
        # resetuje self.read_possition
        self.read_possition = 0

    return record

  def load_next(self):
    self.buffer = []
    with open(self.path, "rb", buffering =0) as f:
      # seek() function is used to change the position of
      # the File Handle to a given specific position
      # przesuwam wskaźnik odczytu pliku do self.file_pos
      f.seek(self.file_possition)

      bytes_to_read = min(BYTES_BUFFER_SIZE, self.file_size - self.file_possition)

      temporrary_buffer = f.read(bytes_to_read)

      self.file_possition += bytes_to_read
      self.loaded_size = bytes_to_read / RECORD_BYTES_SIZE
      temporrary_ints = list(temporrary_buffer)

      # dodaje liste rekordów do bufora (self.buffer)
      for i in range(len(temporrary_buffer) // RECORD_BYTES_SIZE):
          record_ints = temporrary_ints[
                        RECORD_BYTES_SIZE * i:RECORD_BYTES_SIZE * (i + 1)
                        ]
          self.buffer.append(Record.load_from_ints(record_ints))

      # zamykam plik i zwiększam licznik operacji odczytu z dysku (self.disk_reads_count)
      f.close()
      self.disk_reads_count += 1


  def __iter__(self):
    return self

  def __next__(self):
    next_record = self.read_next()
    if next_record is None:
      raise StopIteration
    return next_record


In [602]:
class WriteBuffer:

  def __init__(self, path, append_mode=False):

    self.write_possition = 0
    self.buffer = [None] * BUFFER_SIZE

    self.path = path
    self.size = BUFFER_SIZE
    if not append_mode and os.path.isfile(path):
      os.remove(path)
    self.series_written = 0
    self.last_written = None
    self.disk_writes_count = 0


  def save_next(self):
    ints_to_write = []

    for record in self.buffer[0:self.write_possition]:
      ints_to_write += record.save_to_ints()

    with open(self.path,"ab", buffering = 0) as f:
      f.write(bytearray(ints_to_write))
      f.close()
      self.disk_writes_count += 1


  def flush(self):
    if self.write_possition > 0:
      self.save_next()
      self.write_possition = 0

  def write_next(self, record):

    if record < self.last_written:
      self.series_written += 1

    if self.write_possition == self.size:
      self.flush()

    self.buffer[self.write_possition] = record
    self.write_possition += 1
    self.last_written = record

In [603]:
class SeriesIterator:

  def __init__(self, read_buffer):
    self.read_buffer = read_buffer
    self.current_record = None
    self.end_of_series = False


  def read_next(self):
    #pdb.set_trace()

    # jesli zosała ustawiona flaga konca serii to zwracam None
    if self.end_of_series:
      return None

    # odczytuje kolejne rekordy i przypisuje je do current_record
    # tu nalezy zaznaczyc ze read_next() sam przesunie wskaznik na
    # kolejny rekord
    self.current_record = self.read_buffer.read_next()

    if self.current_record is None:
      return None

    # podgladam jaki bedzie nastepny rekord
    next_record = self.read_buffer.see_next()

    # tu jesli pogdlądniety rekord jest mniejszy od poprzedniego to oznacza ze zakonczyła sie pewna seria
    if next_record is not None and next_record < self.current_record:
      # i wtedy nalezy ustawic flage konca serii na true
      self.end_of_series = True

    # zwracam obecny rekord
    return self.current_record

  def __iter__(self):
    return self

  def __next__(self):
    # jesli została ustawiona flaga en_of_series na True to self.read_next() będzie równy None
    # i wtedy jest StopIteration bo koniec serii
    res_record = self.read_next()
    if res_record is None:
      raise StopIteration
    return res_record

In [604]:
def print_tape(name):
  print(f'Tape: {name}')

  buffer = ReadBuffer(name)
  series_counter = 0
  records_counter = 0

  while buffer.check_more():
    #pdb.set_trace()
    iterator = SeriesIterator(buffer)
    series = []
    for r in iterator:
      series.append(r)
      records_counter +=1
    # gdy wyjdzie z petli to dodaje '||' na koniec serii
    print(" ".join(map(str, series)), end=" | " if buffer.check_more() else "\n")
    # i zwiekszam licznik serii
    series_counter +=1

  print(f'Series count: {series_counter}')
  print(f'Records count: {records_counter}')

def series_count(name):
  series_counter = 0
  buffer = ReadBuffer(name)
  while buffer.check_more():
    iterator = SeriesIterator(buffer)
    for _ in iterator:
      pass
    series_counter +=1
  return series_counter

In [584]:
#Funckja split rozdziela rekordy z pliku źródłowego na dwa docelowe pliki, tworząc w nich sekwencje posortowane
def split(first_tape, second_tape,tape):

  t1_buffer = ReadBuffer(tape)
  t2_buffer = WriteBuffer(first_tape)
  t3_buffer = WriteBuffer(second_tape)

  # następny rekord z pliku źródłowego za pośrednictwem bufora odczytu (ReadBuffer)
  last_record = t1_buffer.read_next()
  # do bufora t2 zapisuje 1 element z t1
  t2_buffer.write_next(last_record)

  destination_buffer = t2_buffer
  # zaczynam zapisywac do t2 i jesli kolejny record jest wiekszy od poprzednika to zapisuje w t3
  # calosc opiera sie na odpowiedniej zmianie destination_buffer
  for r in t1_buffer:
    #pdb.set_trace()
    if r < last_record:
      destination_buffer = t3_buffer if destination_buffer == t2_buffer else t2_buffer

    (t2_buffer if destination_buffer == t2_buffer else t3_buffer).write_next(r)
    last_record = r

  # Flushing buforów na końcu zapisuje wszystkie pozostałe dane z buforów do plików, zapewniając,
  # że wszystkie dane są zapisywane i bufor jest pusty przed kolejnymi operacjami
  t2_buffer.flush()
  t3_buffer.flush()

  #dodane

  print("\nTasma 1: ")
  #print(t2_buffer)
  print_tape(first_tape)
  print("\nTasma 2 :")
  print_tape(second_tape)

  return MetaInfo(t1_buffer.disk_reads_count,
  # t2_buffer.disk_writes_count + t3_buffer.disk_writes_count: Łączna liczba operacji zapisu na dysku dla obu buforów
  t2_buffer.disk_writes_count + t3_buffer.disk_writes_count,
  # t2_buffer.series_written + t3_buffer.series_written: Łączna liczba sekwencji posortowanych (runs) zapisanych do obu buforów
  t2_buffer.series_written + t3_buffer.series_written)

In [672]:
# Funkcja series_merge jest używana do scalenia dwóch posortowanych sekwencji rekordów w jedną posortowaną sekwencję.
def series_merge(first_tape, second_tape, write_buffer: WriteBuffer):

  # current_ser1 - kolejne wartosc z t1
  records_in_tape1 = first_tape.read_next()
  # current_ser2 - kolejne wartosc z t2
  records_in_tape2 = second_tape.read_next()
  pdb.set_trace()
  while records_in_tape1 is not None and records_in_tape2 is not None:
    # sprawdzam czy rekord z tasmy 1 jest mniejszy niz z tasmy 2
    if records_in_tape1 < records_in_tape2:
      # jesli rekord z tasmy 1 jest mniejszy to zapisuje go (ten z pierwszej tasmy) do buffora
      write_buffer.write_next(records_in_tape1)
      # i odczytuje kolejny rekord
      records_in_tape1 =  first_tape.read_next()
    else:
      # jesli rekord z tasmy 1 jest wiekszy niz z tasmy 2 to zapisuje rekord (z drugiej tasmy) do bufora
      write_buffer.write_next(records_in_tape2)
      # odczytuje kolejny rekord z drugiej tasmy
      records_in_tape2 = second_tape.read_next()

  # gdy wyjde poza tasme
  for tape in (first_tape, second_tape):
    # teraz sprawdzam czy to z tasmy 1 czy z tasmy 2 wyszedłem poza nią (czyli uzyskałem None)
    possibly_none_value = records_in_tape1 if tape == first_tape else records_in_tape2

    if possibly_none_value is not None:
      # rekord z taśmy z której nie wyszedłem zapisuje do write_buffer
      write_buffer.write_next(possibly_none_value)
      # na taśmie z której nie uzyskałem None pozostały mi jeszcze wartości
      for remaining_records in tape:
        write_buffer.write_next(remaining_records)

In [662]:
def merge(first_tape, second_tape, tape):
  t1_buffer = WriteBuffer(tape)

  t2_buffer = ReadBuffer(first_tape)# tasma 1
  t3_buffer = ReadBuffer(second_tape)# tasma 2
  #pdb.set_trace()
  while t2_buffer.check_more() and t3_buffer.check_more():
    series_merge(SeriesIterator(t2_buffer), SeriesIterator(t3_buffer), t1_buffer)


  for buffer in (t2_buffer, t3_buffer):
    for r in buffer:
        t1_buffer.write_next(r)

  t1_buffer.flush()

  return MetaInfo(t2_buffer.disk_reads_count + t3_buffer.disk_reads_count,
                  t1_buffer.disk_writes_count,
                  t1_buffer.series_written)

In [663]:
def prepare_tapes():

  tape1 = WriteBuffer("tapes/t1")

  for r in ReadBuffer("tapes/start_tape"):
    tape1.write_next(r)

  tape1.flush()

  for tape in ("t2", "t3"):
    path = f"tapes/{tape}"
    if os.path.isfile(path):
        os.remove(path)

In [664]:
def tape_sort(tape):

  phases_count = 0
  series_written = 0

  reads_count = 0
  writes_count = 0

  while series_written != 1:

    split_information = split("tapes/t2", "tapes/t3", tape)
    merge_information = merge("tapes/t2", "tapes/t3", tape)

    series_written = merge_information.series_count

    reads_count += split_information.reads_count
    reads_count += merge_information.reads_count

    writes_count += split_information.writes_count
    writes_count += merge_information.writes_count


    print(f'\nPhase: {phases_count + 1}')
    print_tape(tape)

    phases_count +=1

  # zwracam obiekt SortInfo zawierający liczbę odczytów, zapisów i faz sortowania, co pozwala na analizę wydajności algorytmu.
  return SortInfo(reads_count, writes_count, phases_count)


In [665]:
class SortInfo:
    def __init__(self, reads_count, writes_count, phases_count):

        # reads_count: Liczba operacji odczytu wykonanych podczas sortowania
        self.reads_count = reads_count
        # writes_count: Liczba operacji zapisu wykonanych podczas sortowania
        self.writes_count = writes_count
        # phases_count: Liczba faz (iteracji) sortowania
        self.phases_count = phases_count

In [666]:
class MetaInfo:
    def __init__(self, reads_count, writes_count, series_count):

        # reads_count: Liczba operacji odczytu z dysku
        self.reads_count = reads_count

        # writes_count: Liczba operacji zapisu na dysk
        self.writes_count = writes_count

        # runs_count: Liczba przebiegów (runs) wykonanych podczas sortowania
        self.series_count = series_count

Wywoływanie

In [667]:
!rm tapes/t1
!rm tapes/t2
!rm tapes/t3

In [668]:
tape = "tapes/t1"
test_file_path = "tapes/test"

In [669]:
write_buffer = WriteBuffer(tape, append_mode=True)
count = 0

with open(test_file_path) as test_file:
  for line in test_file:
    set_numbers = [int(s) for s in line.rstrip().split()]
    new_record = Record(set_numbers)
    write_buffer.write_next(new_record)
    count += 1
write_buffer.flush()
print(f'Added {count} records to tape')

Added 8 records to tape


In [670]:
#print(f"Displaying tape {tape}")
#print_tape(tape)

In [671]:
pprint(f"Sorting tape {tape}")
print(f"Displaying tape before sorting:")
print_tape(tape)
sort_info = tape_sort(tape)
#sort_info = tape_sort(tape)
print(f"\nDisplaying tape after sorting:")
print_tape(tape)
print(f"Tape {tape} sorted!")
print(f"Sorting metadata:")
print(f"Phase count: {sort_info.phases_count}")
print(f"Reads count: {sort_info.reads_count}")
print(f"Writes count: {sort_info.writes_count}")

Sorting tape tapes/t1
Displaying tape before sorting:
Tape: tapes/t1
[44] [55] | [12] [42] [94] | [18] | [6] [67]
Series count: 4
Records count: 8

Tasma 1: 
Tape: tapes/t2
[44] [55] | [18]
Series count: 2
Records count: 3

Tasma 2 :
Tape: tapes/t3
[12] [42] [94] | [6] [67]
Series count: 2
Records count: 5
> [0;32m<ipython-input-661-e2e89b42d47a>[0m(9)[0;36mseries_merge[0;34m()[0m
[0;32m      7 [0;31m  [0mcurrent_ser2[0m [0;34m=[0m [0msecond_tape[0m[0;34m.[0m[0mread_next[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m      8 [0;31m  [0mpdb[0m[0;34m.[0m[0mset_trace[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m----> 9 [0;31m  [0;32mwhile[0m [0mcurrent_ser1[0m [0;32mis[0m [0;32mnot[0m [0;32mNone[0m [0;32mand[0m [0mcurrent_ser2[0m [0;32mis[0m [0;32mnot[0m [0;32mNone[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     10 [0;31m    [0;31m# sprawdzam czy rekord z tasmy 1 jest mniejszy niz z tasmy 2[0m[0;