# Preprocessing

## Loading our data from S3

In [None]:
from pyspark.sql import functions as F

In [None]:
# Location of the raw play log, and the credentials used to read it from S3.
filepath = "s3://full-stack-bigdata-datasets/Big_Data/youtube_playlog.csv"
ACCESS_KEY_ID = " "  # access key of the student account
SECRET_ACCESS_KEY = " "  # secret key of the student account

# Register the credentials and the S3A filesystem implementation on the
# Hadoop configuration that backs the Spark session.
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3a.access.key", ACCESS_KEY_ID),
    ("fs.s3a.secret.key", SECRET_ACCESS_KEY),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
):
    hadoop_conf.set(conf_key, conf_value)

In [None]:
#load data into PySpark DataFrame: `playlog`
### BEGIN STRIP ###
# The first CSV line holds the column names; column types are inferred by Spark.
playlog = spark.read.csv(filepath, header=True, inferSchema=True)
### END STRIP ###
playlog.show(5)

## First analysis

In [None]:
#print out our DataFrame's schema
### BEGIN STRIP ###
# Shows the column names and the types Spark inferred while loading the CSV.
playlog.printSchema()
### END STRIP ###

In [None]:
#use `.describe(...)` on DataFrame
### BEGIN STRIP ###
# Summary statistics (count, mean, stddev, min, max) for every column,
# converted to pandas so the notebook renders them as a table.
playlog.describe().toPandas()
### END STRIP ###

Unnamed: 0,summary,timestamp,user,song
0,count,25739537.0,25739537.0,25739537
1,mean,1442700656.1045842,12697.352275450798,2.532571778181818E8
2,stddev,34432848.72371195,13094.065905828476,8.334645614940468E8
3,min,-139955897.0,0.0,---AtpxbkaE
4,max,1554321113.0,45903.0,zzzcFgRMY6c


### Missing values check

In [None]:
#count the missing values for each column
#put the result in a pandas DataFrame and print it out
### BEGIN STRIP ###
import pandas as pd

# Count the nulls of every column in ONE aggregation job instead of running a
# separate filter + count job (i.e. a full scan of the data) per column.
# `F.when(cond, 1)` yields NULL when the condition is false, and `F.count`
# ignores NULLs, so each aggregate counts exactly the rows where the column
# is null.
null_counts = playlog.agg(
  *[F.count(F.when(playlog[c].isNull(), 1)).alias(c) for c in playlog.columns]
).first()

# One-row pandas DataFrame: index 'missing values', one column per field.
pd.DataFrame(
  [null_counts.asDict()],
  index=['missing values'], columns=playlog.columns
)
### END STRIP ###

Unnamed: 0,timestamp,user,song
missing values,0,0,0


### Duplicates check

In [None]:
# TODO: check if playlog without duplicates has the same number of rows as the original
### BEGIN STRIP ###
# True only when no row is an exact copy of another one.
playlog.distinct().count() == playlog.count()
### END STRIP ###

In [None]:
#count the number of duplicates
### BEGIN STRIP ###
# Surplus rows = total number of rows minus the number of distinct rows.
playlog.count() - playlog.distinct().count()
### END STRIP ###

In [None]:
#compute descriptive statistics for duplicates
#How many duplicates on average (for duplicated rows)?
#And the standard deviation?
#What's the min? The max?

### BEGIN STRIP ###
# Group identical rows together, count the size of each group, keep only the
# groups that occur more than once, and describe the distribution of sizes.
# A plain count is used rather than collecting every group into a list just
# to measure its length.
playlog.groupBy(playlog.columns) \
  .agg(F.count('*').alias('ids_size')) \
  .select('ids_size') \
  .where(F.col('ids_size') > 1) \
  .describe().toPandas()

