In [40]:
import os, re
from pyspark.sql.types import *
import pandas as pd

In [10]:
from pyspark.sql import SparkSession
# May take a little while on a local computer
spark = SparkSession.builder.appName("music_box").getOrCreate()

# load play_log

In [11]:
play_path = '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/'
play_list = []
for fname in os.listdir(play_path):
    play_list.append(play_path+fname)

In [12]:
play_list

['/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/.DS_Store',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170401_1_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170401_2_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170401_3_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170402_1_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170402_2_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170402_3_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170403_1_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170403_2_play.csv',
 '/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/20170404_1_play.csv',
 '/Users/wanjiewang/PycharmPro

In [13]:
# load all play log:
play_schema = StructType([ StructField("uid",IntegerType(),nullable = False), 
                          StructField("new_device", StringType()), 
                          StructField("song_id",IntegerType()), 
                          StructField("new_play_time", DoubleType()),
                          StructField("new_song_length", DoubleType()),
                          StructField("date", DateType())
                        ])

In [14]:
for i in range(len(play_list)):
    if i == 0:
        df_play = spark.read.csv(play_list[i],sep = '|', header = True, schema = play_schema, encoding = 'UTF-8')
    else:
        df_play_merge = spark.read.csv(play_list[i],sep = '|', header = True, schema = play_schema, encoding = 'UTF-8')
        df_play = df_play.unionAll(df_play_merge)

df_play.printSchema()

print ("Total Rows: " + str(df_play.count()))

root
 |-- uid: integer (nullable = true)
 |-- new_device: string (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- new_play_time: double (nullable = true)
 |-- new_song_length: double (nullable = true)
 |-- date: date (nullable = true)

Total Rows: 98039208


In [15]:
df_play.show(5)

+---------+----------+-------+-------------+---------------+----------+
|      uid|new_device|song_id|new_play_time|new_song_length|      date|
+---------+----------+-------+-------------+---------------+----------+
| 38830551|        ar|1152372|        171.0|          172.0|2017-04-01|
|167923158|        ar| 985228|        251.0|          251.0|2017-04-01|
|168045107|        ar|6818758|          0.0|         1683.0|2017-04-01|
|168024853|        ar|3971731|        300.0|          300.0|2017-04-01|
|167580792|        ar|6989313|        113.0|          113.0|2017-04-01|
+---------+----------+-------+-------------+---------------+----------+
only showing top 5 rows



In [91]:
from pyspark.sql.functions import *

In [69]:
df_play = df_play.withColumn('mon',month(df_play['date']))
df_play = df_play.withColumn('dayofMon',dayofmonth(df_play['date']))
df_play = df_play.withColumn('3dayofMon',(dayofmonth(df_play['date'])-1)/3)
df_play = df_play.withColumn('5dayofMon',(dayofmonth(df_play['date'])-1)/5)
df_play = df_play.withColumn('7dayofMon',(dayofmonth(df_play['date'])-1)/7)
df_play = df_play.withColumn('14dayofMon',(dayofmonth(df_play['date'])-1)/14)

In [70]:
df_play.show(5)

+---------+----------+-------+-------------+---------------+----------+---+--------+---------+---------+---------+----------+
|      uid|new_device|song_id|new_play_time|new_song_length|      date|mon|dayofMon|3dayofMon|5dayofMon|7dayofMon|14dayofMon|
+---------+----------+-------+-------------+---------------+----------+---+--------+---------+---------+---------+----------+
| 38830551|        ar|1152372|        171.0|          172.0|2017-04-01|  4|       1|      0.0|      0.0|      0.0|       0.0|
|167923158|        ar| 985228|        251.0|          251.0|2017-04-01|  4|       1|      0.0|      0.0|      0.0|       0.0|
|168045107|        ar|6818758|          0.0|         1683.0|2017-04-01|  4|       1|      0.0|      0.0|      0.0|       0.0|
|168024853|        ar|3971731|        300.0|          300.0|2017-04-01|  4|       1|      0.0|      0.0|      0.0|       0.0|
|167580792|        ar|6989313|        113.0|          113.0|2017-04-01|  4|       1|      0.0|      0.0|      0.0|    

## freq_1d


In [135]:
df_play_Apr = df_play.filter(df_play['mon']==4)
df_play_May = df_play.filter(df_play['mon']==5)


In [136]:
gb_1d = df_play_Apr.groupBy('uid')

In [137]:
freq_1d_song = gb_1d.agg({'song_id':'count'}).alias('song')

In [138]:
freq_1d_playtime = gb_1d.agg({'new_play_time':'sum'}).alias('playtime')

In [139]:
df_final = freq_1d_song.join(freq_1d_playtime,col('song.uid') == col('playtime.uid'),'inner')

In [140]:
df_final = df_final.withColumn('freq_1d_song',df_final['count(song_id)']/30)
df_final = df_final.withColumn('freq_1d_playtime',df_final['sum(new_play_time)']/30)
df_final = df_final.select(['song.uid','freq_1d_song','freq_1d_playtime']).alias('final')

## freq_3d, freq_5d, freq_7d

In [141]:
df_3d_song = df_play_Apr.groupBy(['uid','3dayofMon']).agg({'song_id':'count'}).groupBy('uid').agg({'count(song_id)':'sum'}).alias('3dsong')
df_3d_play = df_play_Apr.groupBy(['uid','3dayofMon']).agg({'new_play_time':'sum'}).groupBy('uid').agg({'sum(new_play_time)':'sum'}).alias('3dplay')

df_temp = df_3d_song.join(df_3d_play,col('3dsong.uid') == col('3dplay.uid'),'inner')
df_temp = df_temp.withColumn('freq_3d_song',df_temp['sum(count(song_id))']/10)
df_temp = df_temp.withColumn('freq_3d_playtime',df_temp['sum(sum(new_play_time))']/10)
df_temp = df_temp.select(['3dsong.uid','freq_3d_song','freq_3d_playtime']).alias('3d')
df_final = df_final.join(df_temp,col('final.uid')==col('3d.uid'),'inner').select(['final.uid','freq_1d_song','freq_1d_playtime','freq_3d_song','freq_3d_playtime']).alias('final')


In [142]:
df_5d_song = df_play_Apr.groupBy(['uid','5dayofMon']).agg({'song_id':'count'}).groupBy('uid').agg({'count(song_id)':'sum'}).alias('5dsong')
df_5d_play = df_play_Apr.groupBy(['uid','5dayofMon']).agg({'new_play_time':'sum'}).groupBy('uid').agg({'sum(new_play_time)':'sum'}).alias('5dplay')

df_temp = df_5d_song.join(df_5d_play,col('5dsong.uid') == col('5dplay.uid'),'inner')
df_temp = df_temp.withColumn('freq_5d_song',df_temp['sum(count(song_id))']/6)
df_temp = df_temp.withColumn('freq_5d_playtime',df_temp['sum(sum(new_play_time))']/6)
df_temp = df_temp.select(['5dsong.uid','freq_5d_song','freq_5d_playtime']).alias('5d')
df_final = df_final.join(df_temp,col('final.uid')==col('5d.uid'),'inner').select(['final.uid','freq_1d_song','freq_1d_playtime','freq_3d_song','freq_3d_playtime','freq_5d_song','freq_5d_playtime']).alias('final')


In [143]:
df_7d_song = df_play_Apr.groupBy(['uid','7dayofMon']).agg({'song_id':'count'}).groupBy('uid').agg({'count(song_id)':'sum'}).alias('7dsong')
df_7d_play = df_play_Apr.groupBy(['uid','7dayofMon']).agg({'new_play_time':'sum'}).groupBy('uid').agg({'sum(new_play_time)':'sum'}).alias('7dplay')

df_temp = df_7d_song.join(df_7d_play,col('7dsong.uid') == col('7dplay.uid'),'inner')
df_temp = df_temp.withColumn('freq_7d_song',df_temp['sum(count(song_id))']/4)
df_temp = df_temp.withColumn('freq_7d_playtime',df_temp['sum(sum(new_play_time))']/4)
df_temp = df_temp.select(['7dsong.uid','freq_7d_song','freq_7d_playtime']).alias('7d')
df_final = df_final.join(df_temp,col('final.uid')==col('7d.uid'),'inner').select(['final.uid','freq_1d_song','freq_1d_playtime','freq_3d_song','freq_3d_playtime','freq_5d_song','freq_5d_playtime','freq_7d_song','freq_7d_playtime']).alias('final')


In [144]:
df_final.show(5)

+--------+-------------------+------------------+------------+----------------+------------------+------------------+------------+----------------+
|     uid|       freq_1d_song|  freq_1d_playtime|freq_3d_song|freq_3d_playtime|      freq_5d_song|  freq_5d_playtime|freq_7d_song|freq_7d_playtime|
+--------+-------------------+------------------+------------+----------------+------------------+------------------+------------+----------------+
|10906795|0.16666666666666666|40.166666666666664|         0.5|           120.5|0.8333333333333334|200.83333333333334|        1.25|          301.25|
|13277485| 0.5333333333333333|46.233333333333334|         1.6|           138.7|2.6666666666666665|231.16666666666666|         4.0|          346.75|
|13610475| 49.666666666666664|            3006.6|       149.0|          9019.8|248.33333333333334|           15033.0|       372.5|         22549.5|
|20974764|                1.9|             226.1|         5.7|           678.3|               9.5|            11

429392

In [146]:
df_final.write.save('play_log_freq')

# label

In [147]:
df_play_May = df_play.filter(df_play['mon']==5)

In [156]:
df_play_May.show(5)

+---------+----------+-------+-------------+---------------+----------+---+--------+---------+---------+---------+----------+
|      uid|new_device|song_id|new_play_time|new_song_length|      date|mon|dayofMon|3dayofMon|5dayofMon|7dayofMon|14dayofMon|
+---------+----------+-------+-------------+---------------+----------+---+--------+---------+---------+---------+----------+
|167942200|        ar| 442218|        260.0|          260.0|2017-05-01|  5|       1|      0.0|      0.0|      0.0|       0.0|
|167682905|        ar| 440614|          4.0|          253.0|2017-05-01|  5|       1|      0.0|      0.0|      0.0|       0.0|
|168009486|        ar| 157606|        288.0|          288.0|2017-05-01|  5|       1|      0.0|      0.0|      0.0|       0.0|
|168022196|        ar| 835317|          3.0|          260.0|2017-05-01|  5|       1|      0.0|      0.0|      0.0|       0.0|
|167879980|        ar|  90861|        300.0|          300.0|2017-05-01|  5|       1|      0.0|      0.0|      0.0|    

In [160]:
df_used = df_play_May.select('uid').alias('active')

In [158]:
df_used.show(5)

+---------+
|      uid|
+---------+
|167942200|
|167682905|
|168009486|
|168022196|
|167879980|
+---------+
only showing top 5 rows



In [179]:
test = df_final.join(df_used,col('final.uid') == col('active.uid'),'left')

In [163]:
test.show(5)

+--------+-------------------+------------------+------------+----------------+------------------+------------------+------------+----------------+--------+
|     uid|       freq_1d_song|  freq_1d_playtime|freq_3d_song|freq_3d_playtime|      freq_5d_song|  freq_5d_playtime|freq_7d_song|freq_7d_playtime|     uid|
+--------+-------------------+------------------+------------+----------------+------------------+------------------+------------+----------------+--------+
|10906795|0.16666666666666666|40.166666666666664|         0.5|           120.5|0.8333333333333334|200.83333333333334|        1.25|          301.25|    null|
|13277485| 0.5333333333333333|46.233333333333334|         1.6|           138.7|2.6666666666666665|231.16666666666666|         4.0|          346.75|    null|
|13610475| 49.666666666666664|            3006.6|       149.0|          9019.8|248.33333333333334|           15033.0|       372.5|         22549.5|13610475|
|13610475| 49.666666666666664|            3006.6|       14

In [180]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 



In [181]:
df1 = test.where(col('active.uid').isNotNull())
df2 = test.where(col('active.uid').isNull())

In [182]:
df1 = df1.withColumn('label',lit(1))
df2 = df2.withColumn('label',lit(0))
res = unionAll(*[df1, df2])

In [184]:
res = res.select(['final.uid','freq_1d_song','freq_1d_playtime','freq_3d_song','freq_3d_playtime','freq_5d_song','freq_5d_playtime','freq_7d_song','freq_7d_playtime','label'])

In [185]:
res.show(5)

+--------+------------------+----------------+------------+----------------+------------------+----------------+------------+----------------+-----+
|     uid|      freq_1d_song|freq_1d_playtime|freq_3d_song|freq_3d_playtime|      freq_5d_song|freq_5d_playtime|freq_7d_song|freq_7d_playtime|label|
+--------+------------------+----------------+------------+----------------+------------------+----------------+------------+----------------+-----+
|13610475|49.666666666666664|          3006.6|       149.0|          9019.8|248.33333333333334|         15033.0|       372.5|         22549.5|    1|
|13610475|49.666666666666664|          3006.6|       149.0|          9019.8|248.33333333333334|         15033.0|       372.5|         22549.5|    1|
|13610475|49.666666666666664|          3006.6|       149.0|          9019.8|248.33333333333334|         15033.0|       372.5|         22549.5|    1|
|13610475|49.666666666666664|          3006.6|       149.0|          9019.8|248.33333333333334|         15

In [186]:
res.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- freq_1d_song: double (nullable = true)
 |-- freq_1d_playtime: double (nullable = true)
 |-- freq_3d_song: double (nullable = true)
 |-- freq_3d_playtime: double (nullable = true)
 |-- freq_5d_song: double (nullable = true)
 |-- freq_5d_playtime: double (nullable = true)
 |-- freq_7d_song: double (nullable = true)
 |-- freq_7d_playtime: double (nullable = true)
 |-- label: integer (nullable = false)



# transform to a dataframe can be used by ml lib

In [187]:
from pyspark.ml.feature import VectorAssembler


In [202]:
res.columns

['uid',
 'freq_1d_song',
 'freq_1d_playtime',
 'freq_3d_song',
 'freq_3d_playtime',
 'freq_5d_song',
 'freq_5d_playtime',
 'freq_7d_song',
 'freq_7d_playtime',
 'label']

In [203]:
assembler = VectorAssembler(
  inputCols=['freq_1d_song',
 'freq_1d_playtime',
 'freq_3d_song',
 'freq_3d_playtime',
 'freq_5d_song',
 'freq_5d_playtime',
 'freq_7d_song',
 'freq_7d_playtime'],
              outputCol="features")

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:53312)
Traceback (most recent call last):
  File "/Users/wanjiewang/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/wanjiewang/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 990, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:53312)

In [191]:
output = assembler.transform(res)


In [199]:
output.show(5)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:53312)
Traceback (most recent call last):
  File "/Users/wanjiewang/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/wanjiewang/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 990, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:53312)

