## Zadanie 1

In [2]:
from datetime import datetime
import time
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client()
print(client.scheduler_info)

lines = [
    "[Sun Dec 04 07:18:00 2005] [notice] workerEnv.init() ok /etc/httpd/conf/workers2.properties",
    "[Sun Dec 04 07:18:00 2005] [error] mod_jk child workerEnv in error state 6",
    "[Sun Dec 04 07:18:00 2005] [notice] workerEnv.init() ok /etc/httpd/conf/workers2.properties",
    "[Sun Dec 04 07:18:00 2005] [error] mod_jk child workerEnv in error state 7",
    "[Sun Dec 04 07:45:45 2005] [error] [client 63.13.186.196] Directory index forbidden by rule: /var/www/html/",
    "[Sun Dec 04 08:54:17 2005] [error] [client 147.31.138.75] Directory index forbidden by rule: /var/www/html/",
    "[Sun Dec 04 09:35:12 2005] [error] [client 207.203.80.15] Directory index forbidden by rule: /var/www/html/",
    "[Sun Dec 04 10:53:30 2005] [error] [client 218.76.139.20] Directory index forbidden by rule: /var/www/html/"
]

@dask.delayed
def parse(inp: str):
    record = {}
    
    date_start = inp.find('[') + 1
    date_end = inp.find(']')
    date_s = slice(date_start, date_end)

    level_start = inp.find('[', date_end) + 1
    level_end = inp.find(']', level_start)
    level_s = slice(level_start, level_end)

    client_start = inp.find('[', level_end)
    client_end = inp.find(']', client_start)

    record["date"] = inp[date_s]    
    record["level"] = inp[level_s]
    record["client"] = "" if client_start == -1 else inp[client_start + 8: client_end]
    record["message"] = inp[client_end + 2:] if record["client"] else inp[level_end + 2:]
    
    return record


@dask.delayed
def convert_date(rec):
    rec["date"] = datetime.strptime(rec["date"], "%a %b %d %H:%M:%S %Y")
    return rec


Perhaps you already have a cluster running?
Hosting the HTTP server on port 53094 instead


<bound method Client.scheduler_info of <Client: 'tcp://127.0.0.1:53095' processes=4 threads=8, memory=16.00 GiB>>


In [3]:
start_time = time.time()
output_sequential = []

for line in lines:
    record = parse(line).compute()
    record = convert_date(record).compute()
    output_sequential.append(list(record.values()))
    

df_sequential = pd.DataFrame(output_sequential, columns=["date", "level", "client", "message"])
sequential_time = time.time() - start_time
print("Sequential Processing Time:", sequential_time)
df_sequential.head()

Sequential Processing Time: 0.16487979888916016


Unnamed: 0,date,level,client,message
0,2005-12-04 07:18:00,notice,,workerEnv.init() ok /etc/httpd/conf/workers2.p...
1,2005-12-04 07:18:00,error,,mod_jk child workerEnv in error state 6
2,2005-12-04 07:18:00,notice,,workerEnv.init() ok /etc/httpd/conf/workers2.p...
3,2005-12-04 07:18:00,error,,mod_jk child workerEnv in error state 7
4,2005-12-04 07:45:45,error,63.13.186.196,Directory index forbidden by rule: /var/www/html/


In [4]:
start_time = time.time()
tasks = [convert_date(parse(line)) for line in lines]
output_parallel = dask.compute(*tasks)


df_parallel = pd.DataFrame([list(record.values()) for record in output_parallel], columns=["date", "level", "client", "message"])
parallel_time = time.time() - start_time
print("Parallel Processing Time:", parallel_time)
df_parallel.head()

Parallel Processing Time: 0.017551183700561523


Unnamed: 0,date,level,client,message
0,2005-12-04 07:18:00,notice,,workerEnv.init() ok /etc/httpd/conf/workers2.p...
1,2005-12-04 07:18:00,error,,mod_jk child workerEnv in error state 6
2,2005-12-04 07:18:00,notice,,workerEnv.init() ok /etc/httpd/conf/workers2.p...
3,2005-12-04 07:18:00,error,,mod_jk child workerEnv in error state 7
4,2005-12-04 07:45:45,error,63.13.186.196,Directory index forbidden by rule: /var/www/html/


## Zadanie 2

In [6]:
import dask.bag as db
import json
import os

In [7]:
DATAPATH = './data'
os.makedirs(DATAPATH, exist_ok=True)

b = dask.datasets.make_people(npartitions=100, records_per_partition=10000)

b.map(json.dumps).to_textfiles(os.path.join(DATAPATH, '*.json'))

people_bag = db.read_text(os.path.join(DATAPATH, '*.json')).map(json.loads)

def is_expired(credit_card_expiry):
    if credit_card_expiry:
        expiry_date = datetime.strptime(credit_card_expiry, "%m/%Y")
        return expiry_date < datetime.now()
    return False

expired_people = people_bag.filter(lambda record: is_expired(record.get('credit_card_expiry')))
expired_people = expired_people.repartition(npartitions=10)
expired_people.map(json.dumps).to_textfiles(os.path.join(DATAPATH, 'expired_{i}.json'))

