# Data processing and cleaning

### 1. Environment setup: initializing the Spark session and loading libraries and modules

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by the rest of the notebook. getOrCreate()
# returns an already-active session if there is one; otherwise it builds a new
# one running locally on all available cores ('local[*]').
# NOTE(review): this cell runs before the `%%configure -f` cell; on a
# sparkmagic/Livy kernel `-f` restarts the session -- confirm which session
# `spark` refers to afterwards.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('DataFrame')
    .getOrCreate()
)

In [None]:
%%configure -f
{"conf": {
    "spark.jars.packages": "com.databricks:spark-xml_2.12:0.16.0",
    "spark.jars": "https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.16.0/spark-xml_2.12-0.16.0.jar"
    }
}

In [None]:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, BooleanType, TimestampType
import pyspark.sql.functions as f

### 2. Creating DataFrames with explicit schemas

#### a) Badges

In [None]:
# Explicit schema for Badges.xml. The leading "_" on every field name matches
# the prefix spark-xml gives to XML attributes.
schema_badges = (
    StructType()
    .add("_Class", IntegerType())
    .add("_Date", TimestampType())
    .add("_Id", IntegerType())
    .add("_Name", StringType())
    .add("_TagBased", BooleanType())
    .add("_UserId", IntegerType())
)

# One record per <row .../> element; treatEmptyValuesAsNulls asks spark-xml to
# read empty values as NULL.
Badges_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_badges)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Badges.xml")
)

Badges_df.printSchema()
Badges_df.show(truncate=False)

#### b) Comments

In [None]:
# Explicit schema for Comments.xml ("_" = spark-xml attribute prefix).
schema_comments = (
    StructType()
    .add("_ContentLicense", StringType())
    .add("_CreationDate", TimestampType())
    .add("_Id", IntegerType())
    .add("_PostId", IntegerType())
    .add("_Score", IntegerType())
    .add("_Text", StringType())
    .add("_UserDisplayName", StringType())
    .add("_UserId", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
Comments_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_comments)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Comments.xml")
)

Comments_df.printSchema()
Comments_df.show(n=3, truncate=False, vertical=True)

#### c) PostHistory

In [None]:
# Explicit schema for PostHistory.xml ("_" = spark-xml attribute prefix).
schema_posthistory = (
    StructType()
    .add("_Comment", StringType())
    .add("_ContentLicense", StringType())
    .add("_CreationDate", TimestampType())
    .add("_Id", IntegerType())
    .add("_PostHistoryTypeId", IntegerType())
    .add("_PostId", IntegerType())
    .add("_RevisionGUID", StringType())
    .add("_Text", StringType())
    .add("_UserDisplayName", StringType())
    .add("_UserId", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
PostHistory_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_posthistory)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/PostHistory.xml")
)

PostHistory_df.printSchema()
PostHistory_df.show(n=3, vertical=True)

#### d) PostLinks

In [None]:
# Explicit schema for PostLinks.xml ("_" = spark-xml attribute prefix).
schema_postlinks = (
    StructType()
    .add("_CreationDate", TimestampType())
    .add("_Id", IntegerType())
    .add("_LinkTypeId", IntegerType())
    .add("_PostId", IntegerType())
    .add("_RelatedPostId", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
PostLinks_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_postlinks)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/PostLinks.xml")
)

PostLinks_df.printSchema()
PostLinks_df.show(n=3, truncate=False, vertical=True)

#### e) Posts

In [None]:
# Explicit schema for Posts.xml ("_" = spark-xml attribute prefix).
schema_posts = (
    StructType()
    .add("_AcceptedAnswerId", IntegerType())
    .add("_AnswerCount", IntegerType())
    .add("_Body", StringType())
    .add("_ClosedDate", TimestampType())
    .add("_CommentCount", IntegerType())
    .add("_CommunityOwnedDate", TimestampType())
    .add("_ContentLicense", StringType())
    .add("_CreationDate", TimestampType())
    .add("_FavoriteCount", IntegerType())
    .add("_Id", IntegerType())
    .add("_LastActivityDate", TimestampType())
    .add("_LastEditDate", TimestampType())
    .add("_LastEditorDisplayName", StringType())
    .add("_LastEditorUserId", IntegerType())
    .add("_OwnerDisplayName", StringType())
    .add("_OwnerUserId", IntegerType())
    .add("_ParentId", IntegerType())
    .add("_PostTypeId", IntegerType())
    .add("_Score", IntegerType())
    .add("_Tags", StringType())
    .add("_Title", StringType())
    .add("_ViewCount", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
Posts_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_posts)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Posts.xml")
)

