In [1]:
import os

# Bootstrap PySpark inside a plain Jupyter kernel by executing the shell script
# that ships with Spark (it prints the banner and exposes `spark`).
# $SPARK_HOME overrides the install location; the default is the original
# Homebrew path, so behaviour is unchanged when the variable is unset.
spark_home = os.environ.get('SPARK_HOME', '/usr/local/Cellar/apache-spark/2.2.1/libexec')
filename = os.path.join(spark_home, 'python', 'pyspark', 'shell.py')
# The context manager closes the file handle deterministically; compile() with
# the real path makes tracebacks from shell.py point at the right file.
with open(filename) as shell_script:
    exec(compile(shell_script.read(), filename, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Python version 3.6.1 (default, May 11 2017 13:04:09)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import SparkSession

In [3]:
# getOrCreate() returns the already-active session when there is one (the
# shell bootstrap reports "SparkSession available as 'spark'"); building a
# brand-new local session can take a while.
spark = (SparkSession
         .builder
         .appName("Feature")
         .getOrCreate())

## Feature Engineering 

### Step 1: Select Features

In [4]:
# Tab-separated play log whose first row holds the column names.
# inferSchema asks Spark to guess column types from the data.
df_whole = spark.read.csv(
    '../data/all_play.log.fn',
    header=True,
    inferSchema=True,
    sep='\t',
)

In [5]:
# Inspect the column names and the types Spark inferred for the raw play log.
df_whole.printSchema()

root
 |-- uid: string (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- song_type: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singers: string (nullable = true)
 |-- play_time: string (nullable = true)
 |-- song_length: string (nullable = true)
 |-- paid_flag: string (nullable = true)
 |-- file_name: string (nullable = true)



In [6]:
# Preview the first 20 rows (Spark's default), truncating long cell values.
df_whole.show(n=20, truncate=True)

+----------+------+---------+---------+--------------------+--------------------+---------+-----------+---------+------------------+
|       uid|device|  song_id|song_type|           song_name|             singers|play_time|song_length|paid_flag|         file_name|
+----------+------+---------+---------+--------------------+--------------------+---------+-----------+---------+------------------+
| 154422682|   ar |20870993 |       1 |                 用情 |              狮子合唱团 |   22013 |       332 |       0 | 20170301_play.log|
|154421907 |   ip | 6560858 |       0 |             表情不要悲伤 |    伯贤&D.O.&张艺兴&朴灿烈 |      96 |       161 |       0 | 20170301_play.log|
|154422630 |   ar | 3385963 |       1 |Baby, Don't Cry(人...|                EXO |  235868 |       235 |       0 | 20170301_play.log|
|154410267 |   ar | 6777172 |       0 |   3D-环绕音律1(3D Mix) |             McTaiM |     164 |       237 |       0 | 20170301_play.log|
|154407793 |   ar |19472465 |       0 |              刚好遇见你 |         

In [31]:
# Parse the features we need out of the raw string columns.
# https://stackoverflow.com/questions/46410887/pyspark-string-matching-to-create-new-column

from pyspark.sql.functions import regexp_extract, col

# Use regular expressions for the string manipulation. Patterns are raw
# strings so backslashes (e.g. \w) are not treated as Python escape sequences.

# Required feature 1 - date
# e.g. "20170301_play.log" -> "20170301"
# Note: regexp_extract yields '' (not null) when the pattern does not match.
df_whole = df_whole.withColumn('date', regexp_extract(col('file_name'), r'([0-9]{8})(_)(\w+)', 1))

# Required feature 2 - uid
# e.g. "154422682" -> "154422682" (keep the first nine digits)
df_whole = df_whole.withColumn('uid', regexp_extract(col('uid'), r'([0-9]{9})', 1))

# Optional features - play_time, song_length, song_id (song_id may be used for
# music recommendation). Capture the whole run of digits: a single '([0-9])'
# would keep only the first digit, e.g. "22013 " -> "2".
for numeric_col in ['play_time', 'song_length', 'song_id']:
    df_whole = df_whole.withColumn(numeric_col, regexp_extract(col(numeric_col), r'([0-9]+)', 1))

# Keep the five features (date, uid, play_time, song_length, song_id)
# in a new DataFrame.
df_select = df_whole.select(['date', 'uid', 'play_time', 'song_length', 'song_id'])
df_select.show()

+--------+---------+---------+-----------+---------+
|    date|      uid|play_time|song_length|  song_id|
+--------+---------+---------+-----------+---------+
|20170301|154422682|   22013 |       332 |20870993 |
|20170301|154421907|      96 |       161 | 6560858 |
|20170301|154422630|  235868 |       235 | 3385963 |
|20170301|154410267|     164 |       237 | 6777172 |
|20170301|154407793|      24 |       201 |19472465 |
|20170301|154422626|  275249 |         0 | 3198036 |
|20170301|154422681|     300 |       300 |  891952 |
|20170301|154408091|     243 |       243 | 4623962 |
|20170301|154422571|     207 |       207 |  703750 |
|20170301|154417311|      56 |       184 | 6491500 |
|20170301|154421166|     139 |       275 | 1967689 |
|20170301|154421859|       4 |        27 | 6126586 |
|20170301|154422660|     299 |       300 |11914644 |
|20170301|154422590|     261 |       261 | 6468891 |
|20170301|154419565|       8 |        65 |15196649 |
|20170301|154414286|      26 |         0 | 714

In [15]:
# Check the types of the selected features; regexp_extract produces strings,
# so every column is still a string before the type conversion step.
df_select.printSchema()

root
 |-- date: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- play_time: string (nullable = true)
 |-- song_length: string (nullable = true)
 |-- song_id: string (nullable = true)



### Convert Data Types

Before converting the data types, we should check whether there are any outlier values. First, let's look at the `date` column.

In [37]:
# Row count for every distinct date value, to spot malformed or missing
# dates. Up to 90 groups are listed, with cell values left untruncated.
(df_select
    .groupBy("date")
    .count()
    .show(n=90, truncate=False))

+--------+-------+
|date    |count  |
+--------+-------+
|20170504|2520194|
|20170413|3385074|
|20170308|1230621|
|20170410|3439052|
|20170428|2957021|
|20170417|3115820|
|20170302|2452263|
|20170420|3013598|
|20170430|3159112|
|20170405|3856179|
|20170505|2575692|
|20170404|4950216|
|20170402|5709371|
|20170424|1508418|
|null    |222965 |
|20170406|3887075|
|20170416|3564697|
|20170331|7040986|
|20170415|3598986|
|20170429|3343811|
|20170419|3026294|
|20170509|2495600|
|20170421|3043848|
|20170501|2959583|
|20170411|2336126|
|20170422|3253855|
|20170303|1851942|
|20170306|1351465|
|20170408|4061267|
|20170409|3952596|
|20170304|1709097|
|20170427|3131496|
|20170423|3229367|
|20170506|2881461|
|20170407|3813355|
|20170503|2558772|
|20170426|2793864|
|20170510|2495406|
|20170401|5800351|
|20170309|1172860|
|20170425|2823102|
|20170418|3017081|
|20170512|2526709|
|20170507|2773459|
|20170502|2489854|
|20170330|4759605|
|20170412|3460913|
|20170307|1288366|
|20170403|3595625|
|20170305|16

Now we can see that there are two outlier values (e.g. `null`). We need to remove them.

In [43]:
# Remove every row in which at least one selected column is null
# (how='any' is the default, spelled out here for clarity).
df_select = df_select.dropna(how='any')

Next, let's look at the `date` column again.

In [44]:
# Summary statistics (count, mean, stddev, min, max) for the date strings.
# An empty `min` suggests some rows still hold the empty string.
df_select.describe("date").show()

+-------+--------------------+
|summary|                date|
+-------+--------------------+
|  count|           164217719|
|   mean|2.0170413824625567E7|
| stddev|    58.5894625692998|
|    min|                    |
|    max|            20170512|
+-------+--------------------+



In [45]:
# Re-check the per-date row counts now that null rows are gone.
df_select.groupBy("date") \
    .count() \
    .show(n=90, truncate=False)

+--------+-------+
|date    |count  |
+--------+-------+
|20170504|2516949|
|20170413|3380796|
|20170308|1230621|
|20170410|3435108|
|20170428|2952460|
|20170417|3112159|
|20170302|2452263|
|20170420|3008111|
|20170430|3153132|
|20170405|3850905|
|20170505|2571197|
|20170404|4941358|
|20170402|5699764|
|20170424|1506606|
|20170406|3881751|
|20170416|3558109|
|20170331|7033246|
|20170415|3591673|
|20170429|3337713|
|20170419|3021366|
|20170509|2491675|
|20170421|3038619|
|20170501|2953738|
|20170411|2332928|
|20170422|3247357|
|20170303|1851942|
|20170306|1351465|
|20170408|4053207|
|20170409|3945463|
|20170304|1709097|
|20170427|3127340|
|20170423|3223963|
|20170506|2874456|
|20170407|3807564|
|20170503|2555248|
|20170426|2789468|
|20170510|2490997|
|20170401|5792550|
|20170309|1172860|
|20170425|2819081|
|20170418|3013580|
|20170512|2522303|
|20170507|2767560|
|20170502|2486697|
|20170330|4755802|
|20170412|3457415|
|20170307|1288366|
|20170403|3588991|
|20170305|1607932|
|20170511|24

We can see that a blank (empty-string) date still exists — `describe` reports an empty `min`. We need to delete it.

In [46]:
# Keep only rows whose date is not the empty string, then list the
# per-date counts again to confirm.
df_select = df_select.filter(df_select["date"] != "")
df_select.groupBy("date").count().show(n=90, truncate=False)

+--------+-------+
|date    |count  |
+--------+-------+
|20170504|2516949|
|20170413|3380796|
|20170308|1230621|
|20170410|3435108|
|20170428|2952460|
|20170417|3112159|
|20170302|2452263|
|20170420|3008111|
|20170430|3153132|
|20170405|3850905|
|20170505|2571197|
|20170404|4941358|
|20170402|5699764|
|20170424|1506606|
|20170406|3881751|
|20170416|3558109|
|20170331|7033246|
|20170415|3591673|
|20170429|3337713|
|20170419|3021366|
|20170509|2491675|
|20170421|3038619|
|20170501|2953738|
|20170411|2332928|
|20170422|3247357|
|20170303|1851942|
|20170306|1351465|
|20170408|4053207|
|20170409|3945463|
|20170304|1709097|
|20170427|3127340|
|20170423|3223963|
|20170506|2874456|
|20170407|3807564|
|20170503|2555248|
|20170426|2789468|
|20170510|2490997|
|20170401|5792550|
|20170309|1172860|
|20170425|2819081|
|20170418|3013580|
|20170512|2522303|
|20170507|2767560|
|20170502|2486697|
|20170330|4755802|
|20170412|3457415|
|20170307|1288366|
|20170403|3588991|
|20170305|1607932|
|20170511|24

Now we can see that there is no blank value!

In [54]:
# Change the column types.
# https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark
# Target types:
#     date: DateType
#     uid: IntegerType
#     play_time: IntegerType
#     song_length: IntegerType
#     song_id: IntegerType
from pyspark.sql.types import DoubleType, IntegerType, DateType
from datetime import datetime
from pyspark.sql.functions import col, udf, to_date

# https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format
# Parse the 'yyyyMMdd' strings with the built-in to_date (format argument
# available since Spark 2.2). Unlike a Python UDF around datetime.strptime,
# it runs inside Spark without a per-row Python round trip, and it yields
# null rather than raising on null or unparseable input.
df_select = df_select.withColumn('date', to_date(col('date'), 'yyyyMMdd'))

df_select = df_select.withColumn("uid", df_select["uid"].cast(IntegerType()))
# TODO: cast the remaining columns once play_time / song_length / song_id are
# cleaned (next step); casting a string that is not a plain integer gives null.
#df_select = df_select.withColumn("play_time", df_select["play_time"].cast(IntegerType()))
#df_select = df_select.withColumn("song_length", df_select["song_length"].cast(IntegerType()))
#df_select = df_select.withColumn("song_id", df_select["song_id"].cast(IntegerType()))
df_select.printSchema()

root
 |-- date: date (nullable = true)
 |-- uid: integer (nullable = true)
 |-- play_time: integer (nullable = true)
 |-- song_length: integer (nullable = true)
 |-- song_id: integer (nullable = true)



In [48]:
# Look at the first 20 converted rows to verify the casts.
df_select.show(20)

+----------+---------+---------+-----------+-------+
|      date|      uid|play_time|song_length|song_id|
+----------+---------+---------+-----------+-------+
|2017-03-01|154422682|     null|       null|   null|
|2017-03-01|154421907|     null|       null|   null|
|2017-03-01|154422630|     null|       null|   null|
|2017-03-01|154410267|     null|       null|   null|
|2017-03-01|154407793|     null|       null|   null|
|2017-03-01|154422626|     null|       null|   null|
|2017-03-01|154422681|     null|       null|   null|
|2017-03-01|154408091|     null|       null|   null|
|2017-03-01|154422571|     null|       null|   null|
|2017-03-01|154417311|     null|       null|   null|
|2017-03-01|154421166|     null|       null|   null|
|2017-03-01|154421859|     null|       null|   null|
|2017-03-01|154422660|     null|       null|   null|
|2017-03-01|154422590|     null|       null|   null|
|2017-03-01|154419565|     null|       null|   null|
|2017-03-01|154414286|     null|       null|  

Also do the same for the other three features: `play_time`, `song_length`, and `song_id`.

In [53]:
# Number of rows whose play_time is not null.
df_select.where(df_select["play_time"].isNotNull()).count()

145023546