In [190]:
import csv
import datetime
import os
import re
from functools import reduce

import requests
from bs4 import BeautifulSoup

### Retrieve the list of data files from S3

In [2]:
# Bucket holding the music-player logs; the endpoint URL is derived from it so
# the bucket name is written only once.
s3_bucket = 'bittigermusicplayerdata'
s3_url = s3_bucket + '.s3-us-west-2.amazonaws.com/'

# Bound the wait on a stalled connection and fail on HTTP errors instead of
# silently parsing an error document as if it were the bucket listing.
r = requests.get('https://' + s3_url, timeout=30)
r.raise_for_status()

# The listing is XML whose <Contents>/<Key> entries are read by get_file_list.
# NOTE(review): an S3 listing response is capped at 1000 keys and signals more
# pages via <IsTruncated>; pagination is not handled here — confirm the bucket
# stays under that limit.
soup = BeautifulSoup(r.content, 'xml')

In [3]:
def get_file_list(soup):
    """Split the keys of an S3 bucket listing into search / play / download files.

    Args:
        soup: parsed bucket-listing document; each ``<Contents>`` element is
            expected to contain exactly one ``<Key>`` element.

    Returns:
        Tuple ``(search_file, play_file, download_file)`` of key strings in
        listing order. A key is added to every list whose marker substring
        ('search', 'play', 'down') it contains, so one key may land in several
        lists.
    """
    search_file = []
    play_file = []
    download_file = []
    # find_all is the current BeautifulSoup name; findAll is a legacy BS3 alias.
    for entry in soup.find_all('Contents'):
        keys = entry.find_all('Key')
        if len(keys) != 1:
            # Malformed entry: report it and keep going with the rest.
            print('Error when loading file to list')
            continue
        key = keys[0].text
        if 'search' in key:
            search_file.append(key)
        if 'play' in key:
            play_file.append(key)
        if 'down' in key:
            download_file.append(key)
    return search_file, play_file, download_file

def sub_list(s_list, p_list, d_list, dates):
    """Group file names by date, keeping only dates present in all three lists.

    For each date (in the given order), collect the names from each list that
    contain the date as a substring. The date is kept only if all three
    collections are non-empty.

    Returns:
        Tuple of three lists of lists: per kept date, the matching search,
        play and download names respectively.
    """
    grouped = ([], [], [])
    for day in dates:
        matches = tuple([name for name in names if day in name]
                        for names in (s_list, p_list, d_list))
        if not all(matches):
            continue
        for acc, found in zip(grouped, matches):
            acc.append(found)
    return grouped

def file_parser(soup, dates):
    """Return the (search, play, download) key lists from a bucket listing.

    When ``dates`` is falsy, every listed key is returned as produced by
    get_file_list. Otherwise, only keys for dates present in all three
    categories are kept (see sub_list), flattened back into plain lists.
    """
    listings = get_file_list(soup)
    if not dates:
        return listings
    grouped = sub_list(*listings, dates)
    return tuple([name for group in groups for name in group] for groups in grouped)

In [27]:
# dates=False: keep every listed file rather than restricting to given dates.
search_file, play_file, download_file = file_parser(soup, dates=False)

### ETL using Spark

In [5]:
# Pull AWS credentials from the environment (None when a variable is unset).
myAccessKey = os.environ.get('AWS_ACCESS_KEY_ID')
mySecretKey = os.environ.get('AWS_SECRET_ACCESS_KEY')

In [6]:
# The previous cell yields None for unset variables, and Hadoop's
# Configuration.set() rejects null values with an opaque Py4J error, so fail
# here with a readable message instead.
if not myAccessKey or not mySecretKey:
    raise RuntimeError(
        'Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before reading s3a:// paths')
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', myAccessKey)
hadoop_conf.set('fs.s3a.secret.key', mySecretKey)

In [7]:
# Reuse the bucket constant instead of repeating the bucket name literal.
textFile = sc.textFile('s3a://' + s3_bucket + '/1_3_search.log.tar.gz')

In [8]:
import datetime
# Time one full pass over the file. Keep the line count as well; otherwise the
# value of a non-final expression is discarded by the notebook.
start = datetime.datetime.now()
line_count = textFile.count()
end = datetime.datetime.now()
print(end - start)
line_count

0:00:03.544083


### Parsing search file logic
1. Define the parsing logic for a single search log file.
2. Define a pipeline that parses multiple search log files and unions the results.
3. Define an output function that saves the parsed results.

In [35]:
def spark_unpersist():
    """Unpersist every RDD the SparkContext currently tracks as persistent.

    Uses the private ``sc._jsc`` Java handle, then prints the persistent-RDD
    map's items as they stand afterwards, as a quick check.
    """
    java_ctx = sc._jsc
    for java_rdd in java_ctx.getPersistentRDDs().values():
        java_rdd.unpersist()
    print(java_ctx.getPersistentRDDs().items())

