In [1]:
import pandas as pd
import numpy as np
import os, pickle

## **Log data**


* 29GB의 아주 큰 데이터로 조심스럽게 loading해야 할 필요가 있음
* 실습에서는 최초 5천만건만 잘라서 사용 (시스템에 무리가 감)
* 조금씩 데이터를 불러와서 크기를 파악하면서 작업 진행


In [2]:
!pwd

/home/M2017017/KKBox/notebook


In [3]:
! du -sh ../tut_data/user_logs_small.csv

3.7G	../tut_data/user_logs_small.csv


#### 차지할 memory 계산

In [4]:
user_log =  pd.read_csv('../tut_data/user_logs_small.csv', nrows=20)
import sys
print('size = {} bytes'.format(sys.getsizeof(user_log)))

size = 3404 bytes


$\Rightarrow$ 20 record가 3404byte임

In [5]:
print('# sample per GB = {} obs'.format(1024 * 1024 * 1204 / (3404/20)))

# sample per GB = 7417658.66039953 obs


$\Rightarrow$ 1GB에 약 700만건 들어감

### Free memory check
 * 126GB 정도 free


In [23]:
! free -m

              total        used        free      shared  buff/cache   available
Mem:         128826       77795       15673         304       35357       49343
Swap:         31249        9451       21798


 buff/cache  +  available

## Rough하게 data 파악

In [7]:
user_log = pd.read_csv('../tut_data/user_logs_small.csv', nrows=7417658)

In [8]:
user_log.head()

Unnamed: 0,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,20150513,0,0,0,0,1,1,280.335
1,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,20150709,9,1,0,0,7,11,1658.948
2,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,20150105,3,3,0,0,68,36,17364.956
3,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,20150306,1,0,1,1,97,27,24667.317
4,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,20150501,3,0,0,0,38,38,9649.029


In [9]:
print('size = {} MB'.format(sys.getsizeof(user_log)/1024/1024))

size = 1167.215036392212 MB


$\Rightarrow$ 약 10초 걸렸으며 메모리 사이즈는 약 1GB

### 인별 log 개수 파악

In [10]:
tmp_pd = pd.DataFrame(user_log.groupby(['msno']).count()['date']).reset_index()

In [11]:
tmp_pd.head()

Unnamed: 0,msno,date
0,+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=,1
1,+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=,12
2,+++dz9ZCWE2HB/47pJU82NJXQzQuZDx1Wm50YSk/kKk=,1
3,+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=,2
4,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,12


In [12]:
import numpy as np
import scipy.special

from bokeh.layouts import gridplot
from bokeh.plotting import figure, show, output_file
from bokeh.io import show, output_file, output_notebook
from bokeh.models import ColumnDataSource
from bokeh.palettes import Spectral5, Oranges9, Viridis256
from bokeh.sampledata.autompg import autompg as df
from bokeh.transform import factor_cmap
output_notebook()

In [13]:
p1 = figure(title="Distribution of logs",tools="save", background_fill_color="#E8DDCB")
hist, edges = np.histogram(tmp_pd['date'], density=True)

In [24]:
hist

array([0.26299566, 0.09669029, 0.03802282, 0.06549791, 0.02728199,
       0.04597413, 0.03918554, 0.01772708, 0.02869043, 0.00293414])

In [25]:
edges

array([ 1. ,  2.6,  4.2,  5.8,  7.4,  9. , 10.6, 12.2, 13.8, 15.4, 17. ])

In [14]:
# 각 column별로 요약자료임
tmp_pd.describe()

Unnamed: 0,date
count,1508321.0
mean,4.917825
std,4.202067
min,1.0
25%,1.0
50%,3.0
75%,8.0
max,17.0


In [15]:
p1.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
        fill_color="#036564", line_color="#033649")
p1.legend.location = "center_right"
p1.legend.background_fill_color = "darkgrey"
p1.xaxis.axis_label = '# of logs'
p1.yaxis.axis_label = 'density'
show(p1)

In [16]:
del tmp_pd

###### 평균 6개의 log가 있음

## 총 line의 개수 파악

In [17]:
# 총 line의 갯수를 세어 파일의 크기 파악

