# Summary
* Load the download and search log files with assigned schema. Examine the date and count distribution.
* Transform the file_name/search time to proper date format for future feature engineering purpose
* Join the download and search files with the valid_uid collected from the previos step play_log_preprocessing. Those uid will be the population we observe bahaviors within the one month time window.

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

from pyspark.sql import Row
from pyspark.sql import functions

from pyspark.sql.functions import *
from pyspark.sql.types import *

import sys

In [3]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## 1. Download Log

### Load Download log file and fix the dataframe schema

In [4]:
spark = SparkSession.builder.appName("Music Box down search log cleaning").getOrCreate()

In [4]:
down = sc.textFile("./data/all_down_log.log.fn")

In [5]:
down.take(10)

['167578852\tar\t7129012\tFade(Dj6 Up Remix)\t六月\t0 \t 1_down.l',
 '167928289\tar\t5060732\t明天会更好 (六十位歌手大合唱)\t群星\t0 \t 1_down.l',
 '167729304\tar\t472192\t寂寞有多长\t马条\t0 \t 1_down.l',
 '168022192\tar\t233916\t年轻的战场(2006年《加油好男儿》主题曲)\t张杰\t0 \t 1_down.l',
 '46532274\tar\t5398666\t如果爱我只是因为你寂寞\t高雅\t0 \t 1_down.l',
 '167666685\tar\t2961149\t大气开场音乐\t网络歌手\t0 \t 1_down.l',
 '167949944\tar\t3415466\t像梦一样自由\t张钰琪\t0 \t 1_down.l',
 '167664711\tar\t6700790\t大王叫我来巡山-(电影《万万没想到\t贾乃亮&甜馨\t0 \t 1_down.l',
 '167949944\tar\t4407436\t给所有知道我名字的人\t张钰琪\t0 \t 1_down.l',
 '167666685\tar\t5805490\t最后的旅行——记《龙族》绘梨衣 (日文念白 Rainton桐)\t网络歌手\t0 \t 1_down.l']

In [5]:
def parseLine(line):
    fields = line.split("\t")
    if len(fields) == 7:
        try:
            uid = float(fields[0])
            device = str(fields[1])
            song_id = float(fields[2])
            song_name = str(fields[3])
            singer = str(fields[4])
            paid_flag = str(fields[5])
            file_name = str(fields[6])
            return Row(uid, device, song_id, song_name, singer, paid_flag, file_name)
        except:
            return Row(None)
    else:
        return Row(None)

schema = StructType([StructField('uid', FloatType(), False),
                       StructField('device', StringType(), True),
                       StructField('song_id', FloatType(), False),
                       StructField('song_name', StringType(), True),
                       StructField('singer', StringType(), True),
                       StructField('paid_flag', StringType(), True),
                       StructField('file_name', StringType(), True)])

In [6]:
down1=down.map(parseLine).filter(lambda x:len(x)==len(schema))

In [7]:
down_df=spark.createDataFrame(down1,schema).cache()

In [8]:
pd.DataFrame(down_df.take(10),columns=down_df.columns)
#down_df.show()

