#Setup

In [None]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.61.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.4 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  D

In [None]:
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.options.pipeline_options import PipelineOptions
import datetime
import random

#beam.WindowInto()

1.	`beam.WindowInto(beam.window.FixedWindows())` - FixedWindows - Okna o stałej szerokości, np. 10 minut.
2.	`beam.WindowInto(beam.window.SlidingWindows())` - SlidingWindows - Okna przesuwające się w czasie, np. okna co 5 minut, które zachodzą na siebie.
3.	`beam.WindowInto(beam.window.SessionWindows())` SessionWindows - Okna o zmiennym czasie, tworzone w zależności od przerw w danych (np. jeśli dane nie napływają przez określony czas).


Przykład decyzji:

- Jeśli potrzebujesz regularnych raportów -> FixedWindows
- Jeśli monitorujesz anomalie -> SlidingWindows
- Jeśli analizujesz zachowania użytkowników -> SessionWindows

#generate_data

In [None]:
def generate_data():
    timestamp = datetime.datetime.now()
    for i in range(100):
        event_time = timestamp + datetime.timedelta(minutes=random.randint(0, 20))
        yield {
            'sensor_id': random.randint(1, 5),
            'temperature': random.uniform(20.0, 30.0),
            'timestamp': event_time
        }

In [None]:
list(generate_data())[:5]

[{'sensor_id': 1,
  'temperature': 25.479417146221127,
  'timestamp': datetime.datetime(2025, 1, 7, 14, 36, 53, 91661)},
 {'sensor_id': 2,
  'temperature': 24.621569430832658,
  'timestamp': datetime.datetime(2025, 1, 7, 14, 36, 53, 91661)},
 {'sensor_id': 3,
  'temperature': 26.922638491778066,
  'timestamp': datetime.datetime(2025, 1, 7, 14, 30, 53, 91661)},
 {'sensor_id': 3,
  'temperature': 21.55912404598026,
  'timestamp': datetime.datetime(2025, 1, 7, 14, 32, 53, 91661)},
 {'sensor_id': 1,
  'temperature': 29.856935466667483,
  'timestamp': datetime.datetime(2025, 1, 7, 14, 41, 53, 91661)}]

In [None]:
# Sposób 1 - bezpośrednie użycie generatora z ograniczeniem
for i, data in enumerate(generate_data()):
    if i < 2:  # Ograniczamy do 5 wierszy
        print(data)
    else:
        break

# Sposób 2 - z użyciem itertools (bardziej elegancki)
from itertools import islice

for data in islice(generate_data(), 2):
    print(data)

{'sensor_id': 5, 'temperature': 24.494212238159026, 'timestamp': datetime.datetime(2025, 1, 7, 14, 45, 26, 846437)}
{'sensor_id': 1, 'temperature': 28.0515076880288, 'timestamp': datetime.datetime(2025, 1, 7, 14, 33, 26, 846437)}
{'sensor_id': 1, 'temperature': 25.243036609587932, 'timestamp': datetime.datetime(2025, 1, 7, 14, 36, 26, 848225)}
{'sensor_id': 4, 'temperature': 23.172863473246018, 'timestamp': datetime.datetime(2025, 1, 7, 14, 29, 26, 848225)}


#PRZYPADEK 1: FixedWindows

In [None]:
def generate_data():
    timestamp = datetime.datetime.now()
    for i in range(100):
        event_time = timestamp + datetime.timedelta(minutes=random.randint(0, 20))
        yield {
            'sensor_id': random.randint(1, 5),
            'temperature': random.uniform(20.0, 30.0),
            'timestamp': event_time
        }

def print_element(element, prefix=''):
    print(f"{prefix} > {element}")
    return element

def run():
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:

        fixed_windows = (
            p
            | "Generuj dane - Fixed" >> beam.Create(list(generate_data()))  # Konwersja do listy
            | "Print dane wejściowe - Fixed" >> beam.Map(lambda x: print_element(x, "Fixed Input"))
            | "Dodaj timestamp - Fixed" >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, x['timestamp'].timestamp()))
            | "Okna 15-min" >> beam.WindowInto(beam.window.FixedWindows(15 * 60))
            | "Przygotuj klucz-wartość" >> beam.Map(lambda x: (x['sensor_id'], x['temperature']))
            | "Grupuj po sensorze - Fixed" >> beam.GroupByKey()
            | "Print po grupowaniu - Fixed" >> beam.Map(lambda x: print_element(x, "Fixed Grouped"))
            | "Oblicz średnią - Fixed" >> beam.Map(
                lambda x: (x[0], sum(x[1]) / len(x[1]))
            )
            | "Print wynik końcowy - Fixed" >> beam.Map(lambda x: print_element(x, "Fixed Result"))
        )