expired_people.compute()

[]

In [8]:
!head -n 2 ./data/00.json

{"age": 0, "name": ["Robin", "Wagner"], "occupation": "Plumber", "telephone": "+1-208-410-8964", "address": {"address": "15 Shephard Parkway", "city": "Denver"}, "credit-card": {"number": "4731 7530 6120 1770", "expiration-date": "11/23"}}
{"age": 91, "name": ["Booker", "French"], "occupation": "Solicitor", "telephone": "+1-278-172-9760", "address": {"address": "93 Redfield Circle", "city": "Chattanooga"}, "credit-card": {"number": "3404 740253 40726", "expiration-date": "04/23"}}


## Zadanie 3

In [10]:
b = db.read_text(os.path.join(DATAPATH, '*.json')).map(json.loads)

adults_bag = people_bag.filter(lambda x: x['age'] >= 18)
adults_df = adults_bag.to_dataframe()
adults_df.tail(8)

Unnamed: 0,age,name,occupation,telephone,address,credit-card
8594,106,"['William', 'Whitfield']",Catering Manager,+15754668121,"{'address': '482 Point Lobos Shore', 'city': '...","{'number': '2504 8147 8205 1057', 'expiration-..."
8595,65,"['Khadijah', 'Benson']",Audiologist,+1-828-506-7709,"{'address': '286 George Drive', 'city': 'Sugar...","{'number': '3494 476880 07023', 'expiration-da..."
8596,119,"['Shu', 'Douglas']",Hosiery Mechanic,+17705117355,"{'address': '237 Nibbi Stravenue', 'city': 'Ra...","{'number': '4834 3376 3816 7650', 'expiration-..."
8597,66,"['Marilu', 'Conway']",TV Editor,+1-917-930-9789,"{'address': '918 Glenhaven Hills', 'city': 'Or...","{'number': '5342 6884 8492 5175', 'expiration-..."
8598,116,"['Georgetta', 'Tran']",Pattern Maker,+1-216-647-3724,"{'address': '102 Fitch Bridge', 'city': 'Fores...","{'number': '2566 3163 2635 7839', 'expiration-..."
8599,53,"['Justin', 'Ross']",Furniture Restorer,+12533161098,"{'address': '27 Crestwell Trace', 'city': 'Mou...","{'number': '4305 0248 0574 1059', 'expiration-..."
8600,66,"['Dylan', 'Craft']",Investigator,+1-401-551-0301,"{'address': '427 Aspen Bend', 'city': 'Saratoga'}","{'number': '3708 949698 97162', 'expiration-da..."
8601,115,"['Glady', 'Tran']",Tour Guide,+1-774-971-2713,"{'address': '652 Zampa Mall', 'city': 'Glenview'}","{'number': '5105 5479 1442 6492', 'expiration-..."


In [11]:
adults_df.to_parquet('adults_data.parquet')

In [12]:
adults_df.compute()

Unnamed: 0,age,name,occupation,telephone,address,credit-card
0,91,"['Booker', 'French']",Solicitor,+1-278-172-9760,"{'address': '93 Redfield Circle', 'city': 'Cha...","{'number': '3404 740253 40726', 'expiration-da..."
1,61,"['Echo', 'Stanton']",Marine Broker,+19510370761,"{'address': '54 Channel Square', 'city': 'Manh...","{'number': '3443 515292 24638', 'expiration-da..."
2,116,"['Henry', 'Olsen']",Retired,+18131603324,"{'address': '1322 Sabin Center', 'city': 'Sout...","{'number': '3464 534629 86815', 'expiration-da..."
3,44,"['Ward', 'Lindsey']",Radiographer,+1-940-190-6946,"{'address': '116 Livingston Run', 'city': 'Vis...","{'number': '4090 2969 2594 9916', 'expiration-..."
4,24,"['Leontine', 'Miller']",Catering Consultant,+1-660-675-6539,"{'address': '549 Anzavista Ferry', 'city': 'Sp...","{'number': '3787 905605 06503', 'expiration-da..."
...,...,...,...,...,...,...
8597,66,"['Marilu', 'Conway']",TV Editor,+1-917-930-9789,"{'address': '918 Glenhaven Hills', 'city': 'Or...","{'number': '5342 6884 8492 5175', 'expiration-..."
8598,116,"['Georgetta', 'Tran']",Pattern Maker,+1-216-647-3724,"{'address': '102 Fitch Bridge', 'city': 'Fores...","{'number': '2566 3163 2635 7839', 'expiration-..."
8599,53,"['Justin', 'Ross']",Furniture Restorer,+12533161098,"{'address': '27 Crestwell Trace', 'city': 'Mou...","{'number': '4305 0248 0574 1059', 'expiration-..."
8600,66,"['Dylan', 'Craft']",Investigator,+1-401-551-0301,"{'address': '427 Aspen Bend', 'city': 'Saratoga'}","{'number': '3708 949698 97162', 'expiration-da..."