Unnamed: 0,uid,device,song_id,song_name,singer,paid_flag,file_name
0,167578848.0,ar,7129012.0,Fade(Dj6 Up Remix),六月,0,1_down.log
1,167928288.0,ar,5060732.0,明天会更好 (六十位歌手大合唱),群星,0,1_down.log
2,167729312.0,ar,472192.0,寂寞有多长,马条,0,1_down.log
3,168022192.0,ar,233916.0,年轻的战场(2006年《加油好男儿》主题曲),张杰,0,1_down.log
4,46532272.0,ar,5398666.0,如果爱我只是因为你寂寞,高雅,0,1_down.log
5,167666688.0,ar,2961149.0,大气开场音乐,网络歌手,0,1_down.log
6,167949952.0,ar,3415466.0,像梦一样自由,张钰琪,0,1_down.log
7,167664704.0,ar,6700790.0,大王叫我来巡山-(电影《万万没想到,贾乃亮&甜馨,0,1_down.log
8,167949952.0,ar,4407436.0,给所有知道我名字的人,张钰琪,0,1_down.log
9,167666688.0,ar,5805490.0,最后的旅行——记《龙族》绘梨衣 (日文念白 Rainton桐),网络歌手,0,1_down.log


### Inspect the download table

#### how is the file date distributed？

In [15]:
file_dates_down=down_df.select('file_name').distinct().orderBy('file_name',ascending= True)
file_dates_down.head(60)
#download file dates start from 20170330

[Row(file_name=' 1_down.log'),
 Row(file_name=' 20170330_1_down.log'),
 Row(file_name=' 20170330_2_down.log'),
 Row(file_name=' 20170330_3_down.log'),
 Row(file_name=' 20170331_1_down.log'),
 Row(file_name=' 20170331_2_down.log'),
 Row(file_name=' 20170331_3_down.log'),
 Row(file_name=' 20170401_1_down.log'),
 Row(file_name=' 20170401_2_down.log'),
 Row(file_name=' 20170401_3_down.log'),
 Row(file_name=' 20170402_1_down.log'),
 Row(file_name=' 20170402_2_down.log'),
 Row(file_name=' 20170402_3_down.log'),
 Row(file_name=' 20170403_1_down.log'),
 Row(file_name=' 20170403_2_down.log'),
 Row(file_name=' 20170403_3_down.log'),
 Row(file_name=' 20170404_1_down.log'),
 Row(file_name=' 20170404_2_down.log'),
 Row(file_name=' 20170404_3_down.log'),
 Row(file_name=' 20170405_1_down.log'),
 Row(file_name=' 20170405_2_down.log'),
 Row(file_name=' 20170405_3_down.log'),
 Row(file_name=' 20170406_1_down.log'),
 Row(file_name=' 20170406_2_down.log'),
 Row(file_name=' 20170406_3_down.log'),
 Row(file

In [9]:
down_df.createOrReplaceTempView('download')

In [10]:
spark.sql('select count(distinct uid) distinct_uid from download').show()

+------------+
|distinct_uid|
+------------+
|       90281|
+------------+



In [13]:
uid_count_down=spark.sql('select uid, count(uid) count from download group by uid order by 2 desc')

In [19]:
uid_count_down.show() ## are the top 14 uid valid? values look too large.

+------------+------+
|         uid| count|
+------------+------+
|   1685126.0|292713|
| 3.7025504E7|227318|
|   1062806.0|150800|
|   1791497.0|149346|
|         0.0|142959|
|    497685.0|113276|
|    751824.0| 85865|
|    736305.0| 74248|
|   1749320.0| 43335|
|   1679121.0| 26040|
| 1.5594824E8| 24504|
| 2.8638488E7| 23806|
| 4.6532272E7| 21856|
|    637650.0| 18754|
|1.67703664E8|  9732|
| 3.2166204E7|  9678|
|1.68860544E8|  9486|
|1.67713456E8|  9486|
|1.60555088E8|  9373|
| 6.4268008E7|  7974|
+------------+------+
only showing top 20 rows



In [20]:
spark.sql('select paid_flag, count(1) count from download group by paid_flag').show()

+---------+-------+
|paid_flag|  count|
+---------+-------+
|       0 |8256882|
+---------+-------+



* paid_flag only contains one value. can be discarded

In [16]:
# exclude the first log because the file name cannot be converted to a date type 
down_df2=down_df.where(down_df['file_name']!=' 1_down.log')
down_df2=down_df2.select('uid','song_id','file_name').cache()

In [23]:
pd.DataFrame(down_df2.take(10),columns=down_df2.columns)

Unnamed: 0,uid,song_id,file_name
0,168019808.0,442554.0,20170330_1_down.log
1,168019808.0,6334611.0,20170330_1_down.log
2,168019808.0,9867382.0,20170330_1_down.log
3,168019808.0,6660691.0,20170330_1_down.log
4,168019808.0,157606.0,20170330_1_down.log
5,168019808.0,3372481.0,20170330_1_down.log
6,168019808.0,3216525.0,20170330_1_down.log
7,168019808.0,6427523.0,20170330_1_down.log
8,168019808.0,6538686.0,20170330_1_down.log
9,168019808.0,9327383.0,20170330_1_down.log


### * Load the valid uid table created from the previous play log files to filter the download log uid within the time window we want 2017-03-30 to 2017-05-12

In [21]:
valid_uid=spark.read.csv('./data/valid_uid_1monthWindow/valid_uid_1monthWindow.csv',inferSchema= True,header=True)

In [11]:
valid_uid.printSchema()

root
 |-- uid: double (nullable = true)



In [30]:
# play log valid uid count:
valid_uid.count()

105351

In [14]:
down_valid_uid=valid_uid.join(uid_count_down,on='uid',how='inner').drop(uid_count_down.uid)

In [28]:
down_valid_uid.show()

+------------+-----+
|         uid|count|
+------------+-----+
|1.67703664E8| 9732|
|1.68860544E8| 9486|
|1.67713456E8| 9486|
|1.60555088E8| 9373|
|1.68333216E8| 7450|
| 1.6892312E8| 7250|
|1.68805296E8| 7143|
| 1.6862512E8| 5990|
|1.68690496E8| 5940|
|1.67979376E8| 5850|
|1.68849344E8| 5410|
|1.68473152E8| 5249|
|1.68301744E8| 5170|
|1.67761888E8| 4858|
|1.68363536E8| 4736|
|1.68275776E8| 4714|
|1.67739232E8| 4561|
|1.68120768E8| 4141|
|1.68442976E8| 4035|
|1.68856192E8| 3994|
+------------+-----+
only showing top 20 rows



we can see all the uid with extremely large count are excluded after filtering out only valid uids

In [29]:
down_valid_uid.count()

89556

In [31]:
print('download uid/play log uid ratio= {}'.format(89556/105351))

download uid/play log uid ratio= 0.8500726144032804


### get all the useful download log columns and convert the file name to proper date format

In [17]:
down_df_clean=down_valid_uid.join(down_df2, on='uid',how='inner').drop(down_df2.uid)\
                            .withColumn('datestr',trim(down_df2.file_name).substr(1,9))\
                            .withColumn('unix_date',unix_timestamp('datestr', 'yyyyMMdd'))\
                            .withColumn('date',from_unixtime('unix_date').cast(DateType()))\
                            .drop('datestr').drop('unix_date').cache()

In [36]:
pd.DataFrame(down_df_clean.take(20),columns=down_df_clean.columns)

Unnamed: 0,uid,count,song_id,file_name,date
0,4550267.0,3,6196608.0,20170331_2_down.log,2017-03-31
1,4550267.0,3,6485492.0,20170331_2_down.log,2017-03-31
2,4550267.0,3,859133.0,20170331_2_down.log,2017-03-31
3,6216081.0,10,298180.0,20170404_2_down.log,2017-04-04
4,6216081.0,10,157526.0,20170410_2_down.log,2017-04-10
5,6216081.0,10,5040158.0,20170410_2_down.log,2017-04-10
6,6216081.0,10,21769686.0,20170413_2_down.log,2017-04-13
7,6216081.0,10,656643.0,20170413_2_down.log,2017-04-13
8,6216081.0,10,3206899.0,20170420_2_down.log,2017-04-20
9,6216081.0,10,541474.0,20170421_2_down.log,2017-04-21


In [18]:
#file name is correctly converted to date format. now count and file_name can be dropped
down_df_clean=down_df_clean.drop('count').drop('file_name')

In [19]:
down_df_clean.groupBy('date').count().orderBy('date',ascending=True).show() 

+----------+-------+
|      date|  count|
+----------+-------+
|      null| 464933|
|2017-03-30|1439545|
|2017-03-31| 511630|
|2017-04-01| 314203|
|2017-04-02| 260032|
|2017-04-03| 209962|
|2017-04-04| 208517|
|2017-04-05| 163680|
|2017-04-06| 153144|
|2017-04-07| 137793|
|2017-04-08| 148584|
|2017-04-09| 146677|
|2017-04-10| 125722|
|2017-04-11|  75087|
|2017-04-12| 107735|
|2017-04-13| 100620|
|2017-04-14| 100013|
|2017-04-15| 107840|
|2017-04-16| 105240|
|2017-04-17|  93764|
+----------+-------+
only showing top 20 rows



* 03/30 seems to have abnormaly large download count in one day. I suspect there is problem in how the files are dated or maybe because musicbox users renew the subscription every month end, which results in large traffic on that day. I'll exclude null and keep 03/30 records for now. 

In [20]:
down_df_clean2=down_df_clean.where(down_df_clean['date']>='2017-03-30')

In [22]:
down_df_clean2.groupBy('date').count().orderBy('date',ascending=True).show() 

+----------+-------+
|      date|  count|
+----------+-------+
|2017-03-30|1439545|
|2017-03-31| 511630|
|2017-04-01| 314203|
|2017-04-02| 260032|
|2017-04-03| 209962|
|2017-04-04| 208517|
|2017-04-05| 163680|
|2017-04-06| 153144|
|2017-04-07| 137793|
|2017-04-08| 148584|
|2017-04-09| 146677|
|2017-04-10| 125722|
|2017-04-11|  75087|
|2017-04-12| 107735|
|2017-04-13| 100620|
|2017-04-14| 100013|
|2017-04-15| 107840|
|2017-04-16| 105240|
|2017-04-17|  93764|
|2017-04-18|  91890|
+----------+-------+
only showing top 20 rows



In [26]:
down_df_clean2.select('uid').distinct().count() #excluding null dates does not drop the unique uid count. good.

89556

In [21]:
down_df_clean.where(down_df_clean['date'] == 'null').show()

+---+-------+----+
|uid|song_id|date|
+---+-------+----+
+---+-------+----+



In [108]:
down_df_clean.show()

+-----------+-----------+----------+
|        uid|    song_id|      date|
+-----------+-----------+----------+
|  4550267.0|  6196608.0|2017-03-31|
|  4550267.0|  6485492.0|2017-03-31|
|  4550267.0|   859133.0|2017-03-31|
|  6216081.0|   298180.0|2017-04-04|
|  6216081.0|   157526.0|2017-04-10|
|  6216081.0|  5040158.0|2017-04-10|
|  6216081.0|2.1769686E7|2017-04-13|
|  6216081.0|   656643.0|2017-04-13|
|  6216081.0|  3206899.0|2017-04-20|
|  6216081.0|   541474.0|2017-04-21|
|  6216081.0|   148209.0|2017-04-26|
|  6216081.0|2.1766068E7|2017-04-27|
|  6216081.0|  1521504.0|2017-04-30|
|  6869869.0|   176322.0|2017-04-13|
|2.2553002E7|2.0929864E7|2017-05-01|
|2.2553002E7|1.1416998E7|2017-05-01|
|2.2553002E7|  9883448.0|2017-05-01|
|5.7077508E7|  3432288.0|2017-04-16|
|5.7077508E7|  3385963.0|2017-04-16|
|5.7077508E7|  6525213.0|2017-04-16|
+-----------+-----------+----------+
only showing top 20 rows



save the table to a csv file

In [114]:
down_df_clean.repartition(1).write.csv('./data/down_df_clean',header=True)

## 2. Search Log

In [5]:
search = sc.textFile("./data/all_search_log.log.fn") # size = 1.02GB

In [12]:
search.take(10)

['154436633 \tip \t2017-03-01 00:00:24 \t%e9%83%ad%e5%be%b7%e7%ba%b2 \t 1_1_search.log',
 '154407262 \tar \t2017-03-01 00:00:53 \t%E6%AF%AF%E5%AD%90%E8%88%9E \t 1_1_search.log',
 '154407854 \tip \t2017-03-01 00:00:54 \t%e7%96%a4%2d%20%28%e7%94%b5%e8%a7%86%e5%89%a7%e3%80%8a%e9%a3%9e%e5%88%80%e5%8f%88%e8%a7%81%e9%a3%9e%e5%88%80%e3%80%8b%e4%b8%bb%e9%a2%98%e6%9b%b2%29 \t 1_1_search.log',
 '154407252 \tar \t2017-03-01 00:00:55 \t%E6%88%91%E8%A6%81%E5%88%9B%E4%B8%9A++%E5%94%90%E8%8E%89%E7%BE%A4 \t 1_1_search.log',
 '154407327 \tar \t2017-03-01 00:00:55 \t%E4%B8%AB%E5%A4%B4++%E7%8E%8B%E7%AB%A5%E8%AF%AD \t 1_1_search.log',
 '154407255 \tip \t2017-03-01 00:00:56 \t%e5%8a%a8%e5%bf%83 \t 1_1_search.log',
 '154407261 \tip \t2017-03-01 00:00:59 \t%e8%83%8e%e6%95%99 \t 1_1_search.log',
 '154407267 \tar \t2017-03-01 00:00:59 \t%E5%81%B7%E6%83%85++mc%E5%8D%8A%E9%98%B3 \t 1_1_search.log',
 '154407546 \tip \t2017-03-01 00:01:00 \t%e8%96%9b%e4%b9%8b%e8%b0%a6 \t 1_1_search.log',
 '154407254 \tar \t2017-03

### Clean the dataframe schema

In [6]:
def parseLine(line):
    fields = line.split("\t")
    if len(fields) == 5:
        try:
            uid = float(fields[0])
            device = str(fields[1])
            search_time = str(fields[2])
            url = str(fields[3])
            file_name = str(fields[4])
            return Row(uid, device, search_time, url, file_name)
        except:
            return Row(None)
    else:
        return Row(None)

schema = StructType([StructField('uid', FloatType(), False),
                     StructField('device', StringType(), True),
                     StructField('search_time', StringType(), False),
                     StructField('url', StringType(), True),
                     StructField('file_name', StringType(), True)])
#len(schema)

In [7]:
search2=search.map(parseLine).filter(lambda x:len(x)==len(schema))

In [8]:
search_df= spark.createDataFrame(search2, schema).cache()

In [16]:
search_df.show()

+------------+------+--------------------+--------------------+---------------+
|         uid|device|         search_time|                 url|      file_name|
+------------+------+--------------------+--------------------+---------------+
| 1.5443664E8|   ip |2017-03-01 00:00:24 |%e9%83%ad%e5%be%b...| 1_1_search.log|
|1.54407264E8|   ar |2017-03-01 00:00:53 |%E6%AF%AF%E5%AD%9...| 1_1_search.log|
|1.54407856E8|   ip |2017-03-01 00:00:54 |%e7%96%a4%2d%20%2...| 1_1_search.log|
|1.54407248E8|   ar |2017-03-01 00:00:55 |%E6%88%91%E8%A6%8...| 1_1_search.log|
|1.54407328E8|   ar |2017-03-01 00:00:55 |%E4%B8%AB%E5%A4%B...| 1_1_search.log|
|1.54407248E8|   ip |2017-03-01 00:00:56 | %e5%8a%a8%e5%bf%83 | 1_1_search.log|
|1.54407264E8|   ip |2017-03-01 00:00:59 | %e8%83%8e%e6%95%99 | 1_1_search.log|
|1.54407264E8|   ar |2017-03-01 00:00:59 |%E5%81%B7%E6%83%8...| 1_1_search.log|
|1.54407552E8|   ip |2017-03-01 00:01:00 |%e8%96%9b%e4%b9%8...| 1_1_search.log|
|1.54407248E8|   ar |2017-03-01 00:01:02

### Inspect search file

In [17]:
search_df.createOrReplaceTempView('search_df')

* how many unique uid?

In [18]:
spark.sql("select count(distinct uid) distinct_uid from search_df").show()

+------------+
|distinct_uid|
+------------+
|      128375|
+------------+



#### what's the distribution of uid count?

In [23]:
search_uid_count=spark.sql("select uid, count(1) count from search_df group by uid order by 2 desc")
search_uid_count.show() 
## looks normal. the first uid=0 will be excluded after joining to valid_uid table from play

+------------+------+
|         uid| count|
+------------+------+
|         0.0|153472|
|    497685.0|147226|
|    736305.0|142533|
| 3.7025504E7|100556|
|   1791497.0| 94932|
|   1062806.0| 60723|
| 2.8638488E7| 33819|
|   1679121.0| 24202|
|    637650.0| 18126|
| 6.4268008E7| 10526|
| 3.2166204E7|  9516|
|1.67724192E8|  7337|
| 4.6532272E7|  5037|
| 2.7954504E7|  4214|
| 1.6517426E7|  4168|
|1.68471616E8|  4042|
|   1710083.0|  3309|
| 1.5594824E8|  3058|
|1.68461664E8|  2988|
|   1685126.0|  2977|
+------------+------+
only showing top 20 rows



#### what's the distribution of file dates?

In [98]:
file_dates_src=search_df.select('file_name').distinct().orderBy('file_name',ascending= True)
file_dates_src.head(60)

[Row(file_name=' 1_1_search.log'),
 Row(file_name=' 1_2_search.log'),
 Row(file_name=' 1_3_search.log'),
 Row(file_name=' 1_4_search.log'),
 Row(file_name=' 1_search'),
 Row(file_name=' 20170330_1_search.log'),
 Row(file_name=' 20170330_2_search.log'),
 Row(file_name=' 20170330_3_search.log'),
 Row(file_name=' 20170331_1_search.log'),
 Row(file_name=' 20170331_2_search.log'),
 Row(file_name=' 20170331_3_search.log'),
 Row(file_name=' 20170401_1_search.log'),
 Row(file_name=' 20170401_2_search.log'),
 Row(file_name=' 20170401_3_search.log'),
 Row(file_name=' 20170402_1_search.log'),
 Row(file_name=' 20170402_2_search.log'),
 Row(file_name=' 20170402_3_search.log'),
 Row(file_name=' 20170403_1_search.log'),
 Row(file_name=' 20170403_2_search.log'),
 Row(file_name=' 20170403_3_search.log'),
 Row(file_name=' 20170404_1_search.log'),
 Row(file_name=' 20170404_2_search.log'),
 Row(file_name=' 20170404_3_search.log'),
 Row(file_name=' 20170405_1_search.log'),
 Row(file_name=' 20170405_2_searc

In [19]:
file_count=spark.sql("select file_name, count(1) count \
                     from search_df \
                     where trim(file_name) not in (' 1_1_search.log',' 1_2_search.log',' 1_3_search.log',' 1_4_search.log',' 1_search')\
                     group by file_name order by 1 desc")
# file_count2=file_count.filter((file_count.file_name !='1_1_search.log')& (file_count.file_name !='1_2_search.log')&\
#                               (file_count.file_name !='1_3_search.log')&(file_count.file_name !='1_4_search.log'))

In [101]:
pd.DataFrame(file_count2.take(40),columns=file_count.columns) ## file count is decreasing over time. why?

Unnamed: 0,file_name,count
0,20170330_3_search.log,836479
1,1_1_search.log,685870
2,20170331_3_search.log,250377
3,30_2_sea,242788
4,20170330_2_search.log,242788
5,1_2_search.log,213449
6,30_1_sea,147889
7,20170330_1_search.log,147889
8,20170331_2_search.log,144227
9,20170401_3_search.log,139379


In [24]:
search_valid_uid=valid_uid.join(search_uid_count,on='uid',how='inner').drop(search_uid_count.uid)

In [25]:
search_valid_uid.show() #looks good! extremely large search count uid have been eliminated after join

+------------+-----+
|         uid|count|
+------------+-----+
|1.67724192E8| 7337|
|1.68461664E8| 2988|
| 1.6830864E8| 2528|
|1.68508992E8| 2429|
|1.68742176E8| 2206|
|1.68729952E8| 2146|
|1.68917264E8| 2019|
|1.68742304E8| 1937|
| 1.6827864E8| 1891|
| 1.6846384E8| 1866|
|1.68541936E8| 1833|
|1.68743152E8| 1815|
| 7.7494248E7| 1753|
| 1.6828168E8| 1711|
|1.68942976E8| 1654|
|1.67750416E8| 1609|
| 1.6759232E8| 1592|
|1.68472272E8| 1541|
| 1.6766336E8| 1522|
| 1.6780128E8| 1496|
+------------+-----+
only showing top 20 rows



### cleaning the search_time and convert it to date

In [9]:
search_df_clean=search_df.select('uid','search_time')\
                          .withColumn('date',search_df.search_time.substr(1,10).cast(DateType()))                            

In [60]:
search_df_clean.show()

+------------+--------------------+----------+
|         uid|         search_time|      date|
+------------+--------------------+----------+
| 1.5443664E8|2017-03-01 00:00:24 |2017-03-01|
|1.54407264E8|2017-03-01 00:00:53 |2017-03-01|
|1.54407856E8|2017-03-01 00:00:54 |2017-03-01|
|1.54407248E8|2017-03-01 00:00:55 |2017-03-01|
|1.54407328E8|2017-03-01 00:00:55 |2017-03-01|
|1.54407248E8|2017-03-01 00:00:56 |2017-03-01|
|1.54407264E8|2017-03-01 00:00:59 |2017-03-01|
|1.54407264E8|2017-03-01 00:00:59 |2017-03-01|
|1.54407552E8|2017-03-01 00:01:00 |2017-03-01|
|1.54407248E8|2017-03-01 00:01:02 |2017-03-01|
|  1.544072E8|2017-03-01 00:01:06 |2017-03-01|
|1.54407248E8|2017-03-01 00:01:06 |2017-03-01|
|1.54407264E8|2017-03-01 00:01:07 |2017-03-01|
| 1.5440736E8|2017-03-01 00:01:08 |2017-03-01|
|1.54407376E8|2017-03-01 00:01:08 |2017-03-01|
|1.54407344E8|2017-03-01 00:01:09 |2017-03-01|
|1.54407296E8|2017-03-01 00:01:14 |2017-03-01|
|1.54407408E8|2017-03-01 00:01:14 |2017-03-01|
| 1.5440736E8

In [97]:
search_df_clean.groupBy('date').count().orderBy('date', ascending=True).show() 

+----------+-------+
|      date|  count|
+----------+-------+
|2017-03-01| 685870|
|2017-03-02| 213449|
|2017-03-03| 133699|
|2017-03-04| 121626|
|2017-03-30|1617833|
|2017-03-31| 510746|
|2017-04-01| 351061|
|2017-04-02| 328088|
|2017-04-03| 289248|
|2017-04-04| 274964|
|2017-04-05| 220321|
|2017-04-06| 204615|
|2017-04-07| 202929|
|2017-04-08| 226179|
|2017-04-09| 218917|
|2017-04-10| 176189|
|2017-04-11| 114918|
|2017-04-12| 163474|
|2017-04-13| 167287|
|2017-04-14| 162041|
+----------+-------+
only showing top 20 rows



* unusually large count in 03/30. the same problem as the download file. file dating might have some problems,  I will exclude the first 4 days' records of March but keep 03/30 to make it consistent with the download file.

In [10]:
search_df_clean2=search_df_clean.filter(search_df_clean.date>='2017-03-30').cache()

In [14]:
search_df_clean2.select('date').distinct().orderBy('date', ascending=True).show() 

+----------+
|      date|
+----------+
|2017-03-30|
|2017-03-31|
|2017-04-01|
|2017-04-02|
|2017-04-03|
|2017-04-04|
|2017-04-05|
|2017-04-06|
|2017-04-07|
|2017-04-08|
|2017-04-09|
|2017-04-10|
|2017-04-11|
|2017-04-12|
|2017-04-13|
|2017-04-14|
|2017-04-15|
|2017-04-16|
|2017-04-17|
|2017-04-18|
+----------+
only showing top 20 rows



In [29]:
search_df_clean2.select('uid').distinct().count()

97829

In [30]:
search_df_clean3=search_df_clean.filter(search_df_clean.date>='2017-03-31')
search_df_clean3.select('uid').distinct().count()
# if excluding 3/30 file, 3000 uid will be eliminated

94924

In [13]:
search_df_clean2.select('uid','date').repartition(1).write.csv('./data/search_df_clean',header=True)

In [118]:
spark.stop()

# END of preprocessing 