In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
# Show up to 500 columns when a DataFrame is displayed.
pd.set_option('display.max_columns', 500)

import dask
import dask.dataframe as dd

# NOTE(review): the star import hides which names come from `helpers`. `col_dtype`, used by the
# dtype-casting cell further down, is not defined in any visible cell and presumably comes
# from here -- confirm, and prefer importing it by name.
from helpers import *
import keggler as kg

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

import gc
gc.enable()

import warnings
# `Warning` is the base class of every warning category, so this hides ALL warnings
# (deprecations, dask/pandas notices, ...) for the rest of the session.
warnings.simplefilter(action='ignore', category=Warning)

import os, psutil

# Set up a logger to dump messages to both log file and notebook
import logging as logging
def ini_log(filename):
    """Return this module's logger, configuring it on first use.

    On the first call the logger is set to DEBUG level and receives two
    handlers sharing one format: a stream handler (default stream) and a file
    handler that appends to ``filename``. On later calls the already-configured
    logger is returned unchanged, so ``filename`` is ignored once handlers exist.

    Parameters
    ----------
    filename : str
        Path of the log file opened in append mode.

    Returns
    -------
    logging.Logger
        The logger registered under ``__name__``.
    """
    logger = logging.getLogger(__name__)

    # Already configured (e.g. the cell was re-run): adding the handlers again
    # would print every message several times.
    if logger.handlers:
        return logger

    logger.setLevel(logging.DEBUG)

    # Both handlers are created before either is attached, so a failure to
    # open the file leaves the logger without partial configuration.
    sinks = (logging.StreamHandler(None), logging.FileHandler(filename, 'a'))
    formatter = logging.Formatter('%(asctime)-15s: %(levelname)s  %(message)s')
    for sink in sinks:
        sink.setFormatter(formatter)
        logger.addHandler(sink)
    return logger
        
# --- Data locations ---------------------------------------------------------
# Alternative configuration kept for reference (presumably a reduced sample -- confirm):
#PATH='data_mini/'
#prefix='_mini'

PATH = 'data/'
SUB_PATH = 'test_set/'  # test_set or training_set
prefix = '_0_2018071'   # only referenced by the commented-out training-set glob in the visible cells

# Notebook-wide logger; messages are appended to 'out.log' as well as shown in the notebook.
log = ini_log('out.log')

# Show what is available under the data directory.
print(os.listdir(PATH))


['submissions', 'test_set', 'track_features', 'training_set']


In [2]:
from multiprocessing.pool import ThreadPool
import dask

# Register a pool of three worker threads under dask's global `pool` config key.
worker_pool = ThreadPool(3)
dask.config.set(pool=worker_pool)

<dask.config.set at 0x7fb313f99048>

# Read in session log data (training or test set, depending on the `fin` pattern)

In [3]:
# Glob pattern of the gzipped session-log CSVs to load. Exactly one `fin` assignment
# should be active; the commented variants select other inputs.

# Training
# fin = '{}/training_set/log{}*.csv.gz'.format(PATH,prefix)

# Test
# fin=['{}/{}/log_prehistory_20180715_000000000000.csv.gz'.format(PATH,SUB_PATH),
#      '{}/{}/log_prehistory_20180716_000000000000.csv.gz'.format(PATH,SUB_PATH)
#     ]
# PATH and SUB_PATH already end with '/', so the formatted pattern contains doubled
# separators ('data//test_set//...').
fin = '{}/{}/log_input_*.csv.gz'.format(PATH,SUB_PATH)

In [4]:
# Display the resolved input pattern as a sanity check.
fin

'data//test_set//log_input_*.csv.gz'

In [5]:
# Lazily load every CSV matched by `fin` into one dask DataFrame.
# gzip archives cannot be split into byte ranges, so each file has to be read as a single
# block. `blocksize=None` requests that explicitly instead of relying on dask's fallback,
# whose warning is hidden by the global warnings filter set in the first cell.
# NOTE: the `dd_trn` name is kept for the downstream cells even when `fin` points at the test set.
dd_trn = dd.read_csv(fin,
                     #usecols=[i for i in range(21) if i != 16], # disable this to process data/test_set/log_input_
                     compression='gzip',
                     blocksize=None)

