# Data Wrangling

Here we will clean and reformat the Twitter text column of the traindf_filtered DataFrame produced in the previous step, removing unwanted elements so that they do not hurt the performance of our machine learning model.

We apply the following steps to the text column:

- Remove email addresses and URLs.
- Remove user names (@mentions).
- Remove the '#' symbol from hashtags, keeping the tag word itself.
- Decode HTML entities (e.g. &amp;) with a UDF (User Defined Function).
- Replace every character other than letters and apostrophes with a space, then collapse repeated spaces and trim the result.
- Finally, drop rows whose text is empty after the cleaning steps above.

Most of these steps use regular expressions; HTML decoding uses the UDF.
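Extracting mentions and hashtags before removing them (for example, to keep them as separate features) can be sketched in plain Python with the `re` module. This is an illustration only: it reuses the `user_regex` and `hashtag_regex` patterns defined later in this notebook, the helper `extract_and_remove` is hypothetical, and Python's `re` stands in for the Java regex engine that Spark's `regexp_replace` uses (the two agree for these simple ASCII patterns).

```python
import re

# Same patterns as the regex cell later in the notebook
user_regex = r"(@\w{1,15})"
hashtag_regex = r"(#\w{1,})"

def extract_and_remove(text, pattern):
    """Hypothetical helper: return (matches, text with every match removed)."""
    found = re.findall(pattern, text)     # extract: the captured group of each match
    stripped = re.sub(pattern, "", text)  # remove: delete each match
    return found, stripped

tweet = "@alice loving #spark and #mongodb today"
mentions, tweet = extract_and_remove(tweet, user_regex)
tags, tweet = extract_and_remove(tweet, hashtag_regex)
print(mentions)     # ['@alice']
print(tags)         # ['#spark', '#mongodb']
print(repr(tweet))  # ' loving  and  today'
```

The leftover double spaces are expected; a later whitespace-collapsing step would tidy them.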

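# Initializing Spark from a JupyterLab environment:

findspark.init() locates the local Spark installation (via SPARK_HOME) and adds PySpark to sys.path so that it can be imported.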

In [1]:
%%time

import findspark
findspark.init()


CPU times: total: 0 ns
Wall time: 2.99 ms


# Importing the necessary libraries:

In [2]:
%%time

import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.sql import functions as f
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql import DataFrame
import re


CPU times: total: 31.2 ms
Wall time: 249 ms


# Creating a Spark session:

In [3]:
%%time

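spark = (
    SparkSession.builder
    .appName("Data Wrangling")
    # MongoDB Spark connector 3.0.1 (Scala 2.12), fetched from Maven when the session starts
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.driver.memory", "40g")
    # in local mode the executor runs inside the driver JVM, so this has no separate effect
    .config("spark.executor.memory", "50g")
    .master("local")  # single worker thread; "local[*]" would use all cores
    .getOrCreate()
)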

CPU times: total: 0 ns
Wall time: 4.01 s


# Creating a UDF (User Defined Function) for HTML decoding:

In [None]:
%%time

import html

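@f.udf(returnType="string")
def html_unescape(s: str):
    # decode HTML entities such as &amp;, &lt; and &#39;; nulls pass through unchanged
    if isinstance(s, str):
        return html.unescape(s)
    return s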
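To see what the UDF does to a single value, the same function body can be run in plain Python without Spark. The sample strings are made up for illustration, and `html_unescape_py` is a hypothetical Spark-free copy of the UDF body. The last call shows why step order matters: once `#` has been replaced with a space, a numeric character reference such as `&#39;` can no longer be decoded.

```python
import html

def html_unescape_py(s):
    """Hypothetical plain-Python copy of the html_unescape UDF body (no Spark needed)."""
    if isinstance(s, str):
        return html.unescape(s)
    return s  # non-strings (e.g. None) pass through unchanged

print(html_unescape_py("I &lt;3 this &amp; that"))  # I <3 this & that
print(html_unescape_py("it&#39;s"))                 # it's
print(html_unescape_py(None))                       # None
# Replacing '#' first breaks the numeric reference, so it stays undecoded
print(html_unescape_py("it&#39;s".replace("#", " ")))  # it& 39;s
```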


# Defining regular expressions for performing data wrangling tasks

In [5]:
%%time

user_regex = r"(@\w{1,15})"
hashtag_regex = r"(#\w{1,})"
# the dot in "www\." is escaped so it matches a literal '.' rather than any character
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"

CPU times: total: 0 ns
Wall time: 0 ns
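One way to sanity-check these patterns is to run them through Python's `re` module on a few made-up strings. For this ASCII input Python behaves like the Java regex engine behind `regexp_replace`, although `\w` is Unicode-aware in Python and ASCII-only in Java by default. The sketch also compares an escaped `www\.` branch with an unescaped `www.` branch: an unescaped `.` matches any character, so that form can delete ordinary words that merely start with three `w`s.

```python
import re

email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
user_regex = r"(@\w{1,15})"
# URL pattern with the 'www' dot escaped, and the same pattern with it unescaped
url_strict = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
url_loose = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"

print(repr(re.sub(url_strict, "", "see https://t.co/abc now")))     # 'see  now'
print(repr(re.sub(url_strict, "", "visit www.example.com today")))  # 'visit  today'
print(repr(re.sub(email_regex, "", "mail a.b@example.com pls")))    # 'mail  pls'
print(repr(re.sub(user_regex, "", "@bob thanks")))                  # ' thanks'

# An unescaped '.' lets the 'www' branch swallow an ordinary word
print(repr(re.sub(url_loose, "", "wwwow that was fun")))   # ' that was fun'
print(repr(re.sub(url_strict, "", "wwwow that was fun")))  # 'wwwow that was fun'
```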


# Creating a function to perform the data cleaning and wrangling tasks:

In [6]:
%%time

def clean_data(df):
    df = (
        df
        #.withColumn("Original_text", f.col("text")) # including original text for my reference
        .withColumn("text", html_unescape(f.col("text"))) # decoding HTML entities first, so numeric references like &#39; are not broken by the '#' replacement
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, "")) # replacing urls with empty string
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, "")) # replacing emails with empty string
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, "")) # replacing @<user_name> with empty string
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " ")) # replacing '#' with a space, keeping the hashtag word
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z']", " ")) # replacing everything except letters and apostrophes (digits, punctuation, symbols) with a space
        .withColumn("text", f.regexp_replace(f.col("text"), r'\s{1,}', ' ')) # collapsing runs of whitespace into a single space
        .withColumn("text", f.trim(f.col("text"))) # removing leading and trailing spaces
        .filter("text != ''") # removing empty strings (nulls are dropped too, since null != '' is not true)
    )
    return df

CPU times: total: 0 ns
Wall time: 0 ns
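Because Spark only runs these transformations when an action is triggered, it can help to check the cleaning logic on a few strings locally first. The sketch below is a plain-Python approximation of `clean_data`, assuming the same patterns; `clean_text` and `clean_all` are hypothetical names, and Python's `re` stands in for Spark's Java regex engine. It decodes HTML entities first so that numeric references such as `&#39;` are not broken by the `#` replacement.

```python
import html
import re

url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
user_regex = r"(@\w{1,15})"

def clean_text(text):
    """Hypothetical plain-Python mirror of clean_data for a single string."""
    if text is None:
        return None
    text = html.unescape(text)               # decode entities such as &amp; and &#39;
    text = re.sub(url_regex, "", text)       # drop URLs
    text = re.sub(email_regex, "", text)     # drop email addresses
    text = re.sub(user_regex, "", text)      # drop @mentions
    text = text.replace("#", " ")            # keep hashtag words, drop the '#'
    text = re.sub(r"[^a-zA-Z']", " ", text)  # keep only letters and apostrophes
    text = re.sub(r"\s{1,}", " ", text)      # collapse runs of whitespace
    return text.strip()                      # trim leading/trailing spaces

def clean_all(texts):
    """Clean every string, then drop empty or null results (like .filter("text != ''"))."""
    cleaned = (clean_text(t) for t in texts)
    return [t for t in cleaned if t]

sample = [
    "@bob it&#39;s sunny!! #summer http://t.co/xyz",
    "@only_a_mention",
    None,
]
print(clean_all(sample))  # ["it's sunny summer"]
```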


# Importing data from the traindf_filtered collection in MongoDB:

In [7]:
%%time

mongo_ip = "mongodb://localhost:27017/streaming."

df_raw = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", mongo_ip + "traindf_filtered").load()

df_raw.createOrReplaceTempView("train_df")

df_raw = spark.sql("SELECT polarity,text FROM train_df;")

CPU times: total: 0 ns
Wall time: 3.44 s
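The read cell builds its connection string by appending a collection name to `mongo_ip`, giving a URI of the form `mongodb://host:port/database.collection`, from which the MongoDB Spark connector (3.x) takes the database and collection. The hypothetical helper `split_namespace` below splits such a URI with the standard library to make the two parts explicit; it splits on the first dot because MongoDB database names cannot contain dots, while collection names may.

```python
from urllib.parse import urlparse

mongo_ip = "mongodb://localhost:27017/streaming."

def split_namespace(uri):
    """Hypothetical helper: return (database, collection) from a mongodb:// URI path."""
    namespace = urlparse(uri).path.lstrip("/")           # e.g. "streaming.traindf_filtered"
    database, _, collection = namespace.partition(".")  # first dot separates db from collection
    return database, collection

print(split_namespace(mongo_ip + "traindf_filtered"))  # ('streaming', 'traindf_filtered')
print(split_namespace(mongo_ip + "df_cleaned"))        # ('streaming', 'df_cleaned')
```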


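# Applying the cleaning function defined above to the df_raw DataFrame:

Spark transformations are lazy, so this cell only builds the query plan (hence the short wall time); the cleaning actually runs when the data is written to MongoDB in the next cell.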

In [8]:
%%time

df_cleaned = clean_data(df_raw)

CPU times: total: 0 ns
Wall time: 153 ms


# Writing the cleaned data to MongoDB:

In [9]:
%%time

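(
    df_cleaned.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", mongo_ip + "df_cleaned")
    .mode("overwrite")  # replaces the df_cleaned collection if it already exists
    # partitionKey is a read-side partitioner option in connector 3.x; it likely has no effect on writes
    .option("partitionKey", "polarity")
    .save()
)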

CPU times: total: 0 ns
Wall time: 40 s