Posts_df.printSchema()
Posts_df.show(n=3, truncate=False, vertical=True)

#### f) Tags

In [None]:
# Explicit schema for Tags.xml ("_" = spark-xml attribute prefix).
# NOTE(review): CreationDate is NULL-checked and dropped in the cleaning
# section -- confirm whether Tags.xml carries this attribute at all.
schema_tags = (
    StructType()
    .add("_Count", IntegerType())
    .add("_ExcerptPostId", IntegerType())
    .add("_CreationDate", TimestampType())
    .add("_Id", IntegerType())
    .add("_TagName", StringType())
    .add("_WikiPostId", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
Tags_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_tags)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Tags.xml")
)

Tags_df.printSchema()
Tags_df.show(n=3, vertical=True)

#### g) Users 

In [None]:
# Explicit schema for Users.xml ("_" = spark-xml attribute prefix).
schema_users = (
    StructType()
    .add("_AboutMe", StringType())
    .add("_AccountId", IntegerType())
    .add("_CreationDate", TimestampType())
    .add("_DisplayName", StringType())
    .add("_DownVotes", IntegerType())
    .add("_Id", IntegerType())
    .add("_LastAccessDate", TimestampType())
    .add("_Location", StringType())
    .add("_Reputation", IntegerType())
    .add("_UpVotes", IntegerType())
    .add("_Views", IntegerType())
    .add("_WebsiteUrl", StringType())
)

# One record per <row .../> element; empty values requested as NULL.
Users_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_users)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Users.xml")
)

Users_df.printSchema()
Users_df.show(n=3, vertical=True)

#### h) Votes

In [None]:
# Explicit schema for Votes.xml ("_" = spark-xml attribute prefix).
schema_votes = (
    StructType()
    .add("_BountyAmount", IntegerType())
    .add("_CreationDate", TimestampType())
    .add("_Id", IntegerType())
    .add("_PostId", IntegerType())
    .add("_UserId", IntegerType())
    .add("_VoteTypeId", IntegerType())
)

# One record per <row .../> element; empty values requested as NULL.
Votes_df = (
    spark.read.format("com.databricks.spark.xml")
    .schema(schema_votes)
    .options(rowTag="row", charset="UTF8", treatEmptyValuesAsNulls="true")
    .load("s3://bigdata-project-data-us-east-1/Votes.xml")
)

Votes_df.printSchema()
Votes_df.show(n=3, vertical=True)

### 3. Data cleaning

#### 3.1. Clean column names

#### a) Badges

In [None]:
# Strip the leading "_" attribute prefix from every column ("_UserId" -> "UserId").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Badges_DF = Badges_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Badges_df.columns
])
Badges_DF.show()

#### b) Comments

In [None]:
# Strip the leading "_" attribute prefix from every column ("_PostId" -> "PostId").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Comments_DF = Comments_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Comments_df.columns
])
Comments_DF.show()

#### c) PostHistory

In [None]:
# Strip the leading "_" attribute prefix from every column ("_Text" -> "Text").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
PostHistory_DF = PostHistory_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in PostHistory_df.columns
])
PostHistory_DF.show(3, vertical=True)

#### d) PostLinks

In [None]:
# Strip the leading "_" attribute prefix from every column ("_PostId" -> "PostId").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
PostLinks_DF = PostLinks_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in PostLinks_df.columns
])
PostLinks_DF.show(truncate=False)

#### e) Posts

In [None]:
# Strip the leading "_" attribute prefix from every column ("_Body" -> "Body").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Posts_DF = Posts_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Posts_df.columns
])
Posts_DF.show(3, vertical=True, truncate=False)

#### f) Tags

In [None]:
# Strip the leading "_" attribute prefix from every column ("_TagName" -> "TagName").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Tags_DF = Tags_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Tags_df.columns
])
Tags_DF.show()

#### g) Users

In [None]:
# Strip the leading "_" attribute prefix from every column ("_AboutMe" -> "AboutMe").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Users_DF = Users_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Users_df.columns
])
Users_DF.show(3, vertical=True, truncate=False)

#### h) Votes

