In [6]:
'''
Zadanie 1
Jieming Zhu, Shilin He, Pinjia He, Jinyang Liu, Michael R. Lyu. Loghub: A Large Collection of System Log Datasets for AI-driven Log Analytics. IEEE International Symposium on Software Reliability Engineering (ISSRE), 2023.
'''
import time
from datetime import datetime
import pandas as pd
import dask
from dask import delayed
import dask.dataframe as dd

def parse(inp: str):
    """Split one syslog-style log line into a record dict.

    Expected layout:
        ``<15-char timestamp> <hostname> <process>[<pid>]: <message>``
    e.g. ``Jun 14 15:16:01 combo sshd(pam_unix)[19939]: check pass; user unknown``.

    Returns a dict with keys (in this order): ``date`` (raw timestamp string),
    ``level`` and ``client`` (always '' -- this format carries neither),
    ``hostname``, ``process``, ``pid`` ('' when there is no ``[pid]``) and
    ``message``. A line with no ``:`` separator keeps the whole remainder as the
    message and leaves ``process`` empty.
    """
    record = {}

    # The syslog timestamp is fixed-width: "Mmm dd HH:MM:SS" (day space-padded).
    record["date"] = inp[:15].strip()

    rest = inp[16:].strip()

    # partition() degrades gracefully when the separator is missing, whereas
    # find() returns -1 and the subsequent slices silently cut off characters.
    hostname, _, rest = rest.partition(' ')

    process_info, sep, message = rest.partition(':')
    if not sep:
        # No "tag:" prefix -> the entire remainder is the message.
        process_info, message = '', rest
    message = message.strip()

    # Look for "name[pid]"; the closing bracket must come after the opening one.
    open_idx = process_info.find('[')
    close_idx = process_info.find(']', open_idx + 1) if open_idx != -1 else -1
    if close_idx != -1:
        process_name = process_info[:open_idx]
        pid = process_info[open_idx + 1:close_idx]
    else:
        process_name = process_info
        pid = ''

    record["level"] = ''
    record["client"] = ''
    record["hostname"] = hostname
    record["process"] = process_name.strip()
    record["pid"] = pid
    record["message"] = message

    return record

def convert_date(rec, year=None):
    """Replace ``rec["date"]`` (a syslog ``'Mmm dd HH:MM:SS'`` string) with a datetime.

    Syslog timestamps carry no year, so ``year`` is attached (default: the
    current year). The year is parsed together with the date so that Feb 29 is
    accepted in leap years. Parsing without a year makes ``strptime`` assume
    1900, which is not a leap year, and then Feb 29 raises ValueError.

    Mutates ``rec`` in place and returns it.
    """
    if year is None:
        year = datetime.now().year
    rec["date"] = datetime.strptime(f"{year} {rec['date']}", "%Y %b %d %H:%M:%S")
    return rec

with open('Linux_2k.log', 'r') as f:
    # Skip blank lines: an empty timestamp would make convert_date() raise.
    lines = [line for line in f if line.strip()]

# perf_counter() is monotonic and high-resolution; time.time() is a wall clock
# that can jump and may be coarse on some platforms, which matters for
# sub-second measurements like these.
start_time_seq = time.perf_counter()

output_seq = [convert_date(parse(line)) for line in lines]

df_seq = pd.DataFrame(output_seq)

df_seq.to_parquet('sekwencyjnie.parquet')

end_time_seq = time.perf_counter()
print(f"Czas wykonania wersji sekwencyjnej: {end_time_seq - start_time_seq:.2f} sekund")

start_time_par = time.perf_counter()

@delayed
def process_line(line):
    """Lazily parse one raw log line into a record with a datetime ``date``.

    Kept for per-line use, but each call becomes its own Dask task; for short
    lines the scheduling overhead far exceeds the parsing work.
    """
    return convert_date(parse(line))

@delayed
def process_chunk(chunk):
    """Lazily parse a batch of raw log lines -- one Dask task per batch."""
    return [convert_date(parse(line)) for line in chunk]

