# DB 연결

In [1]:
import psycopg

In [2]:
# Connection settings handed to psycopg.connect() in the next cell.
# NOTE(review): the password is hard-coded in the notebook — consider loading
# credentials from the environment instead of committing them.
dbname = 'postgres'
user = 'postgres'
password = 'mysecretpassword'
host = '192.168.56.11'
port = 30080

In [3]:
# Open the single connection that the loading and query cells below reuse.
conn = psycopg.connect(
    dbname=dbname,
    user=user,
    password=password,
    host=host,
    port=port,
)

PostgreSQL에 연결하기 위해 psycopg를 사용합니다.

# 로우 데이터 처리 

In [4]:
import re
import pandas as pd

In [5]:
# Location of the raw access log that parse_access_log() reads.
path = "data/access_970411.log"

In [6]:
# One log entry: host, bracketed timestamp, quoted request ("METHOD path"),
# numeric status and size, then the quoted user-agent string.
# Raw string so that \S, \[ and \d reach the regex engine without relying on
# Python passing unknown string escapes through (which emits a warning).
pattern = re.compile(r'(?P<host>\S+) - - \[(?P<time>.*)\] "(?P<method>.*) (?P<contents>.*)" (?P<status>\d+) (?P<size>\d+) "(?P<ua>.*)"')

정규 표현식을 이용해 로우 데이터를 컬럼 별로 구분하여 데이터프레임에 적재합니다.

In [7]:
def parse_access_log(path):
    """Yield the captured fields of every log entry found in the file at *path*.

    The file is read as bytes and each line is decoded as ASCII with
    undecodable bytes dropped, so stray non-ASCII bytes cannot abort the parse.
    Lines that the module-level ``pattern`` does not match yield nothing.

    Yields:
        tuple: ``m.groups()`` for each match of ``pattern`` within a line.
    """
    # Use a context manager so the file handle is closed once the generator
    # is exhausted or closed, rather than being left to the garbage collector.
    with open(path, 'rb') as log_file:
        for line in log_file:
            line = line.decode(encoding='ascii', errors='ignore')
            for m in pattern.finditer(line):
                yield m.groups()

In [8]:
# Column labels, in the same order as the named groups of the log regex.
columns = [
    "host",
    "time",
    "method",
    "contents",
    "status",
    "size",
    "ua",
]

In [9]:
# Materialise the parsed tuples into a DataFrame; every cell holds the regex
# capture as text (status and size are not converted to numbers here).
log_raw = pd.DataFrame(parse_access_log(path), columns = columns)

In [10]:
log_raw.head()