if __name__ == '__main__':
    run()



Fixed Input > {'sensor_id': 5, 'temperature': 27.250813444150005, 'timestamp': datetime.datetime(2025, 1, 7, 14, 27, 23, 822785)}
Fixed Input > {'sensor_id': 5, 'temperature': 29.905900718081323, 'timestamp': datetime.datetime(2025, 1, 7, 14, 25, 23, 822785)}
Fixed Input > {'sensor_id': 4, 'temperature': 26.07122086032122, 'timestamp': datetime.datetime(2025, 1, 7, 14, 37, 23, 822785)}
Fixed Input > {'sensor_id': 4, 'temperature': 20.86915977875521, 'timestamp': datetime.datetime(2025, 1, 7, 14, 40, 23, 822785)}
Fixed Input > {'sensor_id': 1, 'temperature': 24.806977139008787, 'timestamp': datetime.datetime(2025, 1, 7, 14, 38, 23, 822785)}
Fixed Input > {'sensor_id': 1, 'temperature': 29.253955684875546, 'timestamp': datetime.datetime(2025, 1, 7, 14, 33, 23, 822785)}
Fixed Input > {'sensor_id': 5, 'temperature': 23.99025900271141, 'timestamp': datetime.datetime(2025, 1, 7, 14, 28, 23, 822785)}
Fixed Input > {'sensor_id': 2, 'temperature': 28.32341681076429, 'timestamp': datetime.dateti

In [None]:
"Dodaj timestamp - Fixed" >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'].timestamp()))

# Załóżmy, że x to słownik:
x = {
    'sensor_id': 1,
    'temperature': 25.5,
    'timestamp': datetime.datetime(2024, 1, 7, 10, 30, 0)  # przykładowa data
}

# beam.window.TimestampedValue przyjmuje dwa argumenty:
# 1. element - całe dane (x)
# 2. timestamp - unix timestamp w sekundach

# x['timestamp'].timestamp() konwertuje datetime na unix timestamp
# np. datetime(2024, 1, 7, 10, 30, 0).timestamp() -> 1704626400.0

# W rezultacie tworzymy TimestampedValue, który mówi Beam'owi:
# "to zdarzenie wystąpiło dokładnie w tym momencie czasowym"

In [None]:
"Okna 15-min" >> beam.WindowInto(beam.window.FixedWindows(15 * 60))

# beam.window.FixedWindows(15 * 60) tworzy okna 15-minutowe:
# - 15 * 60 = 900 sekund = 15 minut
# - każde okno jest dokładnie 15 minut długie
# - okna nie nachodzą na siebie

# Przykład podziału na okna:
# Dla danych z timestampami:
# 10:00:00 -> wpada do okna 10:00-10:15
# 10:14:59 -> wpada do okna 10:00-10:15
# 10:15:00 -> wpada do okna 10:15-10:30
# 10:29:59 -> wpada do okna 10:15-10:30

# Wszystkie elementy w jednym oknie będą przetwarzane razem
# np. jeśli mamy 5 odczytów między 10:00 a 10:15,
# zostaną one zagregowane w jednym oknie

# PRZYPADEK 2: SlidingWindows

In [None]:
def generate_data():
    timestamp = datetime.datetime.now()
    for i in range(100):
        event_time = timestamp + datetime.timedelta(minutes=random.randint(0, 20))
        yield {
            'sensor_id': random.randint(1, 5),
            'temperature': random.uniform(20.0, 30.0),
            'timestamp': event_time
        }

def print_element(element, prefix=''):
    print(f"{prefix} > {element}")
    return element

def run():
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:


        sliding_windows = (
            p
            | "Generuj dane - Sliding" >> beam.Create(list(generate_data()))
            | "Print dane wejściowe - Sliding" >> beam.Map(lambda x: print_element(x, "Sliding Input"))
            | "Dodaj timestamp - Sliding" >> beam.Map(
                lambda x: beam.window.TimestampedValue((x['sensor_id'], x['temperature']), x['timestamp'].timestamp()))
            | "Okna przesuwne 30-min" >> beam.WindowInto(
                beam.window.SlidingWindows(30 * 60, 5 * 60))
            | "Grupuj odczyty" >> beam.GroupByKey()
            | "Print po grupowaniu - Sliding" >> beam.Map(lambda x: print_element(x, "Sliding Grouped"))
            | "Wykryj anomalie" >> beam.Map(
                lambda x: {
                    'sensor_id': x[0],
                    'temperatures': list(x[1]),
                    'anomalies': [t for t in x[1] if abs(t - sum(x[1])/len(x[1])) > 2]
                }
            )
            | "Print wynik końcowy - Sliding" >> beam.Map(lambda x: print_element(x, "Sliding Result"))
        )


if __name__ == '__main__':
    run()



Sliding Input > {'sensor_id': 5, 'temperature': 28.193792550498074, 'timestamp': datetime.datetime(2025, 1, 7, 14, 53, 55, 189604)}
Sliding Input > {'sensor_id': 4, 'temperature': 27.025469529165452, 'timestamp': datetime.datetime(2025, 1, 7, 15, 9, 55, 189604)}
Sliding Input > {'sensor_id': 4, 'temperature': 23.724654749472414, 'timestamp': datetime.datetime(2025, 1, 7, 14, 56, 55, 189604)}
Sliding Input > {'sensor_id': 1, 'temperature': 29.57972965186522, 'timestamp': datetime.datetime(2025, 1, 7, 15, 1, 55, 189604)}
Sliding Input > {'sensor_id': 4, 'temperature': 23.77232485497126, 'timestamp': datetime.datetime(2025, 1, 7, 14, 58, 55, 189604)}
Sliding Input > {'sensor_id': 5, 'temperature': 23.619653456893225, 'timestamp': datetime.datetime(2025, 1, 7, 15, 4, 55, 189604)}
Sliding Input > {'sensor_id': 5, 'temperature': 22.59875867963045, 'timestamp': datetime.datetime(2025, 1, 7, 15, 4, 55, 189604)}
Sliding Input > {'sensor_id': 2, 'temperature': 25.653146477713396, 'timestamp': da

In [None]:
"Dodaj timestamp - Sliding" >> beam.Map(lambda x: beam.window.TimestampedValue((x['sensor_id'], x['temperature']), x['timestamp'].timestamp()))

# Załóżmy przykładowe dane wejściowe:
x = {
    'sensor_id': 1,
    'temperature': 25.5,
    'timestamp': datetime.datetime(2024, 1, 7, 10, 30, 0)
}

# Ta transformacja robi dwie rzeczy naraz:
# 1. Tworzy krotkę z danych: (x['sensor_id'], x['temperature'])
#    Czyli z przykładu: (1, 25.5)

# 2. Dodaje znacznik czasowy poprzez TimestampedValue:
#    - Pierwszy argument to nasza krotka: (1, 25.5)
#    - Drugi to unix timestamp: 1704626400.0

# Różnica w porównaniu do FixedWindows:
# - W Fixed przekazywaliśmy całe 'x'
# - W Sliding przekazujemy tylko potrzebne dane w formie krotki

In [None]:
"Okna przesuwne 30-min" >> beam.WindowInto(beam.window.SlidingWindows(30 * 60, 5 * 60))

# SlidingWindows przyjmuje dwa argumenty:
# 1. 30 * 60 = 1800 sekund = długość okna (30 minut)
# 2. 5 * 60 = 300 sekund = okres przesunięcia (5 minut)

# Przykład jak działają okna przesuwne:
# Dla danych z timestampem 10:30:00:
# - Wpada do okna 10:15-10:45
# - Wpada do okna 10:20-10:50
# - Wpada do okna 10:25-10:55
# - Wpada do okna 10:30-11:00

# Każde zdarzenie należy do wielu okien jednocześnie!

# PRZYPADEK 3: SessionWindows

In [None]:
def generate_data():
    timestamp = datetime.datetime.now()
    for i in range(100):
        event_time = timestamp + datetime.timedelta(minutes=random.randint(0, 20))
        yield {
            'sensor_id': random.randint(1, 5),
            'temperature': random.uniform(20.0, 30.0),
            'timestamp': event_time
        }

def print_element(element, prefix=''):
    print(f"{prefix} > {element}")
    return element

def run():
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:

        # PRZYPADEK 3: SessionWindows
        session_windows = (
            p
            | "Generuj dane - Session" >> beam.Create(list(generate_data()))
            | "Print dane wejściowe - Session" >> beam.Map(lambda x: print_element(x, "Session Input"))
            | "Przygotuj dane - Session" >> beam.Map(
                lambda x: (x['sensor_id'], x))  # Konwersja do formatu (key, value)
            | "Dodaj timestamp - Session" >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, x[1]['timestamp'].timestamp()))
            | "Sesje 10-min gap" >> beam.WindowInto(
                beam.window.Sessions(10 * 60))
            | "Grupuj po sensorze - Session" >> beam.GroupByKey()
            | "Print po grupowaniu - Session" >> beam.Map(lambda x: print_element(x, "Session Grouped"))
            | "Analizuj sekwencje" >> beam.Map(
                lambda x: {
                    'sensor_id': x[0],
                    'session_duration': (max(d['timestamp'] for d in x[1]) -
                                     min(d['timestamp'] for d in x[1])).total_seconds(),
                    'readings_count': len(list(x[1]))
                }
            )
            | "Print wynik końcowy - Session" >> beam.Map(lambda x: print_element(x, "Session Result"))
        )

