In [1]:
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from sql_queries import *

In [2]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")

In [3]:
cur = conn.cursor()

In [4]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# 处理 `song_data`

进行ETL操作，即从文件中读取数据，存入数据表

In [7]:
song_files = get_files("data/song_data")

In [9]:
filepath = song_files[0] #暂时只ETL第一个文件
print(filepath)

/home/sunxichen/PycharmProjects/Data_Engineering/Data Modeling/Project1/data/song_data/A/A/A/TRAAAEF128F4273421.json


In [12]:
df = pd.read_json(filepath, lines=True) 
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982


## #1: `songs` Table
提取 `songs` 表的数据
- 选择song ID, title, artist ID, year. and duration

In [15]:
song_data = df[["song_id", "title", "artist_id", "year", "duration"]].values[0].tolist()

In [16]:
songs_table_insert

'\nINSERT INTO songs (song_id, title, artist_id, year, duration)\nVALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;\n'

In [17]:
cur.execute(songs_table_insert, song_data)

In [18]:
conn.commit()

## #2: `artists` Table
提取 `artists` 表的数据
- 选择artist ID, name, location, latitude, and longtitude

In [19]:
artist_data = df[["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]].values[0].tolist()
artist_data

['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', nan, nan]

In [21]:
cur.execute(artists_table_insert, artist_data)
conn.commit()

# 处理 `log_data`
对 `log_data` 进行ETL操作来创建 `time` and `users` 以及 `songplays` 表

In [29]:
log_files = get_files("data/log_data")

In [30]:
filepath = log_files[0]

In [31]:
df = pd.read_json(filepath, lines=True)
df.head(10)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged Out,,,0,,,free,,PUT,Login,,741,,307,1542760054796,,
1,,Logged In,Theodore,M,1,Smith,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540306000000.0,741,,200,1542760086796,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,52.0
2,,Logged Out,,,0,,,paid,,GET,Home,,774,,200,1542761399796,,
3,,Logged Out,,,1,,,paid,,GET,Home,,774,,200,1542761485796,,
4,,Logged Out,,,2,,,paid,,PUT,Login,,774,,307,1542761486796,,
5,,Logged In,Tegan,F,3,Levine,,paid,"Portland-South Portland, ME",GET,Home,1540794000000.0,774,,200,1542761784796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0
6,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0
7,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97.0
8,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97.0
9,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0


## #3: `time` Table
- 筛选 `page`是`NextSong`的
- 将 `ts` timestamp列转换为datetime
- 提取出 `ts`列里的小时，day, week of year, month等等信息
- 提取 `time`表需要的列，命名为 `column_labels`
- 结合`column_labels`以及从`ts`提取到的数据，创建一个新的df, 命名为`time_df`

In [32]:
df = df.query("page == 'NextSong'")
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
6,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,1542761878796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
7,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,1542761921796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
8,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,1542762125796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
9,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,1542762193796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
11,Elisa,Logged In,Kate,F,2,Harrell,248.97261,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Almeno Tu Nell'Universo,200,1542762329796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97


In [33]:
t = pd.to_datetime(df['ts'], unit='ms')
df['ts'] = pd.to_datetime(df['ts'], unit='ms')
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
6,Facto Delafe y las flores azules,Logged In,Tegan,F,4,Levine,315.81995,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Enero en la playa,200,2018-11-21 00:57:58.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
7,Kings Of Leon,Logged In,Kate,F,0,Harrell,204.2771,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Manhattan,200,2018-11-21 00:58:41.796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
8,Franz Ferdinand,Logged In,Kate,F,1,Harrell,204.12036,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Michael,200,2018-11-21 01:02:05.796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97
9,Blue October,Logged In,Tegan,F,5,Levine,272.32608,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,774,Drilled A Wire Through My Cheek,200,2018-11-21 01:03:13.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
11,Elisa,Logged In,Kate,F,2,Harrell,248.97261,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,671,Almeno Tu Nell'Universo,200,2018-11-21 01:05:29.796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97


In [34]:
time_data = list((t, t.dt.hour, t.dt.day, t.dt.weekofyear, t.dt.month, t.dt.year, t.dt.weekday))
column_labels = list(('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday'))

  """Entry point for launching an IPython kernel.


In [36]:
time_data[0]

6     2018-11-21 00:57:58.796
7     2018-11-21 00:58:41.796
8     2018-11-21 01:02:05.796
9     2018-11-21 01:03:13.796
11    2018-11-21 01:05:29.796
                ...          
512   2018-11-21 23:47:05.796
513   2018-11-21 23:50:02.796
514   2018-11-21 23:51:48.796
515   2018-11-21 23:53:48.796
516   2018-11-21 23:57:45.796
Name: ts, Length: 437, dtype: datetime64[ns]

In [38]:
time_df =  pd.DataFrame.from_dict(dict(zip(column_labels, time_data)))
time_df.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
6,2018-11-21 00:57:58.796,0,21,47,11,2018,2
7,2018-11-21 00:58:41.796,0,21,47,11,2018,2
8,2018-11-21 01:02:05.796,1,21,47,11,2018,2
9,2018-11-21 01:03:13.796,1,21,47,11,2018,2
11,2018-11-21 01:05:29.796,1,21,47,11,2018,2


In [39]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

## #4 `users`表
- 选出 user ID, first ID, first name, last name, gender以及level列

In [40]:
user_df = df[["userId", "firstName", "lastName", "gender", "level"]]

In [42]:
for i, row in user_df.iterrows():
    cur.execute(users_table_insert, row)
    conn.commit()

## #5: `songplays` 表
- 首先要根据 `song` `artist` `length` 从 `songs`表和`artists`表中选取 songid和artistid
- 入表

In [44]:
for index, row in df.iterrows():
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None
        
    # insert songplay record
    songplay_data = (index, row.ts, row.userId, row.level, songid, artistid, row.sessionId,\
                     row.location, row.userAgent)
    cur.execute(songplays_table_insert, songplay_data)
    conn.commit()

In [45]:
conn.close()