Unnamed: 0,host,time,method,contents,status,size,ua
0,8.136.30.91,1997-04-11 00:00:00,GET,/m/c.html,200,743,Opera/8.61.(Windows NT 5.1; af-ZA) Presto/2.9....
1,50.227.78.16,1997-04-11 00:00:01,GET,/a/i.mp4,200,1705,Opera/9.25.(X11; Linux x86_64; brx-IN) Presto/...
2,205.41.123.39,1997-04-11 00:00:02,GET,/d/b.html,200,291,Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_8 r...
3,69.255.196.161,1997-04-11 00:00:03,GET,/z/c/g.mp4,200,6389,Mozilla/5.0 (Windows 98; nhn-MX; rv:1.9.1.20) ...
4,213.189.137.77,1997-04-11 00:00:04,GET,/k/b.html,200,3900,Mozilla/5.0 (compatible; MSIE 8.0; Windows NT ...


In [11]:
# Seed the fact-table frame with the columns stored directly on visit; .copy()
# keeps later column assignments from touching log_raw.
visit_df = log_raw.loc[:, ["method", "status", "size"]].copy()

팩트 테이블인 visit 테이블에 삽입할 데이터프레임을 만들어줍니다. 

method, status, size 이외의 데이터는 외래키로 디멘션 테이블의 데이터들을 참조하면서 생성해줍니다.

# CLIENT 

In [12]:
# Stage every raw host in a temporary table via COPY, then insert into the
# client dimension; rows that conflict with existing ones are skipped.
with conn.cursor() as cur:
    try:
        # ON COMMIT DROP: the staging table is removed when the transaction ends.
        cur.execute("CREATE TEMPORARY TABLE tmp_client (id int, host varchar(255)) ON COMMIT DROP")
        with cur.copy("COPY tmp_client FROM STDIN") as copy:
            for i, data in enumerate(log_raw["host"]):
                copy.write_row((i, data))
        cur.execute("INSERT INTO client(host) SELECT host FROM tmp_client ON CONFLICT DO NOTHING")
    except Exception as e:
        # Report the failure instead of discarding it silently.
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # so that execution does not continue on half-loaded state.
        conn.rollback()
        raise
    else:
        conn.commit()

copy는 중복 데이터를 걸러낼 수 없기 때문에 임시 테이블에 데이터를 copy하고 임시 테이블을 읽어 client 테이블에 삽입합니다. 이렇게 하면 copy의 속도도 잡고 중복 처리도 할 수 있어 편리합니다.

In [13]:
with conn.cursor() as cur:
    # Name the columns explicitly so the two-value unpacking below does not
    # depend on the table's column order or on how many columns it has.
    cur.execute("select client_key, host from client")
    # host -> client_key lookup
    client_dict = {host_value: client_key for client_key, host_value in cur.fetchall()}
    # Index the dict directly so an unknown host raises KeyError instead of
    # silently leaving a missing key in the fact rows.
    visit_df["client_key"] = log_raw["host"].map(lambda x: client_dict[x])

In [14]:
visit_df.head()

Unnamed: 0,method,status,size,client_key
0,GET,200,743,1
1,GET,200,1705,2
2,GET,200,291,3
3,GET,200,6389,4
4,GET,200,3900,5


현재 로우 데이터의 host 값에서 디멘션 테이블의 키를 얻어와 visit 테이블에 적용해야 하기 때문에 디멘션 테이블을 읽어와 host 값을 key로 디멘션 테이블의 pk를 값으로 하는 딕셔너리를 구성해줍니다.

그 후 일괄로 visit 데이터프레임에 적용하여 외래키를 채워줍니다. 이런 방식을 택한 이유는 디멘션 테이블을 조인하며 insert하면 db의 부담도 커지고 매우 느리기 때문입니다. 카디널리티가 작은 디멘션 테이블의 정보를 파이썬에 불러와 처리해주고 visit 데이터프레임을 copy해버리면, join과 insert를 병행하는 방식과는 비교가 되지 않을 만큼 빠른 속도로 데이터를 적재할 수 있습니다.

아래 디멘션 테이블들도 동일하게 처리합니다.

# CONTENTS

In [15]:
# Stage every raw request path in a temporary table via COPY, then insert into
# the contents dimension; rows that conflict with existing ones are skipped.
with conn.cursor() as cur:
    try:
        # ON COMMIT DROP: the staging table is removed when the transaction ends.
        cur.execute("CREATE TEMPORARY TABLE tmp_contents (id int, path varchar(65532)) ON COMMIT DROP")
        with cur.copy("COPY tmp_contents FROM STDIN") as copy:
            for i, data in enumerate(log_raw["contents"]):
                copy.write_row((i, data))
        cur.execute("INSERT INTO contents(path) SELECT path FROM tmp_contents ON CONFLICT DO NOTHING")
    except Exception as e:
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # so that execution does not continue on half-loaded state.
        conn.rollback()
        raise
    else:
        conn.commit()

In [16]:
with conn.cursor() as cur:
    cur.execute("select contents_key, path from contents")
    # Build the path -> contents_key lookup from the (contents_key, path) rows.
    contents_dict = dict((row[1], row[0]) for row in cur.fetchall())
    # Translate each logged path into its dimension key.
    visit_df["contents_key"] = log_raw["contents"].map(lambda logged_path: contents_dict[logged_path])

In [17]:
visit_df.head()

Unnamed: 0,method,status,size,client_key,contents_key
0,GET,200,743,1,1
1,GET,200,1705,2,2
2,GET,200,291,3,3
3,GET,200,6389,4,4
4,GET,200,3900,5,5


# USER AGENT 

In [18]:
from user_agents import parse

In [19]:
def parse_ua(ua):
    """Return the ``(browser family, OS family)`` pair parsed from a user-agent string."""
    agent = parse(ua)
    browser_family = agent.browser.family
    os_family = agent.os.family
    return browser_family, os_family

In [20]:
# Parse each row's user-agent string; result_type="expand" spreads the returned
# (browser, os) tuple across two columns labelled 0 and 1.
ua_df = log_raw.apply(
    lambda row: parse_ua(row["ua"]),
    axis=1,
    result_type="expand",
)

In [21]:
# Give the two positional columns their dimension names.
ua_df = ua_df.rename(columns={0: "browser", 1: "os"})

## browser 

In [22]:
# Stage every parsed browser name in a temporary table via COPY, then insert
# into the browser dimension; rows that conflict with existing ones are skipped.
with conn.cursor() as cur:
    try:
        # ON COMMIT DROP: the staging table is removed when the transaction ends.
        cur.execute("CREATE TEMPORARY TABLE tmp_browser (id int, name varchar(255)) ON COMMIT DROP")
        with cur.copy("COPY tmp_browser FROM STDIN") as copy:
            for i, data in enumerate(ua_df["browser"]):
                copy.write_row((i, data))
        cur.execute("INSERT INTO browser(name) SELECT name FROM tmp_browser ON CONFLICT DO NOTHING")
    except Exception as e:
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # so that execution does not continue on half-loaded state.
        conn.rollback()
        raise
    else:
        conn.commit()

In [23]:
with conn.cursor() as cur:
    cur.execute("select browser_key, name from browser")
    # Build the name -> browser_key lookup from the (browser_key, name) rows.
    browser_dict = dict((row[1], row[0]) for row in cur.fetchall())
    # Translate each parsed browser family into its dimension key.
    visit_df["browser_key"] = ua_df["browser"].map(lambda browser_name: browser_dict[browser_name])

In [24]:
visit_df.head()

Unnamed: 0,method,status,size,client_key,contents_key,browser_key
0,GET,200,743,1,1,1
1,GET,200,1705,2,2,1
2,GET,200,291,3,3,3
3,GET,200,6389,4,4,4
4,GET,200,3900,5,5,5


##  os

In [25]:
# Stage every parsed OS name in a temporary table via COPY, then insert into
# the os dimension; rows that conflict with existing ones are skipped.
with conn.cursor() as cur:
    try:
        # ON COMMIT DROP: the staging table is removed when the transaction ends.
        cur.execute("CREATE TEMPORARY TABLE tmp_os (id int, name varchar(255)) ON COMMIT DROP")
        with cur.copy("COPY tmp_os FROM STDIN") as copy:
            for i, data in enumerate(ua_df["os"]):
                copy.write_row((i, data))
        cur.execute("INSERT INTO os(name) SELECT name FROM tmp_os ON CONFLICT DO NOTHING")
    except Exception as e:
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # so that execution does not continue on half-loaded state.
        conn.rollback()
        raise
    else:
        conn.commit()

In [26]:
with conn.cursor() as cur:
    cur.execute("select os_key, name from os")
    # Build the name -> os_key lookup from the (os_key, name) rows.
    os_dict = dict((row[1], row[0]) for row in cur.fetchall())
    # Translate each parsed OS family into its dimension key.
    visit_df["os_key"] = ua_df["os"].map(lambda os_name: os_dict[os_name])

In [27]:
visit_df.head()

Unnamed: 0,method,status,size,client_key,contents_key,browser_key,os_key
0,GET,200,743,1,1,1,1
1,GET,200,1705,2,2,1,2
2,GET,200,291,3,3,3,3
3,GET,200,6389,4,4,4,1
4,GET,200,3900,5,5,5,1


# TIME

In [28]:
# Parse the textual timestamps; exact=False lets the format match anywhere in
# the string rather than requiring the whole value to match.
time_series = pd.to_datetime(
    log_raw["time"],
    format="%Y-%m-%d %H:%M:%S",
    exact=False,
)

In [29]:
# Start from an empty frame; the calendar/clock columns are added one by one
# in the next cell.
time_df = pd.DataFrame()

In [30]:
# Break each timestamp into calendar/clock parts with the vectorised .dt
# accessor instead of calling a Python lambda once per element.
time_df["date"] = time_series.dt.date
time_df["day"] = time_series.dt.day
time_df["weekday"] = time_series.dt.weekday  # Monday == 0, same as Timestamp.weekday()
time_df["month"] = time_series.dt.month
time_df["year"] = time_series.dt.year
time_df["hour"] = time_series.dt.hour
time_df["minute"] = time_series.dt.minute
time_df["second"] = time_series.dt.second

In [31]:
# Build the yyyymmddHHMMSS key for each row from the already-split parts.
time_df["time_key"] = [
    "%04d%02d%02d%02d%02d%02d" % parts
    for parts in zip(
        time_df["year"],
        time_df["month"],
        time_df["day"],
        time_df["hour"],
        time_df["minute"],
        time_df["second"],
    )
]

In [32]:
# Attach the time dimension key to the fact rows; both frames derive from
# log_raw, so the assignment aligns on the shared row index.
visit_df["time_key"] = time_df["time_key"]

In [35]:
# Stage the time rows in a temporary table shaped like the time dimension,
# then insert them; rows that conflict with existing ones are skipped.
with conn.cursor() as cur:
    try:
        # ON COMMIT DROP: the staging table is removed when the transaction ends.
        cur.execute("CREATE TEMPORARY TABLE tmp_time (like time) ON COMMIT DROP")
        with cur.copy("COPY tmp_time(date, day, weekday, month, year, hour, minute, second, time_key) FROM STDIN") as copy:
            # Skip a row whose time_key equals the previous row's, so runs of
            # consecutive requests in the same second are sent only once.
            pre = ""
            for data in time_df.itertuples():
                if pre == data[-1]:
                    continue
                # data[0] is the DataFrame index; the rest follows the COPY column list.
                copy.write_row(data[1:])
                pre = data[-1]
        cur.execute("INSERT INTO time SELECT * FROM tmp_time ON CONFLICT DO NOTHING")
    except Exception as e:
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # so that execution does not continue on half-loaded state.
        conn.rollback()
        raise
    else:
        conn.commit()

In [36]:
visit_df.head()

Unnamed: 0,method,status,size,client_key,contents_key,browser_key,os_key,time_key
0,GET,200,743,1,1,1,1,19970411000000
1,GET,200,1705,2,2,1,2,19970411000001
2,GET,200,291,3,3,3,3,19970411000002
3,GET,200,6389,4,4,4,1,19970411000003
4,GET,200,3900,5,5,5,1,19970411000004


# VISIT

In [37]:
# Bulk-load the assembled fact rows straight into visit with COPY.
with conn.cursor() as cur:
    try:
        with cur.copy("COPY visit(method, status, size, client_key, contents_key, browser_key, os_key, time_key) FROM STDIN") as copy:
            for data in visit_df.itertuples():
                # data[0] is the DataFrame index; the rest follows the COPY column list.
                copy.write_row(data[1:])
    except Exception as e:
        conn.rollback()
        print(e)
    except BaseException:
        # KeyboardInterrupt / SystemExit: undo the partial load, then propagate
        # rather than swallowing the interrupt.
        conn.rollback()
        raise
    else:
        conn.commit()

# 조회

In [38]:
# Join the fact table back to every dimension and fetch a single sample row.
query = """select t.date, cl.host, v.method, v.status, v.size, o.name, b.name, co.path from visit v, time t, browser b, os o, contents co, client cl
where v.client_key = cl.client_key and v.contents_key = co.contents_key and
v.time_key = t.time_key and v.os_key = o.os_key and v.browser_key = b.browser_key
limit 1"""
with conn.cursor() as cur:
    cur.execute(query)
    sample_rows = cur.fetchall()
print(sample_rows)
# The notebook is finished with the database at this point.
conn.close()

[(datetime.date(1997, 4, 11), '8.136.30.91', 'GET', 200, 743, 'Windows', 'Opera', '/m/c.html')]


visit 테이블 1행의 데이터를 조회하는 간단한 쿼리입니다. 디멘션 테이블과의 조인으로 정보를 취합할 수 있습니다.