N_CHUNKS = 4  # number of Dask tasks / output partitions
chunk_size = max(1, -(-len(lines) // N_CHUNKS))  # ceiling division
chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

delayed_results = [process_chunk(chunk) for chunk in chunks]

# Chunks are in file order, so flattening preserves the original line order.
results = [record for batch in dask.compute(*delayed_results) for record in batch]

ddf = dd.from_pandas(pd.DataFrame(results), npartitions=N_CHUNKS)

ddf.to_parquet('zrownoleglone.parquet')

end_time_par = time.perf_counter()
print(f"Czas wykonania wersji zrównoleglonej: {end_time_par - start_time_par:.2f} sekund")

# Durations of each run; report how much slower (positive) or faster
# (negative) the parallel version was than the sequential one.
seq_duration = end_time_seq - start_time_seq
par_duration = end_time_par - start_time_par
print(f"Różnica czasu (zrównoleglony - sekwencyjny): {par_duration - seq_duration:.2f} sekund")

# Preview the first rows produced by each variant.
for label, frame in (("Sekwencyjny", df_seq), ("Zrównoleglony", ddf)):
    print(f"\n{label} DataFrame:")
    print(frame.head())

Czas wykonania wersji sekwencyjnej: 0.02 sekund
Czas wykonania wersji zrównoleglonej: 0.21 sekund
Różnica czasu (zrównoleglony - sekwencyjny): 0.19 sekund

Sekwencyjny DataFrame:
                 date level client hostname         process    pid  \
0 2024-06-14 15:16:01                 combo  sshd(pam_unix)  19939   
1 2024-06-14 15:16:02                 combo  sshd(pam_unix)  19937   
2 2024-06-14 15:16:02                 combo  sshd(pam_unix)  19937   
3 2024-06-15 02:04:59                 combo  sshd(pam_unix)  20882   
4 2024-06-15 02:04:59                 combo  sshd(pam_unix)  20884   

                                             message  
0  authentication failure; logname= uid=0 euid=0 ...  
1                           check pass; user unknown  
2  authentication failure; logname= uid=0 euid=0 ...  
3  authentication failure; logname= uid=0 euid=0 ...  
4  authentication failure; logname= uid=0 euid=0 ...  

Zrównoleglony DataFrame:
                 date level client hostname 

In [10]:
'''Zadanie 2'''
from dask.distributed import Client
import dask
import dask.bag as db
import json
import os
import random
from datetime import datetime, timedelta

# Reuse a running Dask client if there is one (e.g. when this cell is re-run):
# constructing a new Client every time starts another local cluster and
# leaves the previous one running.
try:
    client = Client.current()
except ValueError:  # no client exists yet
    client = Client(n_workers=4)

DATAPATH = './data'
os.makedirs(DATAPATH, exist_ok=True)

b = dask.datasets.make_people(npartitions=20, records_per_partition=5000)

def modify_ccexpires(record):
    """Overwrite ``record['ccexpires']`` with a random ``'MM/YYYY'`` date.

    With probability 0.2 the date lies between 30 days and ~5 years in the
    past (intended as an expired card). Otherwise it lies the same distance in
    the future. Draws from the global ``random`` state, mutates ``record`` and
    returns it.
    """
    in_past = random.random() < 0.2
    shift = timedelta(days=random.randint(30, 365 * 5))
    now = datetime.now()
    expiry = now - shift if in_past else now + shift
    record['ccexpires'] = expiry.strftime('%m/%Y')
    return record

# Materialise the randomised expiry dates to disk and reload them, so every
# later computation sees the same values instead of re-drawing them.
people_modified_path = os.path.join(DATAPATH, 'people_modified_*.json')

b = b.map(modify_ccexpires)
b.map(json.dumps).to_textfiles(people_modified_path)

b = db.read_text(people_modified_path).map(json.loads)

def is_expired(record, now=None):
    """Return True if the card in ``record['ccexpires']`` (``'MM/YYYY'``) has expired.

    A card remains valid through the last day of its expiry month, so it counts
    as expired only once ``now`` reaches the first day of the following month.
    Missing, empty or malformed values are treated as not expired.

    ``now`` defaults to the current local time; pass it explicitly for
    reproducible results.
    """
    cc_expires = record.get('ccexpires', '')
    if not cc_expires:
        return False
    try:
        expiry_month = datetime.strptime(cc_expires, '%m/%Y')
    except (ValueError, TypeError):  # malformed string or non-string value
        return False
    # Day 28 + 4 days always lands in the next month; snap to its first day.
    next_month_start = (expiry_month.replace(day=28) + timedelta(days=4)).replace(day=1)
    if now is None:
        now = datetime.now()
    return now >= next_month_start
        
expired_cards = b.filter(is_expired)

# Repartition once and persist: the bag is consumed three times below
# (write, count, take). Without persist, each of them would re-read the JSON
# files and re-run the filter from scratch.
expired_cards = expired_cards.repartition(10).persist()

output_path = os.path.join(DATAPATH, 'expired_*.json')
expired_cards.map(json.dumps).to_textfiles(output_path)

expired_count = expired_cards.count().compute()
print(f"Liczba rekordów z wygasłymi kartami kredytowymi: {expired_count}")

print("Przykładowe rekordy z wygasłymi kartami:")
# npartitions=-1 lets take() continue past the first partition if it holds
# fewer than 3 records (the default only looks at partition 0).
for record in expired_cards.take(3, npartitions=-1):
    print(record)

Liczba rekordów z wygasłymi kartami kredytowymi: 19843
Przykładowe rekordy z wygasłymi kartami:
{'age': 111, 'name': ['Clemente', 'Carrillo'], 'occupation': 'Publishing Manager', 'telephone': '+16793348245', 'address': {'address': '1224 Valley Terrace', 'city': 'Sierra Vista'}, 'credit-card': {'number': '4103 2967 6457 4473', 'expiration-date': '02/25'}, 'ccexpires': '10/2021'}
{'age': 83, 'name': ['Delbert', 'Kane'], 'occupation': 'Marine Engineer', 'telephone': '+1-627-346-6151', 'address': {'address': '1340 Lydia Center', 'city': 'Sandy Springs'}, 'credit-card': {'number': '3482 316212 35300', 'expiration-date': '03/24'}, 'ccexpires': '06/2024'}
{'age': 64, 'name': ['Marget', 'Gould'], 'occupation': 'Circus Worker', 'telephone': '+16411791829', 'address': {'address': '1252 Olmstead Station', 'city': 'Garden Grove'}, 'credit-card': {'number': '5514 0498 5832 2386', 'expiration-date': '02/23'}, 'ccexpires': '01/2024'}


In [14]:
from dask.distributed import Client
import dask
import dask.bag as db
import json
import os
import pandas as pd

# Reuse the client started by an earlier cell instead of spinning up a second
# local cluster (the earlier one would otherwise keep running).
try:
    client = Client.current()
except ValueError:  # no client exists yet
    client = Client(n_workers=4)

DATAPATH = './data'
os.makedirs(DATAPATH, exist_ok=True)

b = dask.datasets.make_people(npartitions=10, records_per_partition=1000)

# Write one file per partition with explicit names and read back exactly
# those files. The glob 'people_*.json' would also match the
# 'people_modified_*.json' files written by the previous task, silently
# mixing both datasets.
people_files = [os.path.join(DATAPATH, f'people_{i}.json') for i in range(b.npartitions)]
b.map(json.dumps).to_textfiles(os.path.join(DATAPATH, 'people_*.json'), name_function=str)

b = db.read_text(people_files).map(json.loads)

def _is_adult(person):
    # Keep people aged 18 or older.
    return person['age'] >= 18

adults_bag = b.filter(_is_adult)
adults_df = adults_bag.to_dataframe()
adults_df_pd = adults_df.compute()

output_file = os.path.join(DATAPATH, 'adults.parquet')

# Delete the file left by a previous run, if any, before writing a fresh one.
try:
    os.remove(output_file)
except FileNotFoundError:
    pass

adults_df_pd.to_parquet(output_file, index=False)

# Read the file back to confirm the round trip.
df_parquet = pd.read_parquet(output_file)
print(df_parquet.head())

   age                   name         occupation        telephone  \
0   64    ['Junko', 'Guzman']      Projectionist     +13148964143   
1  114     ['Shona', 'Riggs']          Treasurer  +1-331-410-9646   
2   65      ['Tommy', 'Best']  Transport Officer  +1-469-985-9150   
3   47       ['Lino', 'Hays']      Audit Manager  +1-513-788-6110   
4   44  ['Thurman', 'Hinton']            Student  +1-909-371-1490   

                                             address  \
0  {'address': '579 Temescal Arcade', 'city': 'Po...   
1  {'address': '644 Saint Louis Bayou', 'city': '...   
2  {'address': '70 Garden Motorway', 'city': 'Sha...   
3  {'address': '275 San Benito Freeway', 'city': ...   
4  {'address': '1334 Pacific Circle', 'city': 'Bu...   

                                         credit-card  
0  {'number': '3791 977106 59878', 'expiration-da...  
1  {'number': '3407 112039 63039', 'expiration-da...  
2  {'number': '3711 870298 50084', 'expiration-da...  
3  {'number': '2422 5120 722