total_nrow = 0
with open('../tut_data/user_logs_small.csv', 'rb') as f:
    for lines in f:
        total_nrow += 1
        if total_nrow % 10000000 == 0:
            print('nrow = {}'.format(total_nrow))
print(total_nrow)

nrow = 10000000
nrow = 20000000
nrow = 30000000
nrow = 40000000
nrow = 50000000
50000000


원래는 
* 약 2분 정도 소요
* $\Rightarrow$ 3억 9천만건 정도 됨
이지만, 지금은 작은 데이터만 사용 $\Rightarrow$ 5천만건

### Log를 구간별로 잘라서 summary 정보를 만들어야 함
* 필요한 정보는 고객별, 노래를 들은 횟수와, 총 시간 정보, 날짜는 count의 형태로

In [18]:
usr_log_summary = pd.DataFrame()

#### Sequentially

In [19]:
nrows_to_read = 10000000
total_nrows = 50000000

n_iter = total_nrows // nrows_to_read
print('niter = {}'.format(n_iter))

keep_col = ['date_count', 'num_25_sum', 'num_50_sum', 'num_75_sum', 'num_985_sum', 'num_100_sum','num_unq_sum', 'total_secs_sum']
col_name = []
for i in range(n_iter):
    if i == 10:
        break
    print(i)
    if i == 0:
        tmp = pd.read_csv('../tut_data/user_logs_small.csv', header = 0, nrows=nrows_to_read)
        col_name = tmp.columns
    else:
        tmp = pd.read_csv('../tut_data/user_logs_small.csv', header = None
                        , skiprows = nrows_to_read*i + 1
                        , nrows=nrows_to_read)
    if tmp.shape[0] == 0:
        break
                        
    if i == 0:
        tmp_1 = tmp.groupby(['msno']).agg(['count', 'sum'])
        tmp_1.columns = ["_".join(x) for x in tmp_1.columns.ravel()]
        print(tmp_1.columns)
        tmp_2 = tmp_1[keep_col].reset_index()
        usr_log_summary = tmp_2
        #print('i = {}, shape = {}'.format(i, usr_log_summary.shape))
        
    else:
        tmp.columns = col_name
        print('tmp columns = {}'.format(col_name))
        tmp_1 = tmp.groupby(['msno']).agg(['count', 'sum'])
        tmp_1.columns = ["_".join(x) for x in tmp_1.columns.ravel()]
        print(tmp_1.columns)
        tmp_2 = tmp_1[keep_col].reset_index()
        usr_log_summary = pd.concat([usr_log_summary, tmp_2], axis = 0)
        usr_log_summary = usr_log_summary.groupby(['msno']).sum().reset_index()
        #print('i = {}, tmp_2 shape = {}'.format(i, tmp_2.shape))
        #print('i = {}, usr_log_summary shape = {}'.format(i, usr_log_summary.shape))
        #print('tmp_2 head = {}'.format(tmp_2[['msno', 'date_count', 'num_25_sum']].tail(3)))
        #print('usr_log_summary head = {}'.format(usr_log_summary[['msno', 'date_count', 'num_25_sum']].tail(3)))
    del [tmp, tmp_1, tmp_2]

niter = 5
0
Index(['date_count', 'date_sum', 'num_25_count', 'num_25_sum', 'num_50_count',
       'num_50_sum', 'num_75_count', 'num_75_sum', 'num_985_count',
       'num_985_sum', 'num_100_count', 'num_100_sum', 'num_unq_count',
       'num_unq_sum', 'total_secs_count', 'total_secs_sum'],
      dtype='object')
1
tmp columns = Index(['msno', 'date', 'num_25', 'num_50', 'num_75', 'num_985', 'num_100',
       'num_unq', 'total_secs'],
      dtype='object')
Index(['date_count', 'date_sum', 'num_25_count', 'num_25_sum', 'num_50_count',
       'num_50_sum', 'num_75_count', 'num_75_sum', 'num_985_count',
       'num_985_sum', 'num_100_count', 'num_100_sum', 'num_unq_count',
       'num_unq_sum', 'total_secs_count', 'total_secs_sum'],
      dtype='object')