In [150]:
def _split_search_line(line):
    """Split a raw search-log line on tabs and strip whitespace around each field."""
    return [field.strip() for field in line.split('\t')]


def _is_valid_search_row(fields):
    """Keep rows with exactly 4 fields whose first field (the uuid) is all digits."""
    return len(fields) == 4 and re.match(r'^\d+$', fields[0]) is not None


def parse_search_log(bucket, key, propertion=False):
    """Count search events per (uuid, date) in one search log on S3.

    Every line is split on tabs and its fields are stripped. Lines that do not
    have exactly 4 fields, or whose first field is not all digits, are dropped.
    The third field holds "<date> <time>"; only the part before the first space
    is used as the date.

    Args:
        bucket: S3 bucket name.
        key: object key of the log file inside ``bucket``.
        propertion: optional sampling fraction (without replacement, seed 66);
            any falsy value disables sampling. The name (meaning "proportion")
            is kept for backward compatibility.

    Returns:
        RDD of ``[uuid, date, count]`` lists sorted by ``count`` descending.
    """
    rdd_feed = sc.textFile("s3a://" + bucket + '/' + key)
    if propertion:
        rdd_feed = rdd_feed.sample(False, propertion, seed=66)

    counts = (
        rdd_feed
        .map(_split_search_line)
        .filter(_is_valid_search_row)
        # Tuple keys avoid a ','-join/split round-trip, which would mangle any
        # field that itself contains ','.
        .map(lambda fields: ((fields[0], fields[2].split(' ')[0]), 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda kv: kv[1], ascending=False)
        .map(lambda kv: [kv[0][0], kv[0][1], kv[1]])
    )
    return counts

def cnt_rows(bucket, key):
    """Return the raw number of lines in ``s3a://<bucket>/<key>`` (no filtering)."""
    path = "s3a://" + bucket + '/' + key
    return sc.textFile(path).count()

In [151]:
def reduce_search_rdds(rdd_union):
    """Merge ``[uuid, date, count]`` rows that share the same (uuid, date).

    Args:
        rdd_union: RDD of ``[uuid, date, count]`` rows, e.g. the union of
            several per-file parse results.

    Returns:
        RDD of ``[uuid, date, total_count]`` lists, one per distinct pair.
    """
    # Tuple keys instead of a ','-joined string: the join/split round-trip
    # would split a field that itself contains ','.
    summed = (rdd_union
              .map(lambda row: ((row[0], row[1]), row[2]))
              .reduceByKey(lambda a, b: a + b))
    return summed.map(lambda kv: [kv[0][0], kv[0][1], kv[1]])

In [152]:
def search_parse_pipeline(bucket, search_files):
    """Parse several search logs and concatenate the per-file results.

    Args:
        bucket: S3 bucket name holding the logs.
        search_files: iterable of object keys, each parsed with parse_search_log.

    Returns:
        One RDD that is the union of the per-file ``[uuid, date, count]`` RDDs,
        in file order. Counts are not merged across files, so a (uuid, date)
        pair that occurs in several files appears once per file.

    Raises:
        ValueError: if ``search_files`` is empty.
    """
    rdd_list = [parse_search_log(bucket, key) for key in search_files]
    if not rdd_list:
        # reduce() over an empty sequence would raise an opaque TypeError.
        raise ValueError('search_files is empty: nothing to parse')
    # NOTE(review): reduce_search_rdds exists in this notebook but is not
    # applied here; confirm whether cross-file counts should be merged.
    return reduce(lambda left, right: left.union(right), rdd_list)

In [191]:
def write_csv(out, fields, files):
    """Write a header row followed by data rows to a CSV file.

    Args:
        out: destination path; an existing file is overwritten.
        fields: header row (sequence of column names).
        files: iterable of rows (sequences of values) written after the header.
    """
    # newline='' lets the csv module control line endings itself; without it,
    # text mode translates the writer's '\r\n' terminators and yields blank
    # lines between rows on Windows.
    with open(out, 'w', newline='') as f:
        csvout = csv.writer(f, delimiter=',')
        csvout.writerow(fields)
        csvout.writerows(files)

Parse the log files and write the condensed results to CSV

In [179]:
start = datetime.datetime.now()

# Raw line total across all search files (an extra pass over S3, for reporting).
row_num = sum(cnt_rows(s3_bucket, key) for key in search_file)
print('Total Number of rows: %s' % row_num)

# Run the parsing pipeline and pull the [uuid, date, count] rows to the driver.
search_date = search_parse_pipeline(s3_bucket, search_file).collect()
end = datetime.datetime.now()
elapsed_seconds = (end - start).total_seconds()
print('Spend %.2f seconds...' % elapsed_seconds)

Total Number of rows: 9795112
Spend 167.83 seconds...


In [192]:
# Persist the per-(uuid, date) search counts collected above.
write_csv(out='./dataset/search_log_freq.csv',
          fields=['uuid', 'date', 'freq'],
          files=search_date)

---

In [35]:
from pyspark import SparkConf

In [38]:
# A freshly constructed SparkConf() only holds defaults picked up at
# construction time, not the live session's settings; compare with the next
# cell, which lists far more entries.
SparkConf().getAll()

[('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]')]

In [39]:
# Settings of the live SparkContext, read through the public getConf() (which
# returns a copy) instead of the private _conf attribute.
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1537631863999'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]'),
 ('spark.driver.port', '51305'),
 ('spark.driver.host', 'zihaos-mbp.fios-router.home')]

