## Przygotowanie danych

In [2]:
import pandas as pd
import pickle
from dask_ml import preprocessing
import datetime

In [3]:
from dask.distributed import Client, LocalCluster

In [4]:
# Start a local Dask cluster with default settings and connect a client to it.
cluster = LocalCluster()
client = Client(cluster)

In [4]:
# Display the client summary (scheduler address, dashboard link, cluster resources).
client

0,1
Client  Scheduler: tcp://127.0.0.1:40489  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 16.19 GB


In [2]:
# The seven product category levels; shared by the dtype map and the encoder list.
category_columns = [
    'prod_category1', 'prod_category2', 'prod_category3', 'prod_category4',
    'prod_category5', 'prod_category6', 'prod_category7',
]

# Column names, in file order, for the header-less tab-separated input file.
df_headers = [
    'Sale', 'SalesAmountInEuro', 'time_delay_for_conversion', 'click_timestamp',
    'nb_clicks_1week', 'product_price', 'product_age_group', 'device_type',
    'audience_id', 'product_gender', 'product_brand',
    'prod_category1', 'prod_category2', 'prod_category3', 'prod_category4',
    'prod_category5', 'prod_category6', 'prod_category7', 'product_country',
    'product_id', 'product_title', 'partner_id', 'user_id',
]

# Read every category level as a pandas categorical.
dtypes = dict.fromkeys(category_columns, 'category')
# NOTE(review): 'product_category' is not one of the names in df_headers —
# confirm whether this entry is still needed.
dtypes['product_category'] = 'str'

# Columns that are passed through a LabelEncoder later in the notebook.
encoder_labels = (
    ['product_age_group', 'device_type', 'audience_id',
     'product_gender', 'product_brand']
    + category_columns
    + ['product_country', 'product_id', 'product_title', 'partner_id', 'user_id']
)

df = pd.read_csv(
    "../../../data/CriteoSearchData.csv",
    sep='\t',
    header=None,
    names=df_headers,
    dtype=dtypes,
)

In [4]:
# Needed for the label encoder: both columns must hold plain strings.
# Missing product titles get the 'BROKEN' placeholder before the cast, so the
# result does not depend on NaN being stringified to the literal 'nan'.
df['product_title'] = df['product_title'].fillna('BROKEN').astype(str)
df['user_id'] = df['user_id'].astype(str)

In [3]:
# Convert Unix-second click timestamps to 'YYYY-MM-DD' date strings in a single
# vectorized pass instead of calling a Python lambda once per row.
# Run this once per freshly loaded frame: the column is overwritten in place and
# the integer cast fails on values that are already date strings.
df['click_timestamp'] = (
    pd.to_datetime(df['click_timestamp'].astype('int64'), unit='s')
    .dt.strftime('%Y-%m-%d')
)

### Praca z dask_ml.preprocessing.LabelEncoder

#### Przygotowanie LabelEncodera

In [None]:
# Not needed anymore: load the pickled encoders instead of refitting them.
# Creates one LabelEncoder per column in encoder_labels, fits it on that column
# and overwrites the column with the encoded values.
labelEncoders = dict(
    (column, preprocessing.LabelEncoder()) for column in encoder_labels
)
for column, encoder in labelEncoders.items():
    print('starting {}'.format(column))
    df[column] = encoder.fit_transform(df[column])
    print('done')

#### Zapis labelEncodera do pliku

In [31]:
import pickle

# The file name is knowingly misspelled ("lablencoder"); it is kept as-is.
with open('../lablencoder_new.pickle', 'wb') as encoder_file:
    pickle.dump(labelEncoders, encoder_file, protocol=pickle.HIGHEST_PROTOCOL)

#### Wykorzystanie labelEncodera

Załaduj przygotowany labelEncoder za pomocą:

In [1]:
import pickle

# NOTE: unpickling can execute arbitrary code — only load files you trust.
# The context manager closes the file handle once the encoders are loaded.
with open("../../../data/lablencoder.pickle", "rb") as encoder_file:
    labelEncoders = pickle.load(encoder_file)

In [4]:
# Show the loaded dict that maps each column name to its encoder.
labelEncoders

