- bigdata: nincs
- bigdata rendszer: van
  - nem korlátos adatmennyiségre tervezve
  - fel lehet kapcsolni + erőforrást

In [None]:
!pip install dask[complete]

## [dask!](https://docs.dask.org/en/latest/)
- [bag](https://docs.dask.org/en/latest/bag.html)
- [dataframe](https://docs.dask.org/en/latest/dataframe.html)

In [1]:
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client

In [2]:
# Create a dask.distributed Client with default arguments; in the recorded run it is
# attached to a scheduler on 127.0.0.1 with 4 workers.
client = Client()

In [3]:
# Display the client summary (scheduler address, dashboard link, worker/core/memory totals).
client

0,1
Client  Scheduler: tcp://127.0.0.1:38223  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.23 GB


## get some "big" data

In [5]:
import os
import shutil
import pandas as pd
import numpy as np

In [8]:
# Seed NumPy's global RNG so the random partition assignment done later is repeatable.
np.random.seed(42069)
# Directory that receives the generated CSV files.
dask_data_dir = "daskdata"
# Number of partitions to generate: twice the sum of the per-worker values
# reported by client.nthreads().
df_count = sum(client.nthreads().values()) * 2

In [None]:
# Start from a clean slate: delete any previous output directory (errors, e.g. the
# directory not existing, are ignored) ...
shutil.rmtree(dask_data_dir, ignore_errors=True)
# ... and recreate it; exist_ok=True keeps this from failing if it is already there.
os.makedirs(dask_data_dir, exist_ok=True)

In [None]:
# Location of the pickled event DataFrame used by the cells below.
# Plain string literal: nothing is interpolated, so no f-prefix is needed.
url = "https://borza-hotelcom-data.s3.eu-central-1.amazonaws.com/soccerway-event_df.pkl"

In [None]:
# NOTE(review): pd.read_pickle unpickles whatever the URL serves, and unpickling can
# execute arbitrary code -- only run this against a source you trust.
events_with_partition = pd.read_pickle(url).assign(
    # Give every row a random partition id in [0, df_count).
    partition=lambda _df: np.random.randint(0, df_count, size=_df.shape[0])
)

# Write one CSV per partition id. Writing files is a pure side effect, so iterate over
# the groups explicitly instead of going through groupby(...).apply(...): apply builds
# (and the cell would display) a throwaway result, and whether the grouping column is
# visible inside apply depends on the pandas version. Each group frame yielded here
# still contains the `partition` column, so the CSV layout is unchanged.
for partition_id, partition_df in events_with_partition.groupby("partition"):
    partition_df.to_csv(
        os.path.join(dask_data_dir, f"event-{partition_id}.csv"), index=False
    )

## play with it

In [6]:
# Show the client summary again before working with the data.
client

0,1
Client  Scheduler: tcp://127.0.0.1:38223  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.23 GB


In [9]:
# Read every CSV in the output directory (glob pattern) into a single dask DataFrame.
ddf = dd.read_csv(f"{dask_data_dir}/*.csv")

In [10]:
# Display the dask DataFrame structure: the recorded output lists column dtypes and
# the partition count, with placeholders instead of data values.
ddf

Unnamed: 0_level_0,match_id,person_id,time,event,partition
npartitions=8,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,int64,int64,object,object,int64
,...,...,...,...,...
...,...,...,...,...,...
,...,...,...,...,...
,...,...,...,...,...


In [11]:
# Preview the first rows of the data.
ddf.head()

Unnamed: 0,match_id,person_id,time,event,partition
0,2425230,121322,33',y-card,0
1,2425230,245706,80',y-card,0
2,2480644,305021,67',goal,0
3,2480644,305021,84',goal,0
4,801008,88312,65',goal,0


In [13]:
# Without .compute() this only displays a Dask Series structure (see the recorded
# output), not the distinct values themselves.
ddf["event"].unique()

Dask Series Structure:
npartitions=1
    object
       ...
Name: event, dtype: object
Dask Name: unique-agg, 41 tasks

In [15]:
# .compute() materialises the result: the distinct event types present in the data.
ddf["event"].unique().compute()

0          y-card
1            goal
2    penalty-goal
3      2nd-y-card
4        own-goal
5          r-card
6    penalty-miss
Name: event, dtype: object

In [None]:
# Count how often each raw `time` string occurs, to see which formats need parsing.
ddf["time"].value_counts().compute()

In [None]:
def time_parser(df):
    """Convert the textual ``time`` column (e.g. ``"33'"``) into minute counts.

    Every value has its apostrophes removed and is then read as one or more
    integers joined by ``+`` whose sum is returned (a value such as ``"90'+3"``
    yields ``93``). Empty, whitespace-only and non-string values yield ``NaN``.

    The values are parsed with ``int`` rather than ``eval`` so that text coming
    from the data files is never executed as Python code.

    Parameters
    ----------
    df : DataFrame
        Frame with a ``time`` column of strings.

    Returns
    -------
    Series
        Minutes as floats (``NaN`` where no time is given), aligned with ``df``.

    Raises
    ------
    ValueError
        If a non-empty value is not a ``+``-separated list of integers.
    """

    def _to_minutes(text):
        # Missing values (None / NaN) carry no time information.
        if not isinstance(text, str):
            return np.nan
        cleaned = text.replace("'", "").strip()
        if not cleaned:
            return np.nan
        # Always return a float so the column dtype does not depend on whether
        # any empty value happens to be present.
        return float(sum(int(part) for part in cleaned.split("+")))

    return df["time"].apply(_to_minutes)

In [None]:
# Add a `parsed_time` column; the function itself is handed to assign, which calls it
# with the frame.
ddf.assign(parsed_time=time_parser)

In [18]:
# NOTE(review): `_s` is not referenced by any later cell visible in this notebook --
# looks like leftover scratch; confirm before removing.
_s = ddf["match_id"]

In [19]:
# In the recorded run the dask DataFrame had no `sort_values`, so this call raised
# AttributeError. Catch it so that "Restart & Run All" keeps going while the
# limitation is still shown; where sort_values exists the sorted frame is displayed.
try:
    sorted_by_match = ddf.sort_values("match_id")
except AttributeError as exc:
    sorted_by_match = None
    print(f"AttributeError: {exc}")
sorted_by_match

AttributeError: 'DataFrame' object has no attribute 'sort_values'

In [None]:
# Bin edges for match minutes: 0, 10, ..., 100 (the stop value 101 is exclusive).
min_bins = np.arange(start=0, stop=101, step=10)

In [None]:
def time_parser(df):
    """Convert the textual ``time`` column of a dask DataFrame into minute counts.

    Every value has its apostrophes removed and is then read as one or more
    integers joined by ``+`` whose sum is returned (a value such as ``"90'+3"``
    yields ``93``). Empty, whitespace-only and non-string values yield ``NaN``.

    The values are parsed with ``int`` rather than ``eval`` so that text coming
    from the data files is never executed as Python code. ``meta`` is passed to
    ``apply`` to declare the result as a float64 Series up front; the parsed
    values are floats so they agree with that declaration.

    Parameters
    ----------
    df : dask DataFrame
        Frame with a ``time`` column of strings.

    Returns
    -------
    dask Series
        Minutes as float64 (``NaN`` where no time is given).

    Raises
    ------
    ValueError
        When computed, if a non-empty value is not a ``+``-separated list of
        integers.
    """

    def _to_minutes(text):
        # Missing values (None / NaN) carry no time information.
        if not isinstance(text, str):
            return np.nan
        cleaned = text.replace("'", "").strip()
        if not cleaned:
            return np.nan
        return float(sum(int(part) for part in cleaned.split("+")))

    return df["time"].apply(_to_minutes, meta=pd.Series(dtype=np.float64))

In [None]:
# Same call as earlier, now using the redefined time_parser that passes `meta` to apply.
ddf.assign(parsed_time=time_parser)

In [None]:
def time_binner(df):
    """Bucket the ``parsed_time`` column of ``df`` into the intervals given by ``min_bins``."""
    minutes = df["parsed_time"]
    return pd.cut(minutes, bins=min_bins)

In [None]:
# Chain both derived columns directly on the dask DataFrame.
# NOTE(review): here time_binner calls pd.cut on a dask Series; the next cell applies it
# per partition inside map_partitions instead -- confirm whether this cell works as is.
ddf.assign(parsed_time=time_parser).assign(time_bin=time_binner)

In [None]:
# Step 1: add the parsed minutes to the dask frame.
with_minutes = ddf.assign(parsed_time=time_parser)

# Step 2: inside every partition (a pandas frame), bin the minutes and count the
# events that fall into each bin.
partition_counts = with_minutes.map_partitions(
    lambda part: part.assign(time_bin=time_binner).groupby("time_bin")["event"].value_counts(),
    meta=pd.Series(dtype=float),
)

# Step 3: bring the partial counts into memory and add them up per (event, time_bin).
total_counts = partition_counts.compute().groupby(["event", "time_bin"]).sum()

# Step 4: leave out the "y-card" events and show the largest counts first.
total_counts.drop("y-card").sort_values(ascending=False)

In [None]:
# Load the event table from `url` into an in-memory pandas DataFrame for the cells below.
# NOTE(review): unpickling data fetched from a URL can execute arbitrary code -- only
# use this with a trusted source.
df = pd.read_pickle(url)

In [None]:
# Keep only the events whose name contains "penalty" ...
penalty_events = df.loc[lambda frame: frame["event"].str.contains("penalty"), :]

# ... and build, for every person, a one-row frame holding the count of each such event.
by_person = penalty_events.groupby("person_id").apply(
    lambda person_events: person_events["event"].value_counts().to_frame().T
)

In [None]:
# Summary statistics of the per-person share of "penalty-goal" among all counted
# penalty events, restricted to rows that have a value in every column.
(
    by_person.loc[lambda frame: frame.notnull().all(axis=1), :]
    .assign(rate=lambda frame: frame["penalty-goal"] / frame.sum(axis=1))
    .loc[:, "rate"]
    .describe()
)

In [None]:
# Rows with a value in every column and more than 20 counted penalty events in total,
# ordered by the share of "penalty-goal".
# The count filter must run BEFORE `rate` is added: filtering afterwards with
# df.sum(axis=1) would fold the rate itself (between 0 and 1) into the total and let
# rows with exactly 20 events slip through.
(
    by_person.loc[lambda df: df.notnull().all(axis=1), :]
    .loc[lambda df: df.sum(axis=1) > 20]
    .assign(rate=lambda df: df["penalty-goal"] / df.sum(axis=1))
    .sort_values("rate")
)

In [None]:
# Column totals over the rows that have a value in every column, expressed as a
# fraction of the grand total.
complete_totals = by_person.loc[lambda frame: frame.notnull().all(axis=1), :].sum()
complete_totals / complete_totals.sum()

In [None]:
# Column totals over all rows of by_person (no filtering on missing values here).
by_person.sum()

In [None]:
# NOTE(review): by_person is used as a DataFrame in the cells above (column lookup,
# axis=1 reductions), and to_frame() is a Series method, so this presumably raises
# AttributeError; pivot_table() is also called without index/columns/values.
# Looks like unfinished scratch work -- confirm the intent before relying on it.
by_person.to_frame().reset_index().pivot_table()