<p style="border:2px solid black"> </p>
<span style="font-family:Lucida Bright;">
<p style="margin-bottom:0.8cm"></p>
<center>
<font size="6"><b>Understanding Music Listening Habits</b></font>
<p style="margin-bottom:-0.1cm"></p>
<font size="6"><b>Using Large-scale Smartphone Data</b>  </font>
    
<p style="margin-bottom:0.5cm"></p>
<font size="3"><b>Wojciech Mazurkiewicz, DTU, 14 May 2021</b></font>
<p style="margin-bottom:1cm"></p>
<font size="5"><b>Data Cleaning</b></font>
<br>
<font size="3"><b></b></font>
</center>
<p style="margin-bottom:0.4cm"></p>
<p style="border:2px solid black"> </p>


# Initialization
<hr style="border:2px solid black"></hr>


In [1]:
%matplotlib inline

from toolbox.initialize import *

# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


ModuleNotFoundError: No module named 'toolbox'

# Load data
<hr style="border:2px solid black"></hr>

## Get the dataframes containing data in the raw format

In [None]:
# Load the complete raw dataset; the files under the raw-data root
# are read with the 'avro' method.
data_root_raw, data_subfolders_raw, data_files_raw = t.get_paths_raw_data()
df_raw = t.load_data_from_files(data_root_raw, spark, method='avro')

# Load the sample of the raw data stored next to the project data.
path_df_sample_raw_1E6 = (
    Config.Path.project_data_root / 'df_sample_raw_1E6.parquet'
)
df_sample_raw_1E6 = t.load_data_from_files(path_df_sample_raw_1E6, spark)

## Choose dataframe

In [None]:
# Select the dataframe that the cleaning steps below operate on.
# Here the full raw dataset is used.
df = df_raw

# Toggle for the intermediate previews that the cleaning cells guard
# with `if display_middle_results:`.
display_middle_results = False

# Cleaning
<hr style="border:2px solid black"></hr>


## Removing rows where activity has been deleted

In [3]:
# Keep only the activities that have not been deleted, i.e. the rows
# whose `deleted_time` is either null or an empty string.
df_clean = df.where(
    f.col('deleted_time').isNull()
    | (f.col('deleted_time') == '')
)

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())


Execution time: 0.18348 s.


## Dropping duplicates of activity id (keeping the most recent)

In [4]:
# Time format of the time stamp strings stored in the raw database.
time_format = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

# Define a partitioning by activity ID.
window_id = Window.partitionBy('id')

# Drop the rows containing duplicates of the activity ID,
# keeping the row with the latest start time for each ID.
df_clean = (
    df_clean

    # Convert the start time from string to timestamp.
    .withColumn('start_time', f.to_timestamp('start_time', time_format))

    # Store the latest start time of each activity ID in a helper column.
    .withColumn('latest_start_time', f.max('start_time').over(window_id))

    # Keep only rows whose start time equals the latest one for their ID.
    # NOTE(review): rows whose start_time is null after parsing never
    # satisfy this equality and are therefore removed - confirm that
    # this is intended.
    .where(f.col('start_time') == f.col('latest_start_time'))

    # Several rows may share the latest start time; keep one per ID.
    .dropDuplicates(['id'])

    # Remove the helper column containing the latest start time.
    .drop('latest_start_time')
)

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())

Execution time: 0.12910 s.


## Exploding the columns `devices` and `tracks` and flattening the schema

In [5]:
# Explode the list-valued columns `devices` and `tracks`
# and flatten the schema of the dataframe.
df_clean = t.format_dataframe(df_clean)

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())

Formatting dataframe:
	Exploding columns containing lists.
	Flattening the dataframe schema.
	Execution time: 0.11851


## Replacing start and end time with start time and duration

In [6]:
# Time format of the time stamp strings stored in the raw database.
time_format = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

df_clean = (
    df_clean

    # `start_time` has already been converted to a timestamp in the
    # de-duplication step, so only the remaining columns are parsed here.
    .withColumn('end_time',
                f.to_timestamp('end_time', time_format))

    # Activity duration: difference of the end and start timestamps
    # after casting both to long.
    # NOTE(review): presumably whole seconds - confirm the unit.
    .withColumn('activity_duration',
                f.col('end_time').cast(LongType())
                - f.col('start_time').cast(LongType()))

    # Parse the start and end time stamps of the individual tracks.
    .withColumn('tracks_start_time',
                f.to_timestamp('tracks_start_time', time_format))
    .withColumn('tracks_end_time',
                f.to_timestamp('tracks_end_time', time_format))

    # Track duration, computed the same way as the activity duration.
    .withColumn('track_duration',
                f.col('tracks_end_time').cast(LongType())
                - f.col('tracks_start_time').cast(LongType()))

    # The end times are now represented by the durations.
    .drop('end_time')
    .drop('tracks_end_time')
)

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())
    

Execution time: 0.09664 s.


## Merging Track ID and Track URI

I can't decide whether this is a good idea or not. `tracks_id` seems to be a path to a file in the Sony system, which I can't see providing any useful information, aside from perhaps comparing the track properties of multiple rows with the same track ID to look for inconsistencies. I am dropping the merge for now.