In [192]:
final_data = output.select("features",'label')

In [193]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [197]:
train_data.show(5)

Py4JJavaError: An error occurred while calling o5116.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(uid#2237, 200)
+- *(125) HashAggregate(keys=[uid#2237], functions=[partial_count(song_id#2239)], output=[uid#2237, count#22433L])
   +- Union
      :- *(1) Project [uid#2237, song_id#2239]
      :  +- *(1) Filter ((month(date#2242) = 4) && isnotnull(uid#2237))
      :     +- *(1) FileScan csv [uid#2237,song_id#2239,date#2242] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(2) Project [uid#2249, song_id#2251]
      :  +- *(2) Filter ((month(date#2254) = 4) && isnotnull(uid#2249))
      :     +- *(2) FileScan csv [uid#2249,song_id#2251,date#2254] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(3) Project [uid#2267, song_id#2269]
      :  +- *(3) Filter ((month(date#2272) = 4) && isnotnull(uid#2267))
      :     +- *(3) FileScan csv [uid#2267,song_id#2269,date#2272] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(4) Project [uid#2285, song_id#2287]
      :  +- *(4) Filter ((month(date#2290) = 4) && isnotnull(uid#2285))
      :     +- *(4) FileScan csv [uid#2285,song_id#2287,date#2290] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(5) Project [uid#2303, song_id#2305]
      :  +- *(5) Filter ((month(date#2308) = 4) && isnotnull(uid#2303))
      :     +- *(5) FileScan csv [uid#2303,song_id#2305,date#2308] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(6) Project [uid#2321, song_id#2323]
      :  +- *(6) Filter ((month(date#2326) = 4) && isnotnull(uid#2321))
      :     +- *(6) FileScan csv [uid#2321,song_id#2323,date#2326] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(7) Project [uid#2339, song_id#2341]
      :  +- *(7) Filter ((month(date#2344) = 4) && isnotnull(uid#2339))
      :     +- *(7) FileScan csv [uid#2339,song_id#2341,date#2344] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(8) Project [uid#2357, song_id#2359]
      :  +- *(8) Filter ((month(date#2362) = 4) && isnotnull(uid#2357))
      :     +- *(8) FileScan csv [uid#2357,song_id#2359,date#2362] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(9) Project [uid#2375, song_id#2377]
      :  +- *(9) Filter ((month(date#2380) = 4) && isnotnull(uid#2375))
      :     +- *(9) FileScan csv [uid#2375,song_id#2377,date#2380] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(10) Project [uid#2393, song_id#2395]
      :  +- *(10) Filter ((month(date#2398) = 4) && isnotnull(uid#2393))
      :     +- *(10) FileScan csv [uid#2393,song_id#2395,date#2398] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(11) Project [uid#2411, song_id#2413]
      :  +- *(11) Filter ((month(date#2416) = 4) && isnotnull(uid#2411))
      :     +- *(11) FileScan csv [uid#2411,song_id#2413,date#2416] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(12) Project [uid#2429, song_id#2431]
      :  +- *(12) Filter ((month(date#2434) = 4) && isnotnull(uid#2429))
      :     +- *(12) FileScan csv [uid#2429,song_id#2431,date#2434] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(13) Project [uid#2447, song_id#2449]
      :  +- *(13) Filter ((month(date#2452) = 4) && isnotnull(uid#2447))
      :     +- *(13) FileScan csv [uid#2447,song_id#2449,date#2452] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(14) Project [uid#2465, song_id#2467]
      :  +- *(14) Filter ((month(date#2470) = 4) && isnotnull(uid#2465))
      :     +- *(14) FileScan csv [uid#2465,song_id#2467,date#2470] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(15) Project [uid#2483, song_id#2485]
      :  +- *(15) Filter ((month(date#2488) = 4) && isnotnull(uid#2483))
      :     +- *(15) FileScan csv [uid#2483,song_id#2485,date#2488] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(16) Project [uid#2501, song_id#2503]
      :  +- *(16) Filter ((month(date#2506) = 4) && isnotnull(uid#2501))
      :     +- *(16) FileScan csv [uid#2501,song_id#2503,date#2506] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(17) Project [uid#2519, song_id#2521]
      :  +- *(17) Filter ((month(date#2524) = 4) && isnotnull(uid#2519))
      :     +- *(17) FileScan csv [uid#2519,song_id#2521,date#2524] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(18) Project [uid#2537, song_id#2539]
      :  +- *(18) Filter ((month(date#2542) = 4) && isnotnull(uid#2537))
      :     +- *(18) FileScan csv [uid#2537,song_id#2539,date#2542] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(19) Project [uid#2555, song_id#2557]
      :  +- *(19) Filter ((month(date#2560) = 4) && isnotnull(uid#2555))
      :     +- *(19) FileScan csv [uid#2555,song_id#2557,date#2560] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(20) Project [uid#2573, song_id#2575]
      :  +- *(20) Filter ((month(date#2578) = 4) && isnotnull(uid#2573))
      :     +- *(20) FileScan csv [uid#2573,song_id#2575,date#2578] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(21) Project [uid#2591, song_id#2593]
      :  +- *(21) Filter ((month(date#2596) = 4) && isnotnull(uid#2591))
      :     +- *(21) FileScan csv [uid#2591,song_id#2593,date#2596] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(22) Project [uid#2609, song_id#2611]
      :  +- *(22) Filter ((month(date#2614) = 4) && isnotnull(uid#2609))
      :     +- *(22) FileScan csv [uid#2609,song_id#2611,date#2614] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(23) Project [uid#2627, song_id#2629]
      :  +- *(23) Filter ((month(date#2632) = 4) && isnotnull(uid#2627))
      :     +- *(23) FileScan csv [uid#2627,song_id#2629,date#2632] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(24) Project [uid#2645, song_id#2647]
      :  +- *(24) Filter ((month(date#2650) = 4) && isnotnull(uid#2645))
      :     +- *(24) FileScan csv [uid#2645,song_id#2647,date#2650] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(25) Project [uid#2663, song_id#2665]
      :  +- *(25) Filter ((month(date#2668) = 4) && isnotnull(uid#2663))
      :     +- *(25) FileScan csv [uid#2663,song_id#2665,date#2668] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(26) Project [uid#2681, song_id#2683]
      :  +- *(26) Filter ((month(date#2686) = 4) && isnotnull(uid#2681))
      :     +- *(26) FileScan csv [uid#2681,song_id#2683,date#2686] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(27) Project [uid#2699, song_id#2701]
      :  +- *(27) Filter ((month(date#2704) = 4) && isnotnull(uid#2699))
      :     +- *(27) FileScan csv [uid#2699,song_id#2701,date#2704] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(28) Project [uid#2717, song_id#2719]
      :  +- *(28) Filter ((month(date#2722) = 4) && isnotnull(uid#2717))
      :     +- *(28) FileScan csv [uid#2717,song_id#2719,date#2722] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(29) Project [uid#2735, song_id#2737]
      :  +- *(29) Filter ((month(date#2740) = 4) && isnotnull(uid#2735))
      :     +- *(29) FileScan csv [uid#2735,song_id#2737,date#2740] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(30) Project [uid#2753, song_id#2755]
      :  +- *(30) Filter ((month(date#2758) = 4) && isnotnull(uid#2753))
      :     +- *(30) FileScan csv [uid#2753,song_id#2755,date#2758] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(31) Project [uid#2771, song_id#2773]
      :  +- *(31) Filter ((month(date#2776) = 4) && isnotnull(uid#2771))
      :     +- *(31) FileScan csv [uid#2771,song_id#2773,date#2776] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(32) Project [uid#2789, song_id#2791]
      :  +- *(32) Filter ((month(date#2794) = 4) && isnotnull(uid#2789))
      :     +- *(32) FileScan csv [uid#2789,song_id#2791,date#2794] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(33) Project [uid#2807, song_id#2809]
      :  +- *(33) Filter ((month(date#2812) = 4) && isnotnull(uid#2807))
      :     +- *(33) FileScan csv [uid#2807,song_id#2809,date#2812] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(34) Project [uid#2825, song_id#2827]
      :  +- *(34) Filter ((month(date#2830) = 4) && isnotnull(uid#2825))
      :     +- *(34) FileScan csv [uid#2825,song_id#2827,date#2830] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(35) Project [uid#2843, song_id#2845]
      :  +- *(35) Filter ((month(date#2848) = 4) && isnotnull(uid#2843))
      :     +- *(35) FileScan csv [uid#2843,song_id#2845,date#2848] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(36) Project [uid#2861, song_id#2863]
      :  +- *(36) Filter ((month(date#2866) = 4) && isnotnull(uid#2861))
      :     +- *(36) FileScan csv [uid#2861,song_id#2863,date#2866] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(37) Project [uid#2879, song_id#2881]
      :  +- *(37) Filter ((month(date#2884) = 4) && isnotnull(uid#2879))
      :     +- *(37) FileScan csv [uid#2879,song_id#2881,date#2884] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(38) Project [uid#2897, song_id#2899]
      :  +- *(38) Filter ((month(date#2902) = 4) && isnotnull(uid#2897))
      :     +- *(38) FileScan csv [uid#2897,song_id#2899,date#2902] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(39) Project [uid#2915, song_id#2917]
      :  +- *(39) Filter ((month(date#2920) = 4) && isnotnull(uid#2915))
      :     +- *(39) FileScan csv [uid#2915,song_id#2917,date#2920] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(40) Project [uid#2933, song_id#2935]
      :  +- *(40) Filter ((month(date#2938) = 4) && isnotnull(uid#2933))
      :     +- *(40) FileScan csv [uid#2933,song_id#2935,date#2938] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(41) Project [uid#2951, song_id#2953]
      :  +- *(41) Filter ((month(date#2956) = 4) && isnotnull(uid#2951))
      :     +- *(41) FileScan csv [uid#2951,song_id#2953,date#2956] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(42) Project [uid#2969, song_id#2971]
      :  +- *(42) Filter ((month(date#2974) = 4) && isnotnull(uid#2969))
      :     +- *(42) FileScan csv [uid#2969,song_id#2971,date#2974] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(43) Project [uid#2987, song_id#2989]
      :  +- *(43) Filter ((month(date#2992) = 4) && isnotnull(uid#2987))
      :     +- *(43) FileScan csv [uid#2987,song_id#2989,date#2992] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(44) Project [uid#3005, song_id#3007]
      :  +- *(44) Filter ((month(date#3010) = 4) && isnotnull(uid#3005))
      :     +- *(44) FileScan csv [uid#3005,song_id#3007,date#3010] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(45) Project [uid#3023, song_id#3025]
      :  +- *(45) Filter ((month(date#3028) = 4) && isnotnull(uid#3023))
      :     +- *(45) FileScan csv [uid#3023,song_id#3025,date#3028] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(46) Project [uid#3041, song_id#3043]
      :  +- *(46) Filter ((month(date#3046) = 4) && isnotnull(uid#3041))
      :     +- *(46) FileScan csv [uid#3041,song_id#3043,date#3046] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(47) Project [uid#3059, song_id#3061]
      :  +- *(47) Filter ((month(date#3064) = 4) && isnotnull(uid#3059))
      :     +- *(47) FileScan csv [uid#3059,song_id#3061,date#3064] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(48) Project [uid#3077, song_id#3079]
      :  +- *(48) Filter ((month(date#3082) = 4) && isnotnull(uid#3077))
      :     +- *(48) FileScan csv [uid#3077,song_id#3079,date#3082] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(49) Project [uid#3095, song_id#3097]
      :  +- *(49) Filter ((month(date#3100) = 4) && isnotnull(uid#3095))
      :     +- *(49) FileScan csv [uid#3095,song_id#3097,date#3100] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(50) Project [uid#3113, song_id#3115]
      :  +- *(50) Filter ((month(date#3118) = 4) && isnotnull(uid#3113))
      :     +- *(50) FileScan csv [uid#3113,song_id#3115,date#3118] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(51) Project [uid#3131, song_id#3133]
      :  +- *(51) Filter ((month(date#3136) = 4) && isnotnull(uid#3131))
      :     +- *(51) FileScan csv [uid#3131,song_id#3133,date#3136] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(52) Project [uid#3149, song_id#3151]
      :  +- *(52) Filter ((month(date#3154) = 4) && isnotnull(uid#3149))
      :     +- *(52) FileScan csv [uid#3149,song_id#3151,date#3154] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(53) Project [uid#3167, song_id#3169]
      :  +- *(53) Filter ((month(date#3172) = 4) && isnotnull(uid#3167))
      :     +- *(53) FileScan csv [uid#3167,song_id#3169,date#3172] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(54) Project [uid#3185, song_id#3187]
      :  +- *(54) Filter ((month(date#3190) = 4) && isnotnull(uid#3185))
      :     +- *(54) FileScan csv [uid#3185,song_id#3187,date#3190] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(55) Project [uid#3203, song_id#3205]
      :  +- *(55) Filter ((month(date#3208) = 4) && isnotnull(uid#3203))
      :     +- *(55) FileScan csv [uid#3203,song_id#3205,date#3208] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(56) Project [uid#3221, song_id#3223]
      :  +- *(56) Filter ((month(date#3226) = 4) && isnotnull(uid#3221))
      :     +- *(56) FileScan csv [uid#3221,song_id#3223,date#3226] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(57) Project [uid#3239, song_id#3241]
      :  +- *(57) Filter ((month(date#3244) = 4) && isnotnull(uid#3239))
      :     +- *(57) FileScan csv [uid#3239,song_id#3241,date#3244] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(58) Project [uid#3257, song_id#3259]
      :  +- *(58) Filter ((month(date#3262) = 4) && isnotnull(uid#3257))
      :     +- *(58) FileScan csv [uid#3257,song_id#3259,date#3262] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(59) Project [uid#3275, song_id#3277]
      :  +- *(59) Filter ((month(date#3280) = 4) && isnotnull(uid#3275))
      :     +- *(59) FileScan csv [uid#3275,song_id#3277,date#3280] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(60) Project [uid#3293, song_id#3295]
      :  +- *(60) Filter ((month(date#3298) = 4) && isnotnull(uid#3293))
      :     +- *(60) FileScan csv [uid#3293,song_id#3295,date#3298] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(61) Project [uid#3311, song_id#3313]
      :  +- *(61) Filter ((month(date#3316) = 4) && isnotnull(uid#3311))
      :     +- *(61) FileScan csv [uid#3311,song_id#3313,date#3316] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(62) Project [uid#3329, song_id#3331]
      :  +- *(62) Filter ((month(date#3334) = 4) && isnotnull(uid#3329))
      :     +- *(62) FileScan csv [uid#3329,song_id#3331,date#3334] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(63) Project [uid#3347, song_id#3349]
      :  +- *(63) Filter ((month(date#3352) = 4) && isnotnull(uid#3347))
      :     +- *(63) FileScan csv [uid#3347,song_id#3349,date#3352] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(64) Project [uid#3365, song_id#3367]
      :  +- *(64) Filter ((month(date#3370) = 4) && isnotnull(uid#3365))
      :     +- *(64) FileScan csv [uid#3365,song_id#3367,date#3370] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(65) Project [uid#3383, song_id#3385]
      :  +- *(65) Filter ((month(date#3388) = 4) && isnotnull(uid#3383))
      :     +- *(65) FileScan csv [uid#3383,song_id#3385,date#3388] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(66) Project [uid#3401, song_id#3403]
      :  +- *(66) Filter ((month(date#3406) = 4) && isnotnull(uid#3401))
      :     +- *(66) FileScan csv [uid#3401,song_id#3403,date#3406] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(67) Project [uid#3419, song_id#3421]
      :  +- *(67) Filter ((month(date#3424) = 4) && isnotnull(uid#3419))
      :     +- *(67) FileScan csv [uid#3419,song_id#3421,date#3424] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(68) Project [uid#3437, song_id#3439]
      :  +- *(68) Filter ((month(date#3442) = 4) && isnotnull(uid#3437))
      :     +- *(68) FileScan csv [uid#3437,song_id#3439,date#3442] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(69) Project [uid#3455, song_id#3457]
      :  +- *(69) Filter ((month(date#3460) = 4) && isnotnull(uid#3455))
      :     +- *(69) FileScan csv [uid#3455,song_id#3457,date#3460] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(70) Project [uid#3473, song_id#3475]
      :  +- *(70) Filter ((month(date#3478) = 4) && isnotnull(uid#3473))
      :     +- *(70) FileScan csv [uid#3473,song_id#3475,date#3478] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(71) Project [uid#3491, song_id#3493]
      :  +- *(71) Filter ((month(date#3496) = 4) && isnotnull(uid#3491))
      :     +- *(71) FileScan csv [uid#3491,song_id#3493,date#3496] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(72) Project [uid#3509, song_id#3511]
      :  +- *(72) Filter ((month(date#3514) = 4) && isnotnull(uid#3509))
      :     +- *(72) FileScan csv [uid#3509,song_id#3511,date#3514] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(73) Project [uid#3527, song_id#3529]
      :  +- *(73) Filter ((month(date#3532) = 4) && isnotnull(uid#3527))
      :     +- *(73) FileScan csv [uid#3527,song_id#3529,date#3532] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(74) Project [uid#3545, song_id#3547]
      :  +- *(74) Filter ((month(date#3550) = 4) && isnotnull(uid#3545))
      :     +- *(74) FileScan csv [uid#3545,song_id#3547,date#3550] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(75) Project [uid#3563, song_id#3565]
      :  +- *(75) Filter ((month(date#3568) = 4) && isnotnull(uid#3563))
      :     +- *(75) FileScan csv [uid#3563,song_id#3565,date#3568] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(76) Project [uid#3581, song_id#3583]
      :  +- *(76) Filter ((month(date#3586) = 4) && isnotnull(uid#3581))
      :     +- *(76) FileScan csv [uid#3581,song_id#3583,date#3586] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(77) Project [uid#3599, song_id#3601]
      :  +- *(77) Filter ((month(date#3604) = 4) && isnotnull(uid#3599))
      :     +- *(77) FileScan csv [uid#3599,song_id#3601,date#3604] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(78) Project [uid#3617, song_id#3619]
      :  +- *(78) Filter ((month(date#3622) = 4) && isnotnull(uid#3617))
      :     +- *(78) FileScan csv [uid#3617,song_id#3619,date#3622] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(79) Project [uid#3635, song_id#3637]
      :  +- *(79) Filter ((month(date#3640) = 4) && isnotnull(uid#3635))
      :     +- *(79) FileScan csv [uid#3635,song_id#3637,date#3640] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(80) Project [uid#3653, song_id#3655]
      :  +- *(80) Filter ((month(date#3658) = 4) && isnotnull(uid#3653))
      :     +- *(80) FileScan csv [uid#3653,song_id#3655,date#3658] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(81) Project [uid#3671, song_id#3673]
      :  +- *(81) Filter ((month(date#3676) = 4) && isnotnull(uid#3671))
      :     +- *(81) FileScan csv [uid#3671,song_id#3673,date#3676] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(82) Project [uid#3689, song_id#3691]
      :  +- *(82) Filter ((month(date#3694) = 4) && isnotnull(uid#3689))
      :     +- *(82) FileScan csv [uid#3689,song_id#3691,date#3694] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(83) Project [uid#3707, song_id#3709]
      :  +- *(83) Filter ((month(date#3712) = 4) && isnotnull(uid#3707))
      :     +- *(83) FileScan csv [uid#3707,song_id#3709,date#3712] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(84) Project [uid#3725, song_id#3727]
      :  +- *(84) Filter ((month(date#3730) = 4) && isnotnull(uid#3725))
      :     +- *(84) FileScan csv [uid#3725,song_id#3727,date#3730] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(85) Project [uid#3743, song_id#3745]
      :  +- *(85) Filter ((month(date#3748) = 4) && isnotnull(uid#3743))
      :     +- *(85) FileScan csv [uid#3743,song_id#3745,date#3748] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(86) Project [uid#3761, song_id#3763]
      :  +- *(86) Filter ((month(date#3766) = 4) && isnotnull(uid#3761))
      :     +- *(86) FileScan csv [uid#3761,song_id#3763,date#3766] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(87) Project [uid#3779, song_id#3781]
      :  +- *(87) Filter ((month(date#3784) = 4) && isnotnull(uid#3779))
      :     +- *(87) FileScan csv [uid#3779,song_id#3781,date#3784] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(88) Project [uid#3797, song_id#3799]
      :  +- *(88) Filter ((month(date#3802) = 4) && isnotnull(uid#3797))
      :     +- *(88) FileScan csv [uid#3797,song_id#3799,date#3802] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(89) Project [uid#3815, song_id#3817]
      :  +- *(89) Filter ((month(date#3820) = 4) && isnotnull(uid#3815))
      :     +- *(89) FileScan csv [uid#3815,song_id#3817,date#3820] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(90) Project [uid#3833, song_id#3835]
      :  +- *(90) Filter ((month(date#3838) = 4) && isnotnull(uid#3833))
      :     +- *(90) FileScan csv [uid#3833,song_id#3835,date#3838] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(91) Project [uid#3851, song_id#3853]
      :  +- *(91) Filter ((month(date#3856) = 4) && isnotnull(uid#3851))
      :     +- *(91) FileScan csv [uid#3851,song_id#3853,date#3856] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(92) Project [uid#3869, song_id#3871]
      :  +- *(92) Filter ((month(date#3874) = 4) && isnotnull(uid#3869))
      :     +- *(92) FileScan csv [uid#3869,song_id#3871,date#3874] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(93) Project [uid#3887, song_id#3889]
      :  +- *(93) Filter ((month(date#3892) = 4) && isnotnull(uid#3887))
      :     +- *(93) FileScan csv [uid#3887,song_id#3889,date#3892] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(94) Project [uid#3905, song_id#3907]
      :  +- *(94) Filter ((month(date#3910) = 4) && isnotnull(uid#3905))
      :     +- *(94) FileScan csv [uid#3905,song_id#3907,date#3910] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(95) Project [uid#3923, song_id#3925]
      :  +- *(95) Filter ((month(date#3928) = 4) && isnotnull(uid#3923))
      :     +- *(95) FileScan csv [uid#3923,song_id#3925,date#3928] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(96) Project [uid#3941, song_id#3943]
      :  +- *(96) Filter ((month(date#3946) = 4) && isnotnull(uid#3941))
      :     +- *(96) FileScan csv [uid#3941,song_id#3943,date#3946] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(97) Project [uid#3959, song_id#3961]
      :  +- *(97) Filter ((month(date#3964) = 4) && isnotnull(uid#3959))
      :     +- *(97) FileScan csv [uid#3959,song_id#3961,date#3964] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(98) Project [uid#3977, song_id#3979]
      :  +- *(98) Filter ((month(date#3982) = 4) && isnotnull(uid#3977))
      :     +- *(98) FileScan csv [uid#3977,song_id#3979,date#3982] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(99) Project [uid#3995, song_id#3997]
      :  +- *(99) Filter ((month(date#4000) = 4) && isnotnull(uid#3995))
      :     +- *(99) FileScan csv [uid#3995,song_id#3997,date#4000] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(100) Project [uid#4013, song_id#4015]
      :  +- *(100) Filter ((month(date#4018) = 4) && isnotnull(uid#4013))
      :     +- *(100) FileScan csv [uid#4013,song_id#4015,date#4018] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(101) Project [uid#4031, song_id#4033]
      :  +- *(101) Filter ((month(date#4036) = 4) && isnotnull(uid#4031))
      :     +- *(101) FileScan csv [uid#4031,song_id#4033,date#4036] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(102) Project [uid#4049, song_id#4051]
      :  +- *(102) Filter ((month(date#4054) = 4) && isnotnull(uid#4049))
      :     +- *(102) FileScan csv [uid#4049,song_id#4051,date#4054] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(103) Project [uid#4067, song_id#4069]
      :  +- *(103) Filter ((month(date#4072) = 4) && isnotnull(uid#4067))
      :     +- *(103) FileScan csv [uid#4067,song_id#4069,date#4072] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(104) Project [uid#4085, song_id#4087]
      :  +- *(104) Filter ((month(date#4090) = 4) && isnotnull(uid#4085))
      :     +- *(104) FileScan csv [uid#4085,song_id#4087,date#4090] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(105) Project [uid#4103, song_id#4105]
      :  +- *(105) Filter ((month(date#4108) = 4) && isnotnull(uid#4103))
      :     +- *(105) FileScan csv [uid#4103,song_id#4105,date#4108] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(106) Project [uid#4121, song_id#4123]
      :  +- *(106) Filter ((month(date#4126) = 4) && isnotnull(uid#4121))
      :     +- *(106) FileScan csv [uid#4121,song_id#4123,date#4126] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(107) Project [uid#4139, song_id#4141]
      :  +- *(107) Filter ((month(date#4144) = 4) && isnotnull(uid#4139))
      :     +- *(107) FileScan csv [uid#4139,song_id#4141,date#4144] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(108) Project [uid#4157, song_id#4159]
      :  +- *(108) Filter ((month(date#4162) = 4) && isnotnull(uid#4157))
      :     +- *(108) FileScan csv [uid#4157,song_id#4159,date#4162] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(109) Project [uid#4175, song_id#4177]
      :  +- *(109) Filter ((month(date#4180) = 4) && isnotnull(uid#4175))
      :     +- *(109) FileScan csv [uid#4175,song_id#4177,date#4180] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(110) Project [uid#4193, song_id#4195]
      :  +- *(110) Filter ((month(date#4198) = 4) && isnotnull(uid#4193))
      :     +- *(110) FileScan csv [uid#4193,song_id#4195,date#4198] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(111) Project [uid#4211, song_id#4213]
      :  +- *(111) Filter ((month(date#4216) = 4) && isnotnull(uid#4211))
      :     +- *(111) FileScan csv [uid#4211,song_id#4213,date#4216] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(112) Project [uid#4229, song_id#4231]
      :  +- *(112) Filter ((month(date#4234) = 4) && isnotnull(uid#4229))
      :     +- *(112) FileScan csv [uid#4229,song_id#4231,date#4234] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(113) Project [uid#4247, song_id#4249]
      :  +- *(113) Filter ((month(date#4252) = 4) && isnotnull(uid#4247))
      :     +- *(113) FileScan csv [uid#4247,song_id#4249,date#4252] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(114) Project [uid#4265, song_id#4267]
      :  +- *(114) Filter ((month(date#4270) = 4) && isnotnull(uid#4265))
      :     +- *(114) FileScan csv [uid#4265,song_id#4267,date#4270] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(115) Project [uid#4283, song_id#4285]
      :  +- *(115) Filter ((month(date#4288) = 4) && isnotnull(uid#4283))
      :     +- *(115) FileScan csv [uid#4283,song_id#4285,date#4288] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(116) Project [uid#4301, song_id#4303]
      :  +- *(116) Filter ((month(date#4306) = 4) && isnotnull(uid#4301))
      :     +- *(116) FileScan csv [uid#4301,song_id#4303,date#4306] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(117) Project [uid#4319, song_id#4321]
      :  +- *(117) Filter ((month(date#4324) = 4) && isnotnull(uid#4319))
      :     +- *(117) FileScan csv [uid#4319,song_id#4321,date#4324] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(118) Project [uid#4337, song_id#4339]
      :  +- *(118) Filter ((month(date#4342) = 4) && isnotnull(uid#4337))
      :     +- *(118) FileScan csv [uid#4337,song_id#4339,date#4342] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(119) Project [uid#4355, song_id#4357]
      :  +- *(119) Filter ((month(date#4360) = 4) && isnotnull(uid#4355))
      :     +- *(119) FileScan csv [uid#4355,song_id#4357,date#4360] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(120) Project [uid#4373, song_id#4375]
      :  +- *(120) Filter ((month(date#4378) = 4) && isnotnull(uid#4373))
      :     +- *(120) FileScan csv [uid#4373,song_id#4375,date#4378] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(121) Project [uid#4391, song_id#4393]
      :  +- *(121) Filter ((month(date#4396) = 4) && isnotnull(uid#4391))
      :     +- *(121) FileScan csv [uid#4391,song_id#4393,date#4396] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(122) Project [uid#4409, song_id#4411]
      :  +- *(122) Filter ((month(date#4414) = 4) && isnotnull(uid#4409))
      :     +- *(122) FileScan csv [uid#4409,song_id#4411,date#4414] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      :- *(123) Project [uid#4427, song_id#4429]
      :  +- *(123) Filter ((month(date#4432) = 4) && isnotnull(uid#4427))
      :     +- *(123) FileScan csv [uid#4427,song_id#4429,date#4432] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>
      +- *(124) Project [uid#4445, song_id#4447]
         +- *(124) Filter ((month(date#4450) = 4) && isnotnull(uid#4445))
            +- *(124) FileScan csv [uid#4445,song_id#4447,date#4450] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/wanjiewang/PycharmProjects/music_box/music_box/processed_data/play/..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,song_id:int,date:date>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:363)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.SampleExec.inputRDDs(basicPhysicalOperators.scala:271)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
	at org.apache.spark.sql.execution.LocalLimitExec.inputRDDs(limit.scala:97)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:748)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:748)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:99)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1478)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:96)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:160)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:295)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:293)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:313)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:557)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 149 more


In [194]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier

In [195]:
dtc = DecisionTreeClassifier(labelCol='label',featuresCol='features')
# rfc = RandomForestClassifier(labelCol='label',featuresCol='features')
# gbt = GBTClassifier(labelCol='label',featuresCol='features')

In [196]:
# Train the models (its three models, so it might take some time)
dtc_model = dtc.fit(train_data)
# rfc_model = rfc.fit(train_data)
# gbt_model = gbt.fit(train_data)

Py4JJavaError: An error occurred while calling o5118.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 220 in stage 235.0 failed 1 times, most recent failure: Lost task 220.0 in stage 235.0 (TID 44107, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:52)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:536)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:200)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:180)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:310)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:117)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.agg_doAggregateWithKeysOutput$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.sort_addToSorter$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2167.findNextInnerJoinRows$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2167.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2168.findNextInnerJoinRows$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2168.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
	at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:793)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:754)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:916)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:952)
	at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2294.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.ml.classification.Classifier.getNumClasses(Classifier.scala:111)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:102)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:45)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:52)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:536)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:200)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:180)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:310)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:117)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.agg_doAggregateWithKeysOutput$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.sort_addToSorter$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2166.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2167.findNextInnerJoinRows$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2167.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2168.findNextInnerJoinRows$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2168.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
	at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:793)
	at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:754)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:916)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:952)
	at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2294.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)


In [None]:
dtc_predictions = dtc_model.transform(test_data)

# evaluation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluatoriclassClassificationEvaluator

In [None]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)