if __name__ == '__main__':
    run()



Session Input > {'sensor_id': 3, 'temperature': 25.57535974908511, 'timestamp': datetime.datetime(2025, 1, 7, 15, 40, 37, 561631)}
Session Input > {'sensor_id': 1, 'temperature': 25.83268794249436, 'timestamp': datetime.datetime(2025, 1, 7, 15, 34, 37, 561631)}
Session Input > {'sensor_id': 3, 'temperature': 25.50565501559163, 'timestamp': datetime.datetime(2025, 1, 7, 15, 26, 37, 561631)}
Session Input > {'sensor_id': 5, 'temperature': 26.023258446135735, 'timestamp': datetime.datetime(2025, 1, 7, 15, 21, 37, 561631)}
Session Input > {'sensor_id': 2, 'temperature': 25.989437676542998, 'timestamp': datetime.datetime(2025, 1, 7, 15, 23, 37, 561631)}
Session Input > {'sensor_id': 3, 'temperature': 24.478497359027923, 'timestamp': datetime.datetime(2025, 1, 7, 15, 21, 37, 561631)}
Session Input > {'sensor_id': 1, 'temperature': 28.029769255472605, 'timestamp': datetime.datetime(2025, 1, 7, 15, 25, 37, 561631)}
Session Input > {'sensor_id': 2, 'temperature': 28.542792410393453, 'timestamp'

In [None]:
"Dodaj timestamp - Session" >> beam.Map(
    lambda x: beam.window.TimestampedValue(x, x[1]['timestamp'].timestamp()))

# Cel: Dodanie znacznika czasu do elementów przetwarzanych w potoku.
# beam.Map: Transformacja, która stosuje funkcję (tutaj lambda) do każdego elementu x w PCollection.
# lambda x: beam.window.TimestampedValue(...):
# Tworzymy obiekt TimestampedValue, który opakowuje element x wraz ze znacznikiem czasu.
# x[1]['timestamp'] zakłada, że x jest krotką lub strukturą podobną do słownika, gdzie drugi element (x[1]) zawiera klucz 'timestamp'.
# .timestamp() konwertuje znacznik czasu (np. datetime) na liczbę sekund od epoki Unix (czas uniksowy).
# Efekt: Każdy element w PCollection ma teraz przypisany znacznik czasu, który Apache Beam użyje do okien czasowych.

In [None]:
"Sesje 10-min gap" >> beam.WindowInto(
    beam.window.Sessions(10 * 60))

# Cel: Grupowanie danych w okna sesji na podstawie przypisanych wcześniej znaczników czasu.
# beam.WindowInto: Ustawia strategię okienkowania dla PCollection. W tym przypadku korzystamy z okien sesji.
# beam.window.Sessions(10 * 60): Tworzy okna sesji z luką czasową wynoszącą 10 minut (wyrażone w sekundach: 10 * 60).
# Okna sesji: Dzielą dane na dynamiczne przedziały czasowe, które kończą się, jeśli przez określoną lukę (tutaj 10 minut) nie pojawią się nowe dane.
# Efekt:
# Elementy z podobnymi znacznikami czasu, które są blisko siebie (w granicach 10 minut), zostaną zgrupowane w to samo okno sesji.
# Jeżeli między zdarzeniami wystąpi luka większa niż 10 minut, zostanie utworzone nowe okno sesji.