In [72]:
# List the tables and views visible in the current database; the temp view
# "table" appears as TEMPORARY.
spark.catalog.listTables()

[Table(name='table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [73]:
# Ask whether the view "table" is currently cached.
# NOTE(review): these cells were run out of order (see the execution counts),
# so this result reflects the kernel state at that moment, not a clean
# top-to-bottom run.
spark.catalog.isCached('table')

False

In [41]:
# List the databases known to the catalog (only Hive's "default" one here).
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/Users/aiweiwei/Desktop/projects/music_box_spark/spark-warehouse')]

In [48]:
# Read the CSV using its first row as the header; without schema inference
# every column is read as a string.
df = spark.read.csv('dataset/1_1_search.csv', header=True) # if it is small enough, then you can cache()/persist() it

In [49]:
# Print the first rows as a text table (20 rows by default).
df.show()

+---------+------+-------------------+
|     uuid|device|               date|
+---------+------+-------------------+
|154436633|    ip|2017-03-01 00:00:24|
|154407262|    ar|2017-03-01 00:00:53|
|154407854|    ip|2017-03-01 00:00:54|
|154407252|    ar|2017-03-01 00:00:55|
|154407327|    ar|2017-03-01 00:00:55|
|154407255|    ip|2017-03-01 00:00:56|
|154407261|    ip|2017-03-01 00:00:59|
|154407267|    ar|2017-03-01 00:00:59|
|154407546|    ip|2017-03-01 00:01:00|
|154407254|    ar|2017-03-01 00:01:02|
|154407198|    ar|2017-03-01 00:01:06|
|154407244|    ar|2017-03-01 00:01:06|
|154407261|    ip|2017-03-01 00:01:07|
|154407362|    ar|2017-03-01 00:01:08|
|154407377|    ar|2017-03-01 00:01:08|
|154407348|    ar|2017-03-01 00:01:09|
|154407303|    ar|2017-03-01 00:01:14|
|154407406|    ar|2017-03-01 00:01:14|
|154407358|    ar|2017-03-01 00:01:15|
|154407327|    ar|2017-03-01 00:01:17|
+---------+------+-------------------+
only showing top 20 rows



In [None]:
# Use persist() when you want to choose a storage level explicitly; cache() is
# shorthand for persist() with the default storage level.

In [60]:
# Only removes entries from the cache: every cached table/DataFrame is dropped
# from it, but temp views stay registered in the catalog (unlike dropTempView).
spark.catalog.clearCache()

In [71]:
# Register df as the temporary view "table" (replacing any existing view of
# that name) so it can be queried with spark.sql().
df.createOrReplaceTempView("table")

In [68]:
# Unregister the temp view so it no longer appears in listTables(); if the
# view was cached, it is uncached as well.
spark.catalog.dropTempView("table")

In [64]:
# Mark df for caching. This is lazy: data is only stored once an action
# (e.g. show() or count()) runs. The call returns the DataFrame itself.
df.cache()

DataFrame[uuid: string, device: string, date: string]

In [30]:
# IPython help syntax: displays the docstring of createOrReplaceTempView.
# Development-only introspection; consider removing it before sharing.
df.createOrReplaceTempView?

In [57]:
# Query the temp view "table" with SQL and print its first 10 rows.
spark.sql("SELECT * FROM table").show(10)

+---------+------+-------------------+
|     uuid|device|               date|
+---------+------+-------------------+
|154436633|    ip|2017-03-01 00:00:24|
|154407262|    ar|2017-03-01 00:00:53|
|154407854|    ip|2017-03-01 00:00:54|
|154407252|    ar|2017-03-01 00:00:55|
|154407327|    ar|2017-03-01 00:00:55|
|154407255|    ip|2017-03-01 00:00:56|
|154407261|    ip|2017-03-01 00:00:59|
|154407267|    ar|2017-03-01 00:00:59|
|154407546|    ip|2017-03-01 00:01:00|
|154407254|    ar|2017-03-01 00:01:02|
+---------+------+-------------------+
only showing top 10 rows