In [None]:
# Strip the leading "_" attribute prefix from every column ("_UserId" -> "UserId").
# Only the first character is removed, so underscores inside a name survive
# (replace('_', '') would have deleted those as well).
Votes_DF = Votes_df.select([
    f.col(name).alias(name[1:] if name.startswith("_") else name)
    for name in Votes_df.columns
])
Votes_DF.show()

#### 3.2. Clean text columns

#### a) Badges_DF - no cleaning needed

#### b) Comments_DF

##### - clean text columns of repeated spaces, newline characters, special characters, etc.

In [None]:
# Normalise whitespace in the comment text: every run of whitespace (spaces,
# tabs, newlines, carriage returns) becomes a single space, then leading and
# trailing spaces are trimmed. The previous pattern " +" only collapsed spaces
# and left newline / carriage-return characters in place.
Comments_DF_clean = Comments_DF.withColumn(
    "Text", f.trim(f.regexp_replace("Text", r"\s+", " "))
)

In [None]:
# Sanity check: how many comments still contain two consecutive spaces?
Comments_DF_clean.filter(f.col("Text").like("%  %")).count()

In [None]:
Comments_DF_clean.show(n=3, truncate=False, vertical=True)

#### - remove unnecessary columns and records with NULL values in key columns

In [None]:
# Keep only comments that have a UserId (rows with a NULL UserId are dropped).
Comments_DF_clean2 = Comments_DF_clean.na.drop(subset=["UserId"])

In [None]:
# UserDisplayName is not needed downstream, so it is dropped.
Comments_DF_clean3 = (
    Comments_DF_clean2
    .drop("UserDisplayName")
)

In [None]:
Comments_DF_clean3.show(n=20, truncate=True)

#### c) PostHistory_DF

##### - clean text columns of repeated spaces, newline characters, special characters, etc.

In [None]:
# Baseline: how many PostHistory rows contain two consecutive spaces before cleaning?
PostHistory_DF.filter(f.col("Text").like("%  %")).count()

In [None]:
# Remove newline and carriage-return characters and collapse repeated spaces:
# every run of whitespace becomes a single space, then leading and trailing
# spaces are trimmed. The previous pattern " +" only collapsed spaces, so the
# newline / carriage-return removal described here never happened.
PostHistory_DF_clean = PostHistory_DF.withColumn(
    "Text", f.trim(f.regexp_replace("Text", r"\s+", " "))
)

In [None]:
# Sanity check: how many PostHistory rows still contain two consecutive spaces?
PostHistory_DF_clean.filter(f.col("Text").like("%  %")).count()

#### - remove unnecessary columns and records with NULL values in key columns

In [None]:
# Keep only rows with a UserId, then drop the UserDisplayName column.
PostHistory_DF_clean2 = PostHistory_DF_clean.na.drop(subset=["UserId"])
PostHistory_DF_clean3 = PostHistory_DF_clean2.drop("UserDisplayName")
PostHistory_DF_clean3.show(n=3, truncate=False, vertical=True)

#### d) PostLinks_DF - no cleaning needed

In [None]:
# NULL checks, one column per cell -- this one: CreationDate
PostLinks_DF.filter(f.col("CreationDate").isNull()).count()

In [None]:
PostLinks_DF.filter(f.col("Id").isNull()).count()

In [None]:
PostLinks_DF.filter(f.col("LinkTypeId").isNull()).count()

In [None]:
PostLinks_DF.filter(f.col("PostId").isNull()).count()

In [None]:
# This cell previously repeated the PostId NULL check verbatim. Instead, count
# the NULLs of every column in a single pass as a cross-check of the
# per-column counts: f.when(...) is non-NULL only for NULL cells, and
# f.count counts non-NULL values.
PostLinks_DF.select([
    f.count(f.when(f.col(c).isNull(), 1)).alias(c)
    for c in PostLinks_DF.columns
]).show()

In [None]:
PostLinks_DF.filter(f.col("RelatedPostId").isNull()).count()

#### e) Posts_DF 

#### - convert Tags from "<a><b>" to "a,b": replace "><" with "," and strip the outer "<" and ">"

In [None]:
# Build the Tags expression step by step: "><" -> ",", then drop the leading
# "<" and the trailing ">".
# NOTE(review): assumes the "<tag1><tag2>" format -- confirm against this dump.
_tags_expr = f.regexp_replace(f.col("Tags"), "><", ",")
_tags_expr = f.regexp_replace(_tags_expr, "^<", "")
_tags_expr = f.regexp_replace(_tags_expr, ">$", "")
Posts_DF_clean = Posts_DF.withColumn("Tags", _tags_expr)