In [6]:
# dd_trn.memory_usage(deep=True).compute() / 1024**2

In [7]:
# Column dtypes as inferred by dask when reading the CSVs.
dd_trn.dtypes

session_id          object
track_id_clean      object
session_position     int64
session_length       int64
dtype: object

In [8]:
#dd_trn.head(5)

## Reduce memory footprint

In [9]:
# Integer codes for the categorical string columns. Code 0 is reserved for labels that are
# missing or not listed in the mapping (see `fillna(0)` below).
reasons = {'trackdone': 1, 'fwdbtn': 2, 'trackerror': 8, 'remote': 7, 'clickrow': 4, 'backbtn': 3, 'playbtn': 6, 'appload': 5, 'endplay': 9, 'logout':10}
enc = {
    'hist_user_behavior_reason_start': reasons,
    'context_type': {'radio': 3, 'personalized_playlist': 4, 'charts': 6, 'user_collection': 2, 'editorial_playlist': 1, 'catalog': 5},
    'hist_user_behavior_reason_end': reasons
}

for c in enc:
    # Only encode columns that exist in this input and are not encoded yet.
    # Without the dtype check, re-running this cell would look the integer codes up in the
    # string-keyed mapping, turn every value into NaN and then silently into 0.
    if c in dd_trn.columns and dd_trn[c].dtype != np.uint8:
        dd_trn[c] = dd_trn[c].map(enc[c]).fillna(0).astype(np.uint8)

In [10]:
# dd_trn.memory_usage(deep=True).sum().compute()/1024**2

In [11]:
# dd_trn.memory_usage(deep=True).compute()/1024**2

In [12]:
# Dtypes after the encoding step. For an input that contains none of the `enc` columns
# the loop above changes nothing and the dtypes match the freshly read frame.
dd_trn.dtypes

session_id          object
track_id_clean      object
session_position     int64
session_length       int64
dtype: object

# Read in track features

In [13]:
# Load all track-feature files (names starting with 'tf_') into one in-memory frame
# indexed by `track_id`.
# - `sorted()` makes the concatenation order reproducible; `os.listdir` returns entries in
#   arbitrary order.
# - `os.path.join` builds the paths whether or not PATH ends with a separator.
# - Only the first 22 columns are read; `release_year` is stored as uint32.
trk_dir = os.path.join(PATH, 'track_features')
df_trk = pd.concat([pd.read_csv(os.path.join(trk_dir, f),
                     usecols=range(22),
                     dtype={'release_year': np.uint32},
                     compression='gzip'
                    ) for f in sorted(os.listdir(trk_dir)) if f.startswith('tf_')
         ], axis=0).set_index('track_id')

In [14]:
# Encode `mode` as a one-byte integer: 'major' -> 0, 'minor' -> 1.
# The dtype guard makes the cell safe to re-run: mapping the already-encoded integers through
# the string-keyed dict would yield NaN for every row, which cannot be cast to uint8.
if df_trk['mode'].dtype != np.uint8:
    df_trk['mode'] = df_trk['mode'].map({'major': 0, 'minor':1}).astype(np.uint8)

In [15]:
# Per-column memory footprint in MiB (bytes / 1024**2); `deep=True` also counts the
# string contents of the object-typed index.
df_trk.memory_usage(deep=True)/1024**2

Index                     335.795269
duration                   28.277496
release_year               14.138748
us_popularity_estimate     28.277496
acousticness               28.277496
beat_strength              28.277496
bounciness                 28.277496
danceability               28.277496
dyn_range_mean             28.277496
energy                     28.277496
flatness                   28.277496
instrumentalness           28.277496
key                        28.277496
liveness                   28.277496
loudness                   28.277496
mechanism                  28.277496
mode                        3.534687
organism                   28.277496
speechiness                28.277496
tempo                      28.277496
time_signature             28.277496
valence                    28.277496
dtype: float64

