# Data cleaning and profiling for Reddit post dataset


First we read the CSV file. Because the `body` column contains free text that can span several lines, the `multiLine` option is needed so that line breaks inside a comment are not parsed as new records.

In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._

val filePath = "bitcoin_reddit_all.csv"
val rawDF = spark.read.option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv(filePath)
    
rawDF.show()

In [3]:
rawDF.printSchema()

## Checking on errors and formats

Since there are several time-related columns, we don't have to drop a record if only one of them is missing.
Also, checking for null values before dropping duplicated columns helps reduce data loss.


Here, we found that this dataset is already clean: every record has its own id, date, time and Unix timestamp, so we can move straight on to data filtering and profiling in the next step.

In [5]:
// Rows with a missing id
val errorRows = rawDF.filter(col("_c0").isNull)
errorRows.cache().count()

In [6]:
// Rows where datetime is missing but date is present
val errorRows = rawDF.filter(col("datetime").isNull && col("date").isNotNull)
errorRows.cache().count()

In [7]:
// Rows where date is missing but datetime is present
val errorRows = rawDF.filter(col("date").isNull && col("datetime").isNotNull)
errorRows.cache().count()

In [8]:
// Rows with a missing comment body
val errorRows = rawDF.filter(col("body").isNull)
errorRows.cache().count()
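
The per-column checks can also be folded into a single pass that counts nulls for every column at once. The following is a minimal sketch on a tiny made-up DataFrame, assuming a local `SparkSession`; the sample rows and the names `sampleDF` and `nullCounts` are illustrative only, and in the notebook `rawDF` would take the place of `sampleDF`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, when}

// Assumption: a local session for the demo; in the notebook `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("null-count-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up rows: one missing id and one missing body.
val rows: Seq[(Option[Int], Option[String])] = Seq(
  (Some(1), Some("first comment")),
  (None, Some("second comment")),
  (Some(3), None)
)
val sampleDF = rows.toDF("_c0", "body")

// One aggregate per column: add 1 for every null value, 0 otherwise.
val nullCounts = sampleDF.select(
  sampleDF.columns.map(c => sum(when(col(c).isNull, 1).otherwise(0)).alias(c)): _*
)

nullCounts.show()
```

A result row of all zeros would support the observation that no cleaning is needed for these columns.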

We start by selecting the columns that carry useful information.

| Column Name | Description |
| :----------: | :----------: |
| _c0 | Id |
| datetime | Timestamp |
| score | Upvotes or downvotes |
| controversiality | Controversiality |
| body | Comment text |

In [10]:

val baseDF = rawDF.select(
            "_c0",
            "datetime",
            "score",
            "controversiality",
            "body")
baseDF.cache().count

In [11]:
baseDF.columns

## Filtering data according to timestamp

Currently, the start and end times are set to include all the data. Since we need to join other datasets later in this project, we will need to restrict the data to the same time range.

In this step, we convert the `datetime` column to the timestamp type and select the rows within the given range.

In [13]:
import org.apache.spark.sql.functions.to_timestamp
val dfWithTimestamp = baseDF.withColumn("timestamp", to_timestamp(col("datetime"), "yyyy-MM-dd HH:mm:ss")).drop("datetime")


In [14]:
z.show(dfWithTimestamp)

In [15]:
dfWithTimestamp.printSchema()

In [16]:
// Zero-padded bounds; between() is inclusive on both ends
val startTime = "2009-01-01 00:00:00"
val endTime = "2019-12-30 00:00:00"
val filteredDateDF = dfWithTimestamp.filter(col("timestamp").between(startTime, endTime))
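
One detail worth checking is how string bounds are compared with the `timestamp` column. Depending on the Spark version and configuration (an assumption to verify in your own environment), the comparison may be carried out on strings rather than timestamps, and in that case zero-padding of the month and day matters. The sketch below uses plain Scala strings with made-up values to show the effect.

```scala
// Made-up values for illustration only.
val earliestPost = "2009-05-08 00:00:00"
val unpaddedStart = "2009-1-1 00:00:00"
val paddedStart = "2009-01-01 00:00:00"

// Lexicographic comparison: '0' sorts before '1' at the month position,
// so the unpadded bound looks later than a post from May 2009.
val keptWithUnpadded = earliestPost.compareTo(unpaddedStart) >= 0
val keptWithPadded = earliestPost.compareTo(paddedStart) >= 0

println(s"kept with unpadded bound: $keptWithUnpadded")
println(s"kept with padded bound:   $keptWithPadded")
```

Writing the bounds in the same `yyyy-MM-dd HH:mm:ss` layout as the data keeps the filter correct under either comparison.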

In [17]:
z.show(filteredDateDF)

## Body message cleaning

Looking at the first few comments, it definitely appears we will need to do some processing on the text to clean it up before pushing it through a sentiment analyzer.

A couple of items we should be sure to handle:
- URLs
- special characters
- new lines
- foreign languages
- numbers

In [19]:
val cleanedBodyDF = filteredDateDF
  .withColumn("body", regexp_replace($"body", "https?://\\S+", " ")) // Remove URLs first, while ':' and '/' are still present
  .withColumn("body", regexp_replace($"body", "[\\r\\n]+", " ")) // Replace new lines with a space
  .withColumn("body", regexp_replace($"body", "[^\\w\\s.?!]", "")) // Remove special characters (\w is ASCII-only, so non-Latin text is dropped too)
  .withColumn("body", regexp_replace($"body", "\\d", "")) // Remove digits
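
A quick way to sanity-check the regular expressions is to run them on a sample string outside Spark. The sketch below uses `String.replaceAll`, which relies on the same Java regex engine as `regexp_replace`. The sample text and the helper name `cleanBody` are made up for illustration, and the final whitespace-collapsing step is an optional extra rather than part of the notebook.

```scala
// Hypothetical sample comment; not taken from the dataset.
val sample = "Check https://example.com/a?b=1 now!\nBTC hit 9000 today :)"

// Illustrative helper (an assumption, not part of the notebook):
// the cleaning steps applied to a plain String.
def cleanBody(text: String): String =
  text
    .replaceAll("https?://\\S+", " ")  // remove URLs while ':' and '/' are still present
    .replaceAll("[\\r\\n]+", " ")      // replace line breaks with a space
    .replaceAll("[^\\w\\s.?!]", "")    // drop special characters
    .replaceAll("\\d", "")             // drop digits
    .replaceAll("\\s+", " ")           // optional: collapse repeated whitespace
    .trim

val cleaned = cleanBody(sample)
println(cleaned)

// For comparison: a greedy pattern such as "http.*\\s" runs up to the
// last whitespace on the line and removes ordinary words along with the URL.
val greedy = "see http://a.b/c for details ok".replaceAll("http.*\\s", " ")
println(greedy)
```

The order matters: if special characters are stripped before URLs, the `://` separator is gone and a URL pattern can no longer isolate the link.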

In [20]:
cleanedBodyDF.cache().count


In [21]:
z.show(cleanedBodyDF.select("body"))

 
## Text Body Sentiment

This step is currently left untouched, since the same sentiment model must be applied to both datasets.

## Data profiling 

Describe this dataset.

### Year & Count profiling

In [25]:
import org.apache.spark.sql.functions.{min, max}

val TimestampStatsDF = cleanedBodyDF.agg(
  min($"timestamp").alias("Min Date"),
  max($"timestamp").alias("Max Date")
)

z.show(TimestampStatsDF)

In [26]:
import org.apache.spark.sql.functions.{year, month, dayofweek}

// Example: Count of records per year
val YearDistributionDF = cleanedBodyDF
  .groupBy(year($"timestamp").alias("Year"))
  .count()
  .orderBy("Year")

z.show(YearDistributionDF)

This dataset contains Reddit posts related to Bitcoin from 2009-05-08 to 2019-12-29.

The number of posts grows quickly in 2011 and keeps increasing over the next several years. It peaks in 2017 and then declines over the following two years. This trend may reflect the popularity of Bitcoin.

### Score profiling

Once sentiment analysis has been applied, the score of posts can be described in more detail by attitude and by year.

In [29]:
z.show(cleanedBodyDF.select("score").describe())



### Controversiality profiling

In [31]:
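// Average controversiality per year (uses cleanedBodyDF, which is already defined at this point)
val avgControversialityByYearDf = cleanedBodyDF
  .groupBy(year($"timestamp").alias("year"))
  .agg(mean("controversiality").alias("avg_controversiality"))

z.show(avgControversialityByYearDf)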



In [32]:
val dateDF = cleanedBodyDF.withColumn("date", date_format($"timestamp", "yyyy-MM"))
val avgControversialityDf = dateDF.groupBy("date").agg(mean("controversiality").alias("avg_controversiality"))

z.show(avgControversialityDf)

A sharp increase in the controversiality of this topic appears in 2012, after which it stays at a relatively low level. This might match the recorded price changes and Twitter activity around Bitcoin in 2012.

In [34]:
val outputPath = "bitcoin_reddit_clean.csv"

cleanedBodyDF.write.option("header", "true").mode("overwrite").csv(outputPath)


In [35]:
val shortDF = cleanedBodyDF.limit(50)
shortDF.show()

In [36]:
val outputPathShort = "bitcoin_reddit_clean_short.csv"

// Write the 50-row sample, not the full cleaned dataset
shortDF.write.option("header", "true").mode("overwrite").csv(outputPathShort)