# Simulation of reading, transforming and dumping of offers-datasets

## imports
dask related

In [1]:
import dask.bag as _dbag

# inside our notebook the dask extension manages our cluster and client
# from dask.distributed import Client as _Client

# the following imports can be useful to examine the optimal number of partitions
# from dask.distributed import progress as _progress
# from dask.diagnostics import ProgressBar
# from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler

scenario related

In [2]:
import gzip as _gzip

import dateutil as _dateutil
import pytz as _pytz
import ujson as _json
from toolz import curry as _curry

from settings import (
    datasets_files,
    combined_destination,
    partitions,
    records_per_partition,
)

## definition of required dask client
pass in same scheduler-address as of local cluster

In [3]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:36051  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 16.66 GB


## definition of transformation functions

In [4]:
def add_local_timestamp(offer, timezone="Europe/Berlin"):
    offer["timestamp"] = (
        _dateutil.parser.parse(offer["time"])
        .astimezone(_pytz.timezone(timezone))
        .isoformat()
    )
    return offer

In [5]:
def shipping_costs_filter(offer, max_price):
    return offer["shipping_cost"] <= max_price

In [6]:
def add_shipping_costs_percentage(offer, round_to=2):
    offer["shipping_cost_percentage"] = round(
        offer["shipping_cost"] / offer["price"] * 100,
        round_to
    )
    return offer

In [7]:
def category_filter(offer, category_ids):
    return offer["category_id"] in category_ids

## transformations and calculations with the datasets
read the fake datasets. Then perform transformations and calculations.

In [8]:
pipeline = (
    _dbag.read_text(datasets_files)
    .map(_json.loads)
    .repartition(npartitions=partitions)
    .filter(_curry(category_filter, category_ids=range(5, 25)))
    .filter(_curry(shipping_costs_filter, max_price=6))
    .map(add_local_timestamp)
    .map(add_shipping_costs_percentage)
    .map(_json.dumps)
)

## process pipelines
Until now nothing is done, only the pipeline to execute is defined.
The related pipeline-functions are delayed.
To execute the pipeline we have to `compute`.

In [9]:
data = pipeline.compute()

## compare effect of number of partitions

In [10]:
import time
import pandas as pd
import pandas_bokeh

pandas_bokeh.output_notebook()

partitions_list = [5, 10, 16, 25, 50, 75, 100, 200]
amount_runs = 10

In [11]:
def stats_for_pipeline(func, partitions_list, amount_runs):
    timings = {}
    for npartitions in partitions_list:
        for _ in range(amount_runs):
            starttime = time.time()
            func(npartitions=npartitions)
            data = pipeline.compute()
            timings.setdefault(str(npartitions), []).append(time.time() - starttime)
    stats = {
        "timings": timings,
        "stats": {}
    }
    for key, value in timings.items():
        stats["stats"][key] = sum(value) / amount_runs
    return stats

In [12]:
def run_pipeline_repartition_after_json_loads(npartitions):
    pipeline = (
        _dbag
        .read_text(datasets_files)
        .map(_json.loads)
        .repartition(npartitions=npartitions)
        .filter(_curry(category_filter, category_ids=range(5, 25)))
        .filter(_curry(shipping_costs_filter, max_price=6))
        .map(add_local_timestamp)
        .map(add_shipping_costs_percentage)
        .map(_json.dumps)
        .repartition(npartitions=1)
    )
    pipeline.compute()

In [13]:
stats = stats_for_pipeline(
    func=run_pipeline_repartition_after_json_loads,
    partitions_list=partitions_list,
    amount_runs=amount_runs
)
stats