# Merge DS with Track features

In [16]:
# Attach the track features to every session row: left join of the session log's
# `track_id_clean` column against the `df_trk` index, so rows without a matching track
# are kept. The join key is dropped afterwards.
dd_out = dd_trn.merge(
    df_trk,
    left_on='track_id_clean',
    right_index=True,
    how='left',
).drop(['track_id_clean'], axis=1)

Cast columns to compact dtypes (e.g. booleans to `uint8`) to reduce file size

In [18]:
# Downcast every column of the merged frame that has a target dtype in `col_dtype`;
# columns without an entry keep their current dtype.
# NOTE(review): `col_dtype` is not defined in any visible cell (cell In [17] is absent) --
# presumably it comes from `from helpers import *`; confirm before a fresh run.
for name in dd_out.columns:
    if name in col_dtype:
        dd_out[name] = dd_out[name].astype(col_dtype[name])

In [19]:
# Verify the dtypes of the merged frame after the downcast.
dd_out.dtypes

session_id                 object
session_position             int8
session_length               int8
duration                  float16
release_year                int16
us_popularity_estimate    float16
acousticness              float16
beat_strength             float16
bounciness                float16
danceability              float16
dyn_range_mean            float16
energy                    float16
flatness                  float16
instrumentalness          float16
key                         int16
liveness                  float16
loudness                  float16
mechanism                 float16
mode                         int8
organism                  float16
speechiness               float16
tempo                     float16
time_signature              int16
valence                   float16
dtype: object

In [20]:
# dd_out.memory_usage(deep=True).sum().compute()/1024**2

# Store the DD

In [21]:
# Number of partitions of the merged frame; the export below writes one numbered file
# per partition through the `*` placeholder in its path.
dd_out.npartitions

66

In [22]:
# Persist the merged frame as HDF5 under key 'df'; the `*` in the file name is replaced
# by a running number (see the list of written files in the cell output).
# Earlier CSV export, kept for reference:
# dd_out.to_csv(PATH+'/'+SUB_PATH+'/outDD_v2_*.csv.gz', index=False, float_format='%.5f', compression='gzip')
hdf_pattern = PATH+'/'+SUB_PATH+'/subDD_v2_*.h5'
dd_out.to_hdf(hdf_pattern, key='df')

['data//test_set//subDD_v2_00.h5',
 'data//test_set//subDD_v2_01.h5',
 'data//test_set//subDD_v2_02.h5',
 'data//test_set//subDD_v2_03.h5',
 'data//test_set//subDD_v2_04.h5',
 'data//test_set//subDD_v2_05.h5',
 'data//test_set//subDD_v2_06.h5',
 'data//test_set//subDD_v2_07.h5',
 'data//test_set//subDD_v2_08.h5',
 'data//test_set//subDD_v2_09.h5',
 'data//test_set//subDD_v2_10.h5',
 'data//test_set//subDD_v2_11.h5',
 'data//test_set//subDD_v2_12.h5',
 'data//test_set//subDD_v2_13.h5',
 'data//test_set//subDD_v2_14.h5',
 'data//test_set//subDD_v2_15.h5',
 'data//test_set//subDD_v2_16.h5',
 'data//test_set//subDD_v2_17.h5',
 'data//test_set//subDD_v2_18.h5',
 'data//test_set//subDD_v2_19.h5',
 'data//test_set//subDD_v2_20.h5',
 'data//test_set//subDD_v2_21.h5',
 'data//test_set//subDD_v2_22.h5',
 'data//test_set//subDD_v2_23.h5',
 'data//test_set//subDD_v2_24.h5',
 'data//test_set//subDD_v2_25.h5',
 'data//test_set//subDD_v2_26.h5',
 'data//test_set//subDD_v2_27.h5',
 'data//test_set//su