#### - strip HTML tags, remove newline characters and collapse repeated spaces into one

In [None]:
# Clean Body and Title in two steps per column:
#   1. strip HTML tags ("<.*?>" is non-greedy, so each tag is removed on its own);
#   2. turn every whitespace run (spaces, tabs, newlines, carriage returns)
#      into a single space and trim the ends.
# Tags are stripped *before* whitespace is collapsed, and newlines become a
# space instead of being deleted. Previously "<p>a</p>\n\n<p>b</p>" ended up as
# "ab", and " \n " left a double space behind once the newline was removed.
Posts_DF_clean2 = (
    Posts_DF_clean
    .withColumn("Body", f.regexp_replace("Body", "<.*?>", ""))
    .withColumn("Body", f.trim(f.regexp_replace("Body", r"\s+", " ")))
    .withColumn("Title", f.regexp_replace("Title", "<.*?>", ""))
    .withColumn("Title", f.trim(f.regexp_replace("Title", r"\s+", " ")))
)

In [None]:
Posts_DF_clean2.show(n=10, truncate=False, vertical=True)

#### f) Tags_DF

In [None]:
# checking on NULL values

In [None]:
# Total number of rows in Tags_DF (reference for the NULL count below).
Tags_DF.count()

In [None]:
# Number of Tags_DF rows whose CreationDate is NULL.
Tags_DF.filter(f.col("CreationDate").isNull()).count()

In [None]:
# Drop the CreationDate column (its NULL count is computed in the cell above).
Tags_DFclean = (
    Tags_DF
    .drop("CreationDate")
)
Tags_DFclean.show(n=20)

#### g) Users_DF

#### - strip HTML tags, remove newline characters and collapse repeated spaces into one

In [None]:
# Clean AboutMe in two steps:
#   1. strip HTML tags ("<.*?>" is non-greedy, so each tag is removed on its own);
#   2. turn every whitespace run (spaces, tabs, "\n", "\r") into a single space
#      and trim the ends.
# Tags are stripped *before* whitespace is collapsed, and line breaks become a
# space instead of being deleted, so text on separate lines or in separate
# paragraphs is no longer glued together and no double spaces are left behind.
Users_DF_clean = (
    Users_DF
    .withColumn("AboutMe", f.regexp_replace("AboutMe", "<.*?>", ""))
    .withColumn("AboutMe", f.trim(f.regexp_replace("AboutMe", r"\s+", " ")))
)

Users_DF_clean.show(10, vertical=True, truncate=False)

#### h) Votes_DF

In [None]:
# Count votes whose UserId is -1 (treated here as deleted accounts).
# UserId is an IntegerType column, so compare against an int literal instead of
# relying on Spark's implicit coercion of the string '-1'.
# NOTE(review): in Stack Exchange dumps Id -1 is usually the "Community" system
# account rather than a deleted user -- confirm the intent of this check.
Votes_DF.filter(f.col("UserId") == -1).count()

#### - replace NULL values in column BountyAmount with 0

In [None]:
# Replace NULL BountyAmount values with 0. The former
# withColumn("BountyAmount", Votes_DF.BountyAmount) re-assigned the column to
# itself and was a no-op, so it is removed.
Votes_DF_clean = Votes_DF.fillna(0, subset=["BountyAmount"])
Votes_DF_clean.show()

### 4. Saving data in Parquet format on S3

In [None]:
# Persist each cleaned table to S3 as Parquet, in the order listed.
# No save mode is passed, so Spark's default applies: a write fails if its
# target path already exists.
_parquet_targets = [
    (Badges_DF, "s3://bigdata-parquet-data/Badges.parquet"),
    (Comments_DF_clean3, "s3://bigdata-parquet-data/Comments.parquet"),
    (PostHistory_DF_clean3, "s3://bigdata-parquet-data/PostHistory.parquet"),
    (PostLinks_DF, "s3://bigdata-parquet-data/PostLinks.parquet"),
    (Posts_DF_clean2, "s3://bigdata-parquet-data/Posts.parquet"),
    (Tags_DFclean, "s3://bigdata-parquet-data/Tags.parquet"),
    (Users_DF_clean, "s3://bigdata-parquet-data/Users.parquet"),
    (Votes_DF_clean, "s3://bigdata-parquet-data/Votes.parquet"),
]
for _frame, _target in _parquet_targets:
    _frame.write.parquet(_target)