# The alternatives below are kept as comments so that the expression above
# stays the cell's result (and so that the cell is valid Python).
#
# Alternative 1 (without the F. syntax)
#
# tmp = playlog.groupBy(playlog.columns).count().orderBy('count', ascending=False)
# tmp.select('count').where(tmp['count']>1).describe().toPandas()
#
# Alternative 2 (with SQL)
#
# playlog.createOrReplaceTempView('playlog_table')
# display(spark.sql("""WITH subrequest AS
#                   (SELECT COUNT(timestamp) AS count_tstp, timestamp, user, song
#                   FROM playlog_table
#                   GROUP BY timestamp, user, song
#                   HAVING count_tstp > 1)
#
#                   SELECT COUNT(count_tstp) AS count, AVG(count_tstp) AS mean, STD(count_tstp) AS stddev, MIN(count_tstp) AS min, MAX(count_tstp) AS max
#                   FROM subrequest"""))

### END STRIP ###

Unnamed: 0,summary,ids_size
0,count,80352.0
1,mean,2.538866487455197
2,stddev,1.6539289063819822
3,min,2.0
4,max,66.0


In [None]:
## Alternative 1 (without the F. syntax)

#tmp = playlog.groupBy(playlog.columns).count().orderBy('count', ascending=False)
#tmp.select('count').where(tmp['count']>1).describe().toPandas()

### Other checks

In [None]:
#order dataframe by ascending `timestamp` and show the first 5 rows
### BEGIN STRIP ###
# Ascending is the default sort direction, so the smallest timestamps come first.
playlog.sort('timestamp').show(n=5)
### END STRIP ###

Do you see anything suspicious?

---

The first timestamp is negative, and it seems to be the only one.  
We will make sure there aren't any others like it.

In [None]:
#count the number of rows with a negative timestamp
### BEGIN STRIP ###
# Keep only the rows whose timestamp is below zero, then count them.
playlog.filter(playlog.timestamp < 0).count()
### END STRIP ###

As expected, only one such negative timestamp. Since we have only one we can actually `.collect(...)` it.

In [None]:
#collect the problematic rows
### BEGIN STRIP ###
# Bring the rows with a negative timestamp back to the driver as a list of Row objects.
playlog.filter(playlog.timestamp < 0).collect()
### END STRIP ###

There's only one problematic value among more than 25M rows. This negative timestamp is an error, so the real value is effectively missing. We could try to reconstruct the real value, but that would be a really tedious task; since it is a single value out of 25M, we will simply remove it.

## Removing the row with a negative timestamp

We will use our new knowledge about the data to perform some preprocessing.  

Our pipeline will have 2 steps:
1. Remove duplicates (123651 rows)
2. Remove the row with a negative timestamp (1 row)

We will call our new DataFrame `playlog_processed` and save it to S3 in parquet format.

In [None]:
# filter out:
#       - duplicated values
#       - rows with negative timestamp
#       and save the result to a new DataFrame: `playlog_processed`
#       Finally, print out the number of rows in this DataFrame
### BEGIN STRIP ###
# Step 1: drop exact duplicate rows; step 2: drop rows whose timestamp is negative.
playlog_processed = (
    playlog
    .distinct()
    .where(~(playlog['timestamp'] < 0))
)
playlog_processed.count()
### END STRIP ###

In [None]:
ACCESS_KEY_ID = ""  # enter your S3 access key
SECRET_ACCESS_KEY = ""  # enter your S3 secret key

# Register your own credentials and the S3A filesystem implementation on the
# Hadoop configuration that backs the Spark session.
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3a.access.key", ACCESS_KEY_ID),
    ("fs.s3a.secret.key", SECRET_ACCESS_KEY),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
):
    hadoop_conf.set(conf_key, conf_value)

### END STRIP ###
# Destination of the processed data; replace with your own bucket and file name.
output_path = 's3://name_of_your_bucket/name_of_your_file.parquet'

In [None]:
#save the processed DataFrame to S3 using the parquet format
### BEGIN STRIP ###
# 'overwrite' replaces whatever already exists at `output_path`.
playlog_processed.write.mode('overwrite').parquet(output_path)
### END STRIP ###