{'product_age_group': LabelEncoder(),
 'device_type': LabelEncoder(),
 'audience_id': LabelEncoder(),
 'product_gender': LabelEncoder(),
 'product_brand': LabelEncoder(),
 'prod_category1': LabelEncoder(),
 'prod_category2': LabelEncoder(),
 'prod_category3': LabelEncoder(),
 'prod_category4': LabelEncoder(),
 'prod_category5': LabelEncoder(),
 'prod_category6': LabelEncoder(),
 'prod_category7': LabelEncoder(),
 'product_country': LabelEncoder(),
 'product_id': LabelEncoder(),
 'product_title': LabelEncoder(),
 'partner_id': LabelEncoder(),
 'user_id': LabelEncoder()}

#### Badanie zawartości labelEncodera

In [11]:
# Display the column name -> encoder mapping before inspecting its contents.
labelEncoders

{'product_age_group': LabelEncoder(),
 'device_type': LabelEncoder(),
 'audience_id': LabelEncoder(),
 'product_gender': LabelEncoder(),
 'product_brand': LabelEncoder(),
 'prod_category1': LabelEncoder(),
 'prod_category2': LabelEncoder(),
 'prod_category3': LabelEncoder(),
 'prod_category4': LabelEncoder(),
 'prod_category5': LabelEncoder(),
 'prod_category6': LabelEncoder(),
 'prod_category7': LabelEncoder(),
 'product_country': LabelEncoder(),
 'product_id': LabelEncoder(),
 'product_title': LabelEncoder(),
 'partner_id': LabelEncoder(),
 'user_id': LabelEncoder()}

Zamiana wartości w kolumnach na wartości zakodowane przez labelEncoder:

In [7]:
# Overwrite each column that has an encoder with the values produced by that
# encoder's transform.
for column in labelEncoders:
    encoder = labelEncoders[column]
    df[column] = encoder.transform(df[column].values)
    print('{} done'.format(column))

product_age_group done
device_type done
audience_id done
product_gender done
product_brand done
prod_category1 done
prod_category2 done
prod_category3 done
prod_category4 done
prod_category5 done
prod_category6 done
prod_category7 done
product_country done
product_id done
product_title done
partner_id done
user_id done


In [None]:
# Print, for every encoder, how many classes it holds followed by the classes.
for column, encoder in labelEncoders.items():
    print("{} table lookup".format(column))
    classes = encoder.classes_
    print("showing {} classes:".format(len(classes)))
    print(classes)
    print("\n")

Kolumna product_title zawiera wartość BROKEN, zastępującą wcześniej pojawiające się wartości NaN.

In [37]:
# Check that the 'BROKEN' placeholder is among the classes of the product_title encoder.
'BROKEN' in labelEncoders.get('product_title').classes_

True

In [38]:
# Look up the encoded value that the product_title encoder assigns to 'BROKEN'.
labelEncoders.get('product_title').transform(['BROKEN'])

array([596334])

In [39]:
# Round-trip check: 596334 is the value returned for 'BROKEN' by the transform call above.
labelEncoders.get('product_title').inverse_transform([596334])

array(['BROKEN'], dtype=object)

tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/tornado/websocket.py", line 954, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/tornado/web.py", line 3173, in wrapper
    return method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/bokeh/server/views/ws.py", line 137, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.


Eksport całego przeparsowanego datasetu:

In [10]:
# Write the whole processed frame to a Parquet file in the current working directory.
df.to_parquet('CPStimestampFormattedCorrectly.parquet')

Plik per partner:

In [6]:
# Group the rows by partner_id; the groups are used to write one file per partner.
grouped = df.groupby('partner_id')

In [13]:
# Write one Parquet file per partner. Iterating the groupby yields each key
# together with its rows, which avoids a separate unique() scan and a
# get_group() lookup for every partner.
# NOTE(review): this path uses '../../data' while the input is read from
# '../../../data' — confirm the intended output directory.
for partner, partner_rows in grouped:
    partner_rows.to_parquet('../../data/partners/partner_' + str(partner) + '.parquet')