In [None]:
# df_clean = (
#     df_clean
#     .withColumn('track_id',
#                 f.concat(f.col('tracks_id'),
#                          f.col('tracks_uri')))
#     .drop('tracks_id')
#     .drop('tracks_uri')
# )

# df_clean.limit(3).toPandas().head()

## Drop redundant columns

Let's remove the columns:

* `deleted_time`
* `devices_type`
* `yearmonth`

In [8]:
# Remove the columns that are not needed in the cleaned dataset.
# DataFrame.drop accepts several column names in a single call.
df_clean = df_clean.drop('deleted_time', 'devices_type', 'yearmonth')

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())

## Rename and organize columns

In [9]:
# Mapping from the raw column names to more meaningful ones.
column_renames = {
    'id': 'activity_id',
    'useruuid': 'user_id',
    'start_time': 'activity_start_time',
    'devices_name': 'device_name',
    'devices_id': 'device_id',
    'tracks_start_time': 'track_start_time',
    'tracks_artist': 'track_artist',
    'tracks_album': 'track_album',
    'tracks_title': 'track_title',
    'tracks_player': 'track_player',
    'tracks_id': 'track_id',
    'tracks_uri': 'track_uri',
}

# Final column order: columns belonging to the same group
# (user, activity, device, track) are placed next to each other.
column_order = [
    'user_id',
    'activity_id', 'activity_start_time', 'activity_duration',
    'device_id', 'device_name',
    'track_artist', 'track_title', 'track_album',
    'track_player', 'track_start_time', 'track_duration',
    'track_id', 'track_uri',
]

# Give the columns meaningful names.
df_renamed = df_clean
for old_name, new_name in column_renames.items():
    df_renamed = df_renamed.withColumnRenamed(old_name, new_name)

df_clean = (
    df_renamed

    # Arrange the columns in the order defined above.
    .select(*column_order)

    # Sort the data by user and, within a user, by activity start time.
    .orderBy(f.asc('user_id'),
             f.asc('activity_start_time'))
)

# Show the top rows of the resulting dataframe, guarded by the same
# flag as in the other cleaning cells. An expression nested inside an
# `if` block is not echoed by Jupyter, so display() is called explicitly.
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())

Execution time: 0.10798 s.


## Replace undefined cell values with null

In [None]:
def replace_invalid_with_null(column_name):
    """Build a column expression that nulls out undefined cell values.

    A cell counts as undefined when it is an empty string, equals the
    literal '<unknown>', or contains the '�' character.

    Parameters
    ----------
    column_name : str
        Name of the column to build the expression for.

    Returns
    -------
    Column expression yielding null for undefined cells and the
    original cell value otherwise. The expression is not aliased.
    """
    column = f.col(column_name)

    # Condition marking the cells that carry no usable value.
    is_undefined = (
        (column == '')
        | (column == '<unknown>')
        | column.contains('�')
    )

    return f.when(is_undefined, None).otherwise(column)

# Define the expressions that will be used together with the
# "select" statement to replace undefined values with null.
# The expressions are built from the columns of the dataframe being
# cleaned (`df_clean`); the raw dataframe `df` still has the original
# column names, which no longer exist after the renaming step.
expression_replace_invalid_with_null = [
    replace_invalid_with_null(column_name).alias(column_name)
    for column_name in df_clean.columns
]

# Execute the cleaning.
df_clean = df_clean.select(*expression_replace_invalid_with_null)

# Show the top rows of the resulting dataframe.
# An expression nested inside an `if` block is not echoed by Jupyter,
# so the preview has to be rendered explicitly with display().
if display_middle_results:
    display(df_clean.limit(100).toPandas().head())

# Save the dataframe
<hr style="border:2px solid black"></hr>


In [10]:
# Record when the write begins so that its duration can be reported.
start_time = time.time()

# Destination folder of the cleaned dataset.
path_df_clean = str(Config.Path.project_data_root / 'df_clean')

# Add a column holding the leading characters of the user ID; it is
# used as the partitioning key of the written dataset.
# NOTE(review): the substring starts at position 0 with length 2,
# presumably yielding the first two characters - confirm against the
# Spark `substr` indexing rules.
df_partitioned = df_clean.withColumn(
    'user_id_prefix', f.col('user_id').substr(0, 2)
)

# Write the dataframe as Parquet, replacing any existing output.
(
    df_partitioned
    .write
    .mode('overwrite')
    .partitionBy('user_id_prefix')
    .parquet(path_df_clean)
)

# Print the execution time.
elapsed = time.time() - start_time
print(f'Execution time: {elapsed:.5f} s.')

Py4JJavaError: An error occurred while calling o185.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 6.0 failed 1 times, most recent failure: Lost task 22.0 in stage 6.0 (TID 11030, sonydata, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:190)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:188)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5068/1046362177.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5079/1156368835.apply$mcV$sp(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:463)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5078/1816009350.apply$mcV$sp(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:451)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:140)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$5075/707027490.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$5055/1406676144.apply(Unknown Source)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1314/1218946998.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:190)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:188)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5068/1046362177.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5079/1156368835.apply$mcV$sp(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:463)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$Lambda$5078/1816009350.apply$mcV$sp(Unknown Source)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:451)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:140)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$5075/707027490.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$5055/1406676144.apply(Unknown Source)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1314/1218946998.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
