# Data Wrangling and Transformation

This wrangling is based on the exploratory data analysis in the EDA-initial notebook, which is in the same folder as this one. Some additional ad hoc exploration guided a few of the decisions. Both notebooks will be updated from time to time as more data is collected.

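In a production system, it would be better to specify all the processing steps before requesting any output. Spark evaluates transformations lazily and only executes them when an action such as `count()` or `write` asks for a result. Defining the whole pipeline first lets the optimizer plan all of the steps together, and avoids re-running the earlier steps for every intermediate `count()`.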

In the interest of clarity and observability, I am breaking up the steps here and counting the rows after each one.
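The cost of counting after every step comes from lazy evaluation. The plain-Python analogy below is not Spark code: generators stand in for transformations, and `load_rows` is a hypothetical stand-in for reading the S3 bucket. It shows that building a pipeline reads nothing, that every "action" re-scans the source, and that materializing once (roughly what `DataFrame.cache()` buys you in Spark) avoids the repeated scans.

```python
# Plain-Python analogy of Spark's lazy evaluation (not Spark code).
source_scans = 0  # counts how many times the "source" is read


def load_rows():
    """Hypothetical stand-in for spark.read.json(...); yields rows lazily."""
    global source_scans
    source_scans += 1  # runs only when the generator is first advanced
    for i in range(10):
        yield {"id": i, "language": "en" if i % 3 else "de"}


def build_pipeline():
    """Chain 'transformations'; nothing is read until the result is consumed."""
    rows = load_rows()
    rows = (r for r in rows if r["language"] == "en")        # like .filter(...)
    rows = ({**r, "even": r["id"] % 2 == 0} for r in rows)   # like .withColumn(...)
    return rows


def count(rows):
    """An 'action': consuming the rows forces the whole chain to run."""
    return sum(1 for _ in rows)


pending = build_pipeline()
print(source_scans)              # 0: building the plan reads nothing
print(count(pending))            # 6
print(count(build_pipeline()))   # 6 again, but the source is scanned again
print(source_scans)              # 2

cached = list(build_pipeline())  # materialize once
print(len(cached), len(cached), source_scans)  # 6 6 3
```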



## Initialization

In [1]:
spark

Starting Spark application


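| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 4 | application_1582306697164_0005 | pyspark | idle | Link | Link | ✔ |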


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f52e3821b00>

In [2]:
%%info

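| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 4 | application_1582306697164_0005 | pyspark | idle | Link | Link | ✔ |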


In [3]:
from pyspark.sql.functions import col, input_file_name, udf, expr
from pyspark.sql.functions import sum as spark_sum

In [4]:
from pyspark.sql.functions import date_format

## Read in all data from the bucket

In [5]:
raw_df = spark.read.json("s3://topic-sentiment-1/combined/*.json")

In [6]:
raw_df.count()

3654655

First, we get rid of columns we don't need.

In [7]:
columns_to_drop = ['filename', 'image_url', 'localpath', 'title_page', 'title_rss']
clean_df = raw_df.drop(*columns_to_drop)

Next, we remove data for sites that have been eliminated from consideration because they do not have enough data.

In [8]:
not_enough_data = ["bloomberg.com",
                   "bostonglobe.com",
                   "mediate.com",
                   "nationalreview.com",
                   "theguardian.com",
                   "usatoday.com"]

# Keep rows whose source_domain is not in the list
# (the same test as the SQL clause "source_domain not in (...)")
clean_df = clean_df.filter(~col("source_domain").isin(not_enough_data))

In [9]:
clean_df.count()

3652716
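One subtlety of this filter: under SQL's three-valued logic, `NULL NOT IN (...)` evaluates to NULL rather than true, and `filter` keeps only rows where the condition is true. Rows with a NULL `source_domain` are therefore dropped as well; the same holds for the DataFrame form `~col("source_domain").isin(...)`. The plain-Python sketch below models that rule (the helper names `sql_not_in` and `sql_filter` and the sample rows are hypothetical) and shows the explicit `IS NULL OR ...` condition that would be needed to keep such rows.

```python
from typing import Optional

EXCLUDED = ["bloomberg.com", "bostonglobe.com", "mediate.com",
            "nationalreview.com", "theguardian.com", "usatoday.com"]


def sql_not_in(value: Optional[str], values: list) -> Optional[bool]:
    """Model SQL `value NOT IN (values)` for a list that contains no NULLs.

    Returns None (SQL NULL, i.e. unknown) when value is NULL.
    """
    if value is None:
        return None
    return value not in values


def sql_filter(rows, predicate):
    """Like DataFrame.filter: keep a row only when the predicate is exactly True."""
    return [r for r in rows if predicate(r) is True]


rows = [
    {"url": "a", "source_domain": "cnn.com"},
    {"url": "b", "source_domain": "usatoday.com"},
    {"url": "c", "source_domain": None},
]

kept = sql_filter(rows, lambda r: sql_not_in(r["source_domain"], EXCLUDED))
print([r["url"] for r in kept])  # ['a']: the NULL-domain row is dropped too

# Keeping NULL domains requires saying so explicitly
kept_with_nulls = sql_filter(
    rows,
    lambda r: r["source_domain"] is None or sql_not_in(r["source_domain"], EXCLUDED),
)
print([r["url"] for r in kept_with_nulls])  # ['a', 'c']
```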

Next, we get rid of duplicates. The crawling process often fetches the same URL multiple times, because we run it under different conditions and incremental crawls overlap. We treat two rows as duplicates if their `url` and `date_publish` are the same.

In [10]:
clean_df = clean_df.dropDuplicates(['url', 'date_publish'])

In [11]:
clean_df.count()

2141599
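`dropDuplicates(['url', 'date_publish'])` keeps one row per distinct (`url`, `date_publish`) pair, with the other columns of the surviving row unchanged. Spark does not promise *which* duplicate survives, so if two crawls of the same article differ in other fields (say, one has `text` and the other does not), the kept row can vary between runs. The plain-Python model below keeps the first row it sees for each key; `drop_duplicates` and the sample rows are hypothetical.

```python
def drop_duplicates(rows, keys):
    """Keep the first row seen for each distinct combination of `keys`."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row.get(k) for k in keys)
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


crawled = [
    {"url": "u1", "date_publish": "2019-05-01 10:00:00", "text": None},
    {"url": "u1", "date_publish": "2019-05-01 10:00:00", "text": "full body"},  # recrawl
    {"url": "u1", "date_publish": "2019-05-02 09:00:00", "text": "updated"},    # new date
    {"url": "u2", "date_publish": "2019-05-01 11:00:00", "text": "other"},
]

deduped = drop_duplicates(crawled, ["url", "date_publish"])
print(len(deduped))        # 3: same URL with a different date is kept
print(deduped[0]["text"])  # None here; in Spark either u1 copy could survive
```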

Next, we create a `published` column by casting the `date_publish` string to a timestamp.

In [13]:
clean_df = clean_df.withColumn("published", (col("date_publish").cast("timestamp")))
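With Spark's default (non-ANSI) settings, `cast("timestamp")` does not raise on a string it cannot parse; it returns NULL. That is why `published`, and not just `date_publish`, is worth checking for NULLs later. A rough plain-Python model, assuming `date_publish` strings look like `2019-05-01 10:00:00` (the format is an assumption, and Spark's cast accepts more variants than this single pattern):

```python
from datetime import datetime
from typing import Optional

ASSUMED_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumption about how date_publish looks


def to_timestamp_or_none(value: Optional[str]) -> Optional[datetime]:
    """Mimic cast('timestamp'): parse the string, or return None instead of failing."""
    if value is None:
        return None
    try:
        return datetime.strptime(value, ASSUMED_FORMAT)
    except ValueError:
        return None


samples = ["2019-05-01 10:00:00", "not a date", None]
print([to_timestamp_or_none(s) for s in samples])
# [datetime.datetime(2019, 5, 1, 10, 0), None, None]
```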

We create integer `year` and `month` columns for easy grouping.

In [14]:
clean_df = clean_df.withColumn("year", (date_format(col("published"), 'yyyy').cast("int")))
clean_df = clean_df.withColumn("month", (date_format(col("published"), 'M').cast("int")))
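Formatting with `'yyyy'` and `'M'` and casting to `int` should give the same values as the built-in `year()` and `month()` functions in `pyspark.sql.functions`, which skip the string round trip. Either way, the point is a pair of integer keys to group on, as in `clean_df.groupBy("year", "month").count()`. The plain-Python sketch below (the timestamps are made up) shows that grouping:

```python
from collections import Counter
from datetime import datetime

published = [
    datetime(2019, 11, 3, 8, 15),
    datetime(2019, 11, 28, 17, 40),
    datetime(2019, 12, 1, 6, 5),
    datetime(2020, 1, 14, 12, 0),
]

# Equivalent of adding the "year" and "month" columns
rows = [{"published": ts, "year": ts.year, "month": ts.month} for ts in published]

# Equivalent of groupBy("year", "month").count()
per_month = Counter((r["year"], r["month"]) for r in rows)
for (year, month), n in sorted(per_month.items()):
    print(year, month, n)
# 2019 11 2
# 2019 12 1
# 2020 1 1
```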

We create a new column, `text_or_desc`, for use in the analysis. It takes the `text` value when present and falls back to `description` when `text` is null.

In [15]:
clean_df = clean_df.withColumn("text_or_desc",
                           expr("case when text IS NULL THEN description ELSE text END"))
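This `CASE` expression behaves like the built-in `coalesce("text", "description")` from `pyspark.sql.functions`. Both fall back only when `text` is NULL: an empty string counts as present and is kept. If empty or whitespace-only `text` values occur in the crawl data (not established here), the condition would need to test for them too. A plain-Python model of both variants; the function names are hypothetical:

```python
from typing import Optional


def text_or_desc(text: Optional[str], description: Optional[str]) -> Optional[str]:
    """CASE WHEN text IS NULL THEN description ELSE text END (same as coalesce)."""
    return description if text is None else text


def text_or_desc_nonblank(text: Optional[str], description: Optional[str]) -> Optional[str]:
    """Variant that also falls back when text is empty or whitespace only."""
    return description if text is None or not text.strip() else text


print(text_or_desc(None, "summary"))         # summary
print(repr(text_or_desc("", "summary")))     # '' : empty text is kept
print(text_or_desc_nonblank("", "summary"))  # summary
print(text_or_desc("body", "summary"))       # body
```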

As noted in the EDA-initial notebook, we assume that the language is English (`en`) when no language was specified. We exclude bbc.com, where this assumption did not hold.

In [16]:
clean_df = clean_df.withColumn("language_guess",
                          expr("case when (language IS NULL AND source_domain NOT IN ('bbc.com')) THEN 'en' ELSE language END"))
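A plain-Python model of the `language_guess` rule makes its edge cases easy to check. One comes from SQL's three-valued logic: when `source_domain` is itself NULL, `source_domain NOT IN ('bbc.com')` is NULL rather than true, so the `CASE` falls through to `ELSE language` and the language stays NULL. (The earlier `not in` filter has already removed rows with a NULL `source_domain`, so this matters only if the steps are reordered.) The example domains are illustrative.

```python
from typing import Optional


def language_guess(language: Optional[str], source_domain: Optional[str]) -> Optional[str]:
    """Model of: CASE WHEN (language IS NULL AND source_domain NOT IN ('bbc.com'))
    THEN 'en' ELSE language END."""
    # NOT IN evaluates to NULL (unknown) when source_domain is NULL
    not_bbc = None if source_domain is None else source_domain not in ("bbc.com",)
    # CASE WHEN takes the THEN branch only when the condition is exactly true
    if language is None and not_bbc is True:
        return "en"
    return language


print(language_guess(None, "cnn.com"))  # en
print(language_guess(None, "bbc.com"))  # None
print(language_guess("de", "cnn.com"))  # de
print(language_guess(None, None))       # None
```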

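Now we drop any rows where the publication date, the title, or the text/description is missing. Checking `published` as well as `date_publish` also removes dates that were present but could not be cast to a timestamp.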

In [17]:
clean_df = clean_df.na.drop(subset=['date_publish', 'published', 'text_or_desc', 'title'])

In [18]:
clean_df.count()

2049774
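`na.drop(subset=...)` defaults to `how='any'`: a row is removed as soon as any one of the listed columns is NULL. A plain-Python model with made-up rows (the `published` strings are placeholders for timestamps) shows a row with an unparseable date, and therefore a NULL `published`, being dropped alongside one that has no text:

```python
REQUIRED = ["date_publish", "published", "text_or_desc", "title"]


def drop_rows_with_nulls(rows, subset):
    """Like df.na.drop(how='any', subset=subset): drop a row if any listed field is None."""
    return [r for r in rows if all(r.get(c) is not None for c in subset)]


rows = [
    {"date_publish": "2019-05-01 10:00:00", "published": "2019-05-01T10:00",
     "text_or_desc": "body", "title": "A"},
    {"date_publish": "garbled", "published": None,           # unparseable date
     "text_or_desc": "body", "title": "B"},
    {"date_publish": "2019-05-02 09:00:00", "published": "2019-05-02T09:00",
     "text_or_desc": None, "title": "C"},                    # no text or description
]

print([r["title"] for r in drop_rows_with_nulls(rows, REQUIRED)])  # ['A']
```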

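Next, we get rid of some rogue duplicates: rows whose `text_or_desc` exactly matches another row's, even though the URL or publish date differs.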

In [19]:
clean_df = clean_df.dropDuplicates(['text_or_desc'])

In [20]:
clean_df.count()

1859309
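Before dropping text-level duplicates, it can help to see which URLs share a text. In Spark, something like `clean_df.groupBy("text_or_desc").count().filter(col("count") > 1)` lists the repeated texts. The plain-Python sketch below does the same inspection on made-up rows; the domains and the "same story under two URLs" scenario are illustrative guesses, not findings from this data.

```python
from collections import defaultdict

rows = [
    {"url": "site-a.com/story", "text_or_desc": "Same wire copy."},
    {"url": "site-a.com/story?amp", "text_or_desc": "Same wire copy."},
    {"url": "site-b.com/other", "text_or_desc": "Unique text."},
]

# Group URLs by their exact text, then keep only texts seen more than once
urls_by_text = defaultdict(list)
for r in rows:
    urls_by_text[r["text_or_desc"]].append(r["url"])

duplicated = {text: urls for text, urls in urls_by_text.items() if len(urls) > 1}
print(duplicated)
# {'Same wire copy.': ['site-a.com/story', 'site-a.com/story?amp']}

# dropDuplicates(['text_or_desc']) would keep one row per distinct text
print(len(urls_by_text))  # 2
```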

Finally, we keep only English articles for our analysis.

In [21]:
clean_df = clean_df.where("language_guess = 'en'")

In [22]:
clean_df.count()

1799961

## Output

Write the results to a bucket.

In [23]:
clean_df.write.parquet("s3://topic-sentiment-1/clean-data", mode="overwrite")

In [24]:
clean_df.write.json("s3://topic-sentiment-1/clean-json-data", mode="overwrite")

An error occurred while calling o121.json.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at or

In [25]:
sample_df = clean_df.sample(withReplacement=False, fraction=0.01)
sample_df.write.json("s3://topic-sentiment-1/sample-json-data", mode="overwrite")

An error was encountered:
Invalid status code '400' from https://172.31.10.117:18888/sessions/4/statements/24 with error payload: {"msg":"requirement failed: Session isn't active."}


In [26]:
sample_df.count()

An error was encountered:
Session 4 unexpectedly reached final status 'dead'. See logs:
stdout: 

stderr: 
20/02/21 21:20:58 INFO TaskSetManager: Starting task 8.0 in stage 29.0 (TID 3198, ip-172-31-3-100.ec2.internal, executor 1, partition 8, NODE_LOCAL, 7767 bytes)
20/02/21 21:20:58 INFO TaskSetManager: Starting task 9.0 in stage 29.0 (TID 3199, ip-172-31-3-100.ec2.internal, executor 2, partition 9, NODE_LOCAL, 7767 bytes)
20/02/21 21:20:58 INFO TaskSetManager: Starting task 10.0 in stage 29.0 (TID 3200, ip-172-31-3-100.ec2.internal, executor 1, partition 10, NODE_LOCAL, 7767 bytes)
20/02/21 21:20:58 INFO TaskSetManager: Starting task 11.0 in stage 29.0 (TID 3201, ip-172-31-3-100.ec2.internal, executor 2, partition 11, NODE_LOCAL, 7767 bytes)
20/02/21 21:20:58 INFO TaskSetManager: Starting task 12.0 in stage 29.0 (TID 3202, ip-172-31-3-100.ec2.internal, executor 1, partition 12, NODE_LOCAL, 7767 bytes)
20/02/21 21:20:58 INFO TaskSetManager: Starting task 13.0 in stage 29.0 (TID 3203,