{'timings': {'5': [1.0754623413085938,
   0.947650671005249,
   1.1184160709381104,
   0.9181466102600098,
   0.9405834674835205,
   0.9441275596618652,
   0.9694132804870605,
   1.0198206901550293,
   1.0773534774780273,
   0.9824705123901367],
  '10': [0.9694733619689941,
   0.9172508716583252,
   0.9527764320373535,
   1.007547378540039,
   1.0650088787078857,
   0.9321062564849854,
   0.9756920337677002,
   0.9463675022125244,
   0.9694352149963379,
   1.0092852115631104],
  '16': [0.9170272350311279,
   0.9957423210144043,
   1.0086669921875,
   0.914116621017456,
   0.948265790939331,
   0.9075121879577637,
   0.8957934379577637,
   1.0553560256958008,
   1.101820707321167,
   1.1400623321533203],
  '25': [1.1452827453613281,
   1.0719215869903564,
   1.0593006610870361,
   1.0963163375854492,
   1.0741980075836182,
   1.119070291519165,
   1.091841220855713,
   1.0703251361846924,
   1.076174259185791,
   1.1137492656707764],
  '50': [0.9577665328979492,
   0.958881139755249,
  

In [14]:
dfr = pd.DataFrame(stats['timings'])
dfr.plot_bokeh()

In [15]:
dfr.describe().T["mean"].plot_bokeh(kind="bar")

### position of repartition

In [16]:
def run_pipeline_repartition_first(npartitions):
    pipeline = (
        _dbag
        .read_text(datasets_files)
        .repartition(npartitions=npartitions)
        .map(_json.loads)
        .filter(_curry(category_filter, category_ids=range(5, 25)))
        .filter(_curry(shipping_costs_filter, max_price=6))
        .map(add_local_timestamp)
        .map(add_shipping_costs_percentage)
        .map(_json.dumps)
        .repartition(npartitions=1)
    )
    pipeline.compute()

In [17]:
stats = stats_for_pipeline(
    func=run_pipeline_repartition_first,
    partitions_list=partitions_list,
    amount_runs=amount_runs
)
stats

{'timings': {'5': [1.1514713764190674,
   1.0184791088104248,
   1.0899267196655273,
   1.2848212718963623,
   0.9763009548187256,
   1.0187466144561768,
   1.025041103363037,
   1.315382957458496,
   1.0882835388183594,
   1.049999475479126],
  '10': [1.036437749862671,
   1.0038478374481201,
   0.9926064014434814,
   1.0350306034088135,
   1.1449806690216064,
   1.0774900913238525,
   1.044585943222046,
   0.9781143665313721,
   1.0326666831970215,
   1.002838134765625],
  '16': [0.9697940349578857,
   1.0066781044006348,
   0.9811439514160156,
   0.9834551811218262,
   1.0146961212158203,
   0.9882469177246094,
   1.0126073360443115,
   1.0890233516693115,
   0.9730303287506104,
   1.0373554229736328],
  '25': [0.9589347839355469,
   0.9240622520446777,
   1.0020256042480469,
   0.9556350708007812,
   1.0341017246246338,
   0.9853677749633789,
   1.0746450424194336,
   0.982172966003418,
   1.0694732666015625,
   1.0157101154327393],
  '50': [0.9914984703063965,
   0.961884260177612

In [18]:
dfr = pd.DataFrame(stats['timings'])
dfr.plot_bokeh()

In [19]:
dfr.describe().T["mean"].plot_bokeh(kind="bar")

## combine data into one file

In [20]:
def write_offers_into_one_file(offers, destination, encoding="utf-8"):
    with _gzip.GzipFile(combined_destination, "w") as output_file:
        output_file.write("\n".join(data).encode(encoding))

In [21]:
write_offers_into_one_file(
    offers=data,
    destination=combined_destination
)

A look into the final data:

In [22]:
!zcat output_data/test.json.gz | head -n 1 | jq


gzip: stdout: Broken pipe
[1;39m{
  [0m[34;1m"id"[0m[1;39m: [0m[0;39m9728448[0m[1;39m,
  [0m[34;1m"product_id"[0m[1;39m: [0m[0;39m4734261[0m[1;39m,
  [0m[34;1m"category_id"[0m[1;39m: [0m[0;39m18[0m[1;39m,
  [0m[34;1m"name"[0m[1;39m: [0m[0;32m"Business Difference"[0m[1;39m,
  [0m[34;1m"brand"[0m[1;39m: [0m[0;32m"Tv-Hour-Billion"[0m[1;39m,
  [0m[34;1m"time"[0m[1;39m: [0m[0;32m"2020-10-14T12:30:55+00:00"[0m[1;39m,
  [0m[34;1m"price"[0m[1;39m: [0m[0;39m98.27[0m[1;39m,
  [0m[34;1m"shipping_cost"[0m[1;39m: [0m[0;39m1[0m[1;39m,
  [0m[34;1m"currency"[0m[1;39m: [0m[0;32m"NZD"[0m[1;39m,
  [0m[34;1m"identifiers"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"eans"[0m[1;39m: [0m[1;39m[
      [0;32m"978-1-78204-666-0"[0m[1;39m,
      [0;32m"978-0-663-05605-7"[0m[1;39m,
      [0;32m"978-1-394-90834-9"[0m[1;39m
    [1;39m][0m[1;39m,
    [0m[34;1m"mpnrs"[0m[1;39m: [0m[1;39m[
      [0;32m"978-1-59201-271-8"[0m

Look how many offers are inside this file:

In [23]:
!zcat output_data/test.json.gz | wc -l

3181