2
tmp columns = Index(['msno', 'date', 'num_25', 'num_50', 'num_75', 'num_985', 'num_100',
       'num_unq', 'total_secs'],
      dtype='object')
Index(['date_count', 'date_sum', 'num_25_count', 'num_25_sum', 'num_50_count',
       'num_50_

In [20]:
del usr_log_summary

## Multiprocess

In [21]:
from multiprocessing import Lock, Process, Queue, Manager
import time
import pandas as pd
import pickle

# Info
nrows_to_read = 3000000
total_nrows = 392106544
# nrows_to_read = 100
# total_nrows = 602
#number_of_task = total_nrows // nrows_to_read
number_of_task = 10
number_of_processes = 2


def worker(q, oq, _n_to_read):
    while(True):
        try:
            process_id = q.get(block = False)
            print('process id = {}'.format(process_id))
        except:
            print('process {} done!'.format(process_id))
            break
        
        chunk = int(process_id.split(' ')[-1])

        columns = list(pd.read_csv('../tut_data/user_logs_small.csv', header = 0, nrows= 1))
        tmp = pd.read_csv('../tut_data/user_logs_small.csv', header = None
                            , skiprows = _n_to_read * chunk + 1
                            , nrows= _n_to_read)
        tmp.columns = columns
        tmp_1 = tmp.groupby(['msno']).agg(['count', 'sum'])
        tmp_1.columns = ["_".join(x) for x in tmp_1.columns.ravel()]
        tmp_2 = tmp_1[keep_col].reset_index()
        print('tmp_2.columns = {}'.format(tmp_2.columns))
        oq.put(tmp_2) # 결과를 output queue에 저장


# start
processes = []
manager = Manager()

worker_q = manager.Queue() # Job에 대한 정보를 가지고 있는 queue
output_q = manager.Queue() # 결과물을 저장할 queue: queue에 아무것도 없을 때까지 계속 process를 monitoring

for i in range(number_of_task): # Job 설정
    worker_q.put("Task no " + str(i))
    print('Task no = {}'.format(i))
print('job registered!!')

# creating processes
for w in range(number_of_processes):
    p = Process(target=worker, args=(worker_q, output_q, nrows_to_read))
    processes.append(p)

# Start process
[p.start() for p in processes]

# completing process
[p.join() for p in processes]

log_summary = pd.DataFrame()

# print the output
while not output_q.empty():
    res = output_q.get()
    log_summary = pd.concat([log_summary, res], axis = 0)
    log_summary = log_summary.groupby(['msno']).sum().reset_index()
    
    print('tmp output shape = {}'.format(res.shape))
    print('log_summary shape = {}'.format(log_summary.shape))
    print('log_summary shape = {}'.format(res.head(2)))

Task no = 0
Task no = 1
Task no = 2
Task no = 3
Task no = 4
Task no = 5
Task no = 6
Task no = 7
Task no = 8
Task no = 9
job registered!!
process id = Task no 0
process id = Task no 1
tmp_2.columns = Index(['msno', 'date_count', 'num_25_sum', 'num_50_sum', 'num_75_sum',
       'num_985_sum', 'num_100_sum', 'num_unq_sum', 'total_secs_sum'],
      dtype='object')
process id = Task no 2
tmp_2.columns = Index(['msno', 'date_count', 'num_25_sum', 'num_50_sum', 'num_75_sum',
       'num_985_sum', 'num_100_sum', 'num_unq_sum', 'total_secs_sum'],
      dtype='object')
process id = Task no 3
tmp_2.columns = Index(['msno', 'date_count', 'num_25_sum', 'num_50_sum', 'num_75_sum',
       'num_985_sum', 'num_100_sum', 'num_unq_sum', 'total_secs_sum'],
      dtype='object')
process id = Task no 4
tmp_2.columns = Index(['msno', 'date_count', 'num_25_sum', 'num_50_sum', 'num_75_sum',
       'num_985_sum', 'num_100_sum', 'num_unq_sum', 'total_secs_sum'],
      dtype='object')
process id = Task no 5
tmp_2

In [22]:
! ls ../data

ls: cannot access '